]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/drbd/drbd_main.c
drbd: switch configuration interface from connector to genetlink
[karo-tx-linux.git] / drivers / block / drbd / drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in proc drbd*/
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a single linked list, the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173    give tons of false positives. When this is a real functions sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular double linked list of requests
198  * attached.
199  */
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = 4711;
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218
219         return 1;
220 }
221
222 static void tl_cleanup(struct drbd_tconn *tconn)
223 {
224         if (tconn->oldest_tle != tconn->newest_tle)
225                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
226         if (!list_empty(&tconn->out_of_sequence_requests))
227                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
228         kfree(tconn->oldest_tle);
229         tconn->oldest_tle = NULL;
230         kfree(tconn->unused_spare_tle);
231         tconn->unused_spare_tle = NULL;
232 }
233
234 /**
235  * _tl_add_barrier() - Adds a barrier to the transfer log
236  * @mdev:       DRBD device.
237  * @new:        Barrier to be added before the current head of the TL.
238  *
239  * The caller must hold the req_lock.
240  */
241 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
242 {
243         struct drbd_tl_epoch *newest_before;
244
245         INIT_LIST_HEAD(&new->requests);
246         INIT_LIST_HEAD(&new->w.list);
247         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
248         new->next = NULL;
249         new->n_writes = 0;
250
251         newest_before = tconn->newest_tle;
252         /* never send a barrier number == 0, because that is special-cased
253          * when using TCQ for our write ordering code */
254         new->br_number = (newest_before->br_number+1) ?: 1;
255         if (tconn->newest_tle != new) {
256                 tconn->newest_tle->next = new;
257                 tconn->newest_tle = new;
258         }
259 }
260
261 /**
262  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
263  * @mdev:       DRBD device.
264  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
265  * @set_size:   Expected number of requests before that barrier.
266  *
267  * In case the passed barrier_nr or set_size does not match the oldest
268  * &struct drbd_tl_epoch objects this function will cause a termination
269  * of the connection.
270  */
271 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
272                 unsigned int set_size)
273 {
274         struct drbd_conf *mdev;
275         struct drbd_tl_epoch *b, *nob; /* next old barrier */
276         struct list_head *le, *tle;
277         struct drbd_request *r;
278
279         spin_lock_irq(&tconn->req_lock);
280
281         b = tconn->oldest_tle;
282
283         /* first some paranoia code */
284         if (b == NULL) {
285                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
286                          barrier_nr);
287                 goto bail;
288         }
289         if (b->br_number != barrier_nr) {
290                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
291                          barrier_nr, b->br_number);
292                 goto bail;
293         }
294         if (b->n_writes != set_size) {
295                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
296                          barrier_nr, set_size, b->n_writes);
297                 goto bail;
298         }
299
300         /* Clean up list of requests processed during current epoch */
301         list_for_each_safe(le, tle, &b->requests) {
302                 r = list_entry(le, struct drbd_request, tl_requests);
303                 _req_mod(r, BARRIER_ACKED);
304         }
305         /* There could be requests on the list waiting for completion
306            of the write to the local disk. To avoid corruptions of
307            slab's data structures we have to remove the lists head.
308
309            Also there could have been a barrier ack out of sequence, overtaking
310            the write acks - which would be a bug and violating write ordering.
311            To not deadlock in case we lose connection while such requests are
312            still pending, we need some way to find them for the
313            _req_mode(CONNECTION_LOST_WHILE_PENDING).
314
315            These have been list_move'd to the out_of_sequence_requests list in
316            _req_mod(, BARRIER_ACKED) above.
317            */
318         list_del_init(&b->requests);
319         mdev = b->w.mdev;
320
321         nob = b->next;
322         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
323                 _tl_add_barrier(tconn, b);
324                 if (nob)
325                         tconn->oldest_tle = nob;
326                 /* if nob == NULL b was the only barrier, and becomes the new
327                    barrier. Therefore tconn->oldest_tle points already to b */
328         } else {
329                 D_ASSERT(nob != NULL);
330                 tconn->oldest_tle = nob;
331                 kfree(b);
332         }
333
334         spin_unlock_irq(&tconn->req_lock);
335         dec_ap_pending(mdev);
336
337         return;
338
339 bail:
340         spin_unlock_irq(&tconn->req_lock);
341         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
342 }
343
344
345 /**
346  * _tl_restart() - Walks the transfer log, and applies an action to all requests
347  * @mdev:       DRBD device.
348  * @what:       The action/event to perform with all request objects
349  *
350  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
351  * RESTART_FROZEN_DISK_IO.
352  */
353 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
354 {
355         struct drbd_tl_epoch *b, *tmp, **pn;
356         struct list_head *le, *tle, carry_reads;
357         struct drbd_request *req;
358         int rv, n_writes, n_reads;
359
360         b = tconn->oldest_tle;
361         pn = &tconn->oldest_tle;
362         while (b) {
363                 n_writes = 0;
364                 n_reads = 0;
365                 INIT_LIST_HEAD(&carry_reads);
366                 list_for_each_safe(le, tle, &b->requests) {
367                         req = list_entry(le, struct drbd_request, tl_requests);
368                         rv = _req_mod(req, what);
369
370                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
371                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
372                 }
373                 tmp = b->next;
374
375                 if (n_writes) {
376                         if (what == RESEND) {
377                                 b->n_writes = n_writes;
378                                 if (b->w.cb == NULL) {
379                                         b->w.cb = w_send_barrier;
380                                         inc_ap_pending(b->w.mdev);
381                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
382                                 }
383
384                                 drbd_queue_work(&tconn->data.work, &b->w);
385                         }
386                         pn = &b->next;
387                 } else {
388                         if (n_reads)
389                                 list_add(&carry_reads, &b->requests);
390                         /* there could still be requests on that ring list,
391                          * in case local io is still pending */
392                         list_del(&b->requests);
393
394                         /* dec_ap_pending corresponding to queue_barrier.
395                          * the newest barrier may not have been queued yet,
396                          * in which case w.cb is still NULL. */
397                         if (b->w.cb != NULL)
398                                 dec_ap_pending(b->w.mdev);
399
400                         if (b == tconn->newest_tle) {
401                                 /* recycle, but reinit! */
402                                 if (tmp != NULL)
403                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
404                                 INIT_LIST_HEAD(&b->requests);
405                                 list_splice(&carry_reads, &b->requests);
406                                 INIT_LIST_HEAD(&b->w.list);
407                                 b->w.cb = NULL;
408                                 b->br_number = net_random();
409                                 b->n_writes = 0;
410
411                                 *pn = b;
412                                 break;
413                         }
414                         *pn = tmp;
415                         kfree(b);
416                 }
417                 b = tmp;
418                 list_splice(&carry_reads, &b->requests);
419         }
420 }
421
422
423 /**
424  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
425  * @mdev:       DRBD device.
426  *
427  * This is called after the connection to the peer was lost. The storage covered
428  * by the requests on the transfer gets marked as our of sync. Called from the
429  * receiver thread and the worker thread.
430  */
431 void tl_clear(struct drbd_tconn *tconn)
432 {
433         struct drbd_conf *mdev;
434         struct list_head *le, *tle;
435         struct drbd_request *r;
436         int minor;
437
438         spin_lock_irq(&tconn->req_lock);
439
440         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
441
442         /* we expect this list to be empty. */
443         if (!list_empty(&tconn->out_of_sequence_requests))
444                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
445
446         /* but just in case, clean it up anyways! */
447         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
448                 r = list_entry(le, struct drbd_request, tl_requests);
449                 /* It would be nice to complete outside of spinlock.
450                  * But this is easier for now. */
451                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
452         }
453
454         /* ensure bit indicating barrier is required is clear */
455         idr_for_each_entry(&tconn->volumes, mdev, minor)
456                 clear_bit(CREATE_BARRIER, &mdev->flags);
457
458         spin_unlock_irq(&tconn->req_lock);
459 }
460
461 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
462 {
463         spin_lock_irq(&tconn->req_lock);
464         _tl_restart(tconn, what);
465         spin_unlock_irq(&tconn->req_lock);
466 }
467
468 static int drbd_thread_setup(void *arg)
469 {
470         struct drbd_thread *thi = (struct drbd_thread *) arg;
471         struct drbd_tconn *tconn = thi->tconn;
472         unsigned long flags;
473         int retval;
474
475         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
476                  thi->name[0], thi->tconn->name);
477
478 restart:
479         retval = thi->function(thi);
480
481         spin_lock_irqsave(&thi->t_lock, flags);
482
483         /* if the receiver has been "EXITING", the last thing it did
484          * was set the conn state to "StandAlone",
485          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
486          * and receiver thread will be "started".
487          * drbd_thread_start needs to set "RESTARTING" in that case.
488          * t_state check and assignment needs to be within the same spinlock,
489          * so either thread_start sees EXITING, and can remap to RESTARTING,
490          * or thread_start see NONE, and can proceed as normal.
491          */
492
493         if (thi->t_state == RESTARTING) {
494                 conn_info(tconn, "Restarting %s thread\n", thi->name);
495                 thi->t_state = RUNNING;
496                 spin_unlock_irqrestore(&thi->t_lock, flags);
497                 goto restart;
498         }
499
500         thi->task = NULL;
501         thi->t_state = NONE;
502         smp_mb();
503         complete(&thi->stop);
504         spin_unlock_irqrestore(&thi->t_lock, flags);
505
506         conn_info(tconn, "Terminating %s\n", current->comm);
507
508         /* Release mod reference taken when thread was started */
509         module_put(THIS_MODULE);
510         return retval;
511 }
512
513 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
514                              int (*func) (struct drbd_thread *), char *name)
515 {
516         spin_lock_init(&thi->t_lock);
517         thi->task    = NULL;
518         thi->t_state = NONE;
519         thi->function = func;
520         thi->tconn = tconn;
521         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
522 }
523
524 int drbd_thread_start(struct drbd_thread *thi)
525 {
526         struct drbd_tconn *tconn = thi->tconn;
527         struct task_struct *nt;
528         unsigned long flags;
529
530         /* is used from state engine doing drbd_thread_stop_nowait,
531          * while holding the req lock irqsave */
532         spin_lock_irqsave(&thi->t_lock, flags);
533
534         switch (thi->t_state) {
535         case NONE:
536                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
537                          thi->name, current->comm, current->pid);
538
539                 /* Get ref on module for thread - this is released when thread exits */
540                 if (!try_module_get(THIS_MODULE)) {
541                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
542                         spin_unlock_irqrestore(&thi->t_lock, flags);
543                         return false;
544                 }
545
546                 init_completion(&thi->stop);
547                 thi->reset_cpu_mask = 1;
548                 thi->t_state = RUNNING;
549                 spin_unlock_irqrestore(&thi->t_lock, flags);
550                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
551
552                 nt = kthread_create(drbd_thread_setup, (void *) thi,
553                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
554
555                 if (IS_ERR(nt)) {
556                         conn_err(tconn, "Couldn't start thread\n");
557
558                         module_put(THIS_MODULE);
559                         return false;
560                 }
561                 spin_lock_irqsave(&thi->t_lock, flags);
562                 thi->task = nt;
563                 thi->t_state = RUNNING;
564                 spin_unlock_irqrestore(&thi->t_lock, flags);
565                 wake_up_process(nt);
566                 break;
567         case EXITING:
568                 thi->t_state = RESTARTING;
569                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
570                                 thi->name, current->comm, current->pid);
571                 /* fall through */
572         case RUNNING:
573         case RESTARTING:
574         default:
575                 spin_unlock_irqrestore(&thi->t_lock, flags);
576                 break;
577         }
578
579         return true;
580 }
581
582
583 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
584 {
585         unsigned long flags;
586
587         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
588
589         /* may be called from state engine, holding the req lock irqsave */
590         spin_lock_irqsave(&thi->t_lock, flags);
591
592         if (thi->t_state == NONE) {
593                 spin_unlock_irqrestore(&thi->t_lock, flags);
594                 if (restart)
595                         drbd_thread_start(thi);
596                 return;
597         }
598
599         if (thi->t_state != ns) {
600                 if (thi->task == NULL) {
601                         spin_unlock_irqrestore(&thi->t_lock, flags);
602                         return;
603                 }
604
605                 thi->t_state = ns;
606                 smp_mb();
607                 init_completion(&thi->stop);
608                 if (thi->task != current)
609                         force_sig(DRBD_SIGKILL, thi->task);
610         }
611
612         spin_unlock_irqrestore(&thi->t_lock, flags);
613
614         if (wait)
615                 wait_for_completion(&thi->stop);
616 }
617
618 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
619 {
620         struct drbd_thread *thi =
621                 task == tconn->receiver.task ? &tconn->receiver :
622                 task == tconn->asender.task  ? &tconn->asender :
623                 task == tconn->worker.task   ? &tconn->worker : NULL;
624
625         return thi;
626 }
627
628 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
629 {
630         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
631         return thi ? thi->name : task->comm;
632 }
633
634 int conn_lowest_minor(struct drbd_tconn *tconn)
635 {
636         int minor = 0;
637
638         if (!idr_get_next(&tconn->volumes, &minor))
639                 return -1;
640         return minor;
641 }
642
643 #ifdef CONFIG_SMP
644 /**
645  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
646  * @mdev:       DRBD device.
647  *
648  * Forces all threads of a device onto the same CPU. This is beneficial for
649  * DRBD's performance. May be overwritten by user's configuration.
650  */
651 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
652 {
653         int ord, cpu;
654
655         /* user override. */
656         if (cpumask_weight(tconn->cpu_mask))
657                 return;
658
659         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
660         for_each_online_cpu(cpu) {
661                 if (ord-- == 0) {
662                         cpumask_set_cpu(cpu, tconn->cpu_mask);
663                         return;
664                 }
665         }
666         /* should not be reached */
667         cpumask_setall(tconn->cpu_mask);
668 }
669
670 /**
671  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
672  * @mdev:       DRBD device.
673  * @thi:        drbd_thread object
674  *
675  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
676  * prematurely.
677  */
678 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
679 {
680         struct task_struct *p = current;
681
682         if (!thi->reset_cpu_mask)
683                 return;
684         thi->reset_cpu_mask = 0;
685         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
686 }
687 #endif
688
689 static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
690 {
691         h->magic   = cpu_to_be32(DRBD_MAGIC);
692         h->command = cpu_to_be16(cmd);
693         h->length  = cpu_to_be16(size);
694 }
695
696 static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
697 {
698         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
699         h->command = cpu_to_be16(cmd);
700         h->length  = cpu_to_be32(size);
701 }
702
703 static void _prepare_header(struct drbd_tconn *tconn, int vnr, struct p_header *h,
704                             enum drbd_packet cmd, int size)
705 {
706         if (tconn->agreed_pro_version >= 100 || size > DRBD_MAX_SIZE_H80_PACKET)
707                 prepare_header95(&h->h95, cmd, size);
708         else
709                 prepare_header80(&h->h80, cmd, size);
710 }
711
712 static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
713                            enum drbd_packet cmd, int size)
714 {
715         _prepare_header(mdev->tconn, mdev->vnr, h, cmd, size);
716 }
717
718 /* the appropriate socket mutex must be held already */
719 int _conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct socket *sock,
720                    enum drbd_packet cmd, struct p_header *h, size_t size,
721                    unsigned msg_flags)
722 {
723         int sent, ok;
724
725         _prepare_header(tconn, vnr, h, cmd, size - sizeof(struct p_header));
726
727         sent = drbd_send(tconn, sock, h, size, msg_flags);
728
729         ok = (sent == size);
730         if (!ok && !signal_pending(current))
731                 conn_warn(tconn, "short sent %s size=%d sent=%d\n",
732                           cmdname(cmd), (int)size, sent);
733         return ok;
734 }
735
736 /* don't pass the socket. we may only look at it
737  * when we hold the appropriate socket mutex.
738  */
739 int conn_send_cmd(struct drbd_tconn *tconn, int vnr, int use_data_socket,
740                   enum drbd_packet cmd, struct p_header *h, size_t size)
741 {
742         int ok = 0;
743         struct socket *sock;
744
745         if (use_data_socket) {
746                 mutex_lock(&tconn->data.mutex);
747                 sock = tconn->data.socket;
748         } else {
749                 mutex_lock(&tconn->meta.mutex);
750                 sock = tconn->meta.socket;
751         }
752
753         /* drbd_disconnect() could have called drbd_free_sock()
754          * while we were waiting in down()... */
755         if (likely(sock != NULL))
756                 ok = _conn_send_cmd(tconn, vnr, sock, cmd, h, size, 0);
757
758         if (use_data_socket)
759                 mutex_unlock(&tconn->data.mutex);
760         else
761                 mutex_unlock(&tconn->meta.mutex);
762         return ok;
763 }
764
765 int conn_send_cmd2(struct drbd_tconn *tconn, enum drbd_packet cmd, char *data,
766                    size_t size)
767 {
768         struct p_header80 h;
769         int ok;
770
771         prepare_header80(&h, cmd, size);
772
773         if (!drbd_get_data_sock(tconn))
774                 return 0;
775
776         ok = (sizeof(h) ==
777                 drbd_send(tconn, tconn->data.socket, &h, sizeof(h), 0));
778         ok = ok && (size ==
779                 drbd_send(tconn, tconn->data.socket, data, size, 0));
780
781         drbd_put_data_sock(tconn);
782
783         return ok;
784 }
785
786 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
787 {
788         struct p_rs_param_95 *p;
789         struct socket *sock;
790         int size, rv;
791         const int apv = mdev->tconn->agreed_pro_version;
792
793         size = apv <= 87 ? sizeof(struct p_rs_param)
794                 : apv == 88 ? sizeof(struct p_rs_param)
795                         + strlen(mdev->sync_conf.verify_alg) + 1
796                 : apv <= 94 ? sizeof(struct p_rs_param_89)
797                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
798
799         /* used from admin command context and receiver/worker context.
800          * to avoid kmalloc, grab the socket right here,
801          * then use the pre-allocated sbuf there */
802         mutex_lock(&mdev->tconn->data.mutex);
803         sock = mdev->tconn->data.socket;
804
805         if (likely(sock != NULL)) {
806                 enum drbd_packet cmd =
807                         apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
808
809                 p = &mdev->tconn->data.sbuf.rs_param_95;
810
811                 /* initialize verify_alg and csums_alg */
812                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
813
814                 p->rate = cpu_to_be32(sc->rate);
815                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
816                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
817                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
818                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
819
820                 if (apv >= 88)
821                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
822                 if (apv >= 89)
823                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
824
825                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
826         } else
827                 rv = 0; /* not ok */
828
829         mutex_unlock(&mdev->tconn->data.mutex);
830
831         return rv;
832 }
833
834 int drbd_send_protocol(struct drbd_tconn *tconn)
835 {
836         struct p_protocol *p;
837         int size, cf, rv;
838
839         size = sizeof(struct p_protocol);
840
841         if (tconn->agreed_pro_version >= 87)
842                 size += strlen(tconn->net_conf->integrity_alg) + 1;
843
844         /* we must not recurse into our own queue,
845          * as that is blocked during handshake */
846         p = kmalloc(size, GFP_NOIO);
847         if (p == NULL)
848                 return 0;
849
850         p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
851         p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
852         p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
853         p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
854         p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
855
856         cf = 0;
857         if (tconn->net_conf->want_lose)
858                 cf |= CF_WANT_LOSE;
859         if (tconn->net_conf->dry_run) {
860                 if (tconn->agreed_pro_version >= 92)
861                         cf |= CF_DRY_RUN;
862                 else {
863                         conn_err(tconn, "--dry-run is not supported by peer");
864                         kfree(p);
865                         return -1;
866                 }
867         }
868         p->conn_flags    = cpu_to_be32(cf);
869
870         if (tconn->agreed_pro_version >= 87)
871                 strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
872
873         rv = conn_send_cmd2(tconn, P_PROTOCOL, p->head.payload, size - sizeof(struct p_header));
874         kfree(p);
875         return rv;
876 }
877
878 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
879 {
880         struct p_uuids p;
881         int i;
882
883         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
884                 return 1;
885
886         for (i = UI_CURRENT; i < UI_SIZE; i++)
887                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
888
889         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
890         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
891         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
892         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
893         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
894         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
895
896         put_ldev(mdev);
897
898         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS, &p.head, sizeof(p));
899 }
900
901 int drbd_send_uuids(struct drbd_conf *mdev)
902 {
903         return _drbd_send_uuids(mdev, 0);
904 }
905
906 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
907 {
908         return _drbd_send_uuids(mdev, 8);
909 }
910
911 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
912 {
913         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
914                 u64 *uuid = mdev->ldev->md.uuid;
915                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
916                      text,
917                      (unsigned long long)uuid[UI_CURRENT],
918                      (unsigned long long)uuid[UI_BITMAP],
919                      (unsigned long long)uuid[UI_HISTORY_START],
920                      (unsigned long long)uuid[UI_HISTORY_END]);
921                 put_ldev(mdev);
922         } else {
923                 dev_info(DEV, "%s effective data uuid: %016llX\n",
924                                 text,
925                                 (unsigned long long)mdev->ed_uuid);
926         }
927 }
928
929 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
930 {
931         struct p_rs_uuid p;
932         u64 uuid;
933
934         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
935
936         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
937         drbd_uuid_set(mdev, UI_BITMAP, uuid);
938         drbd_print_uuids(mdev, "updated sync UUID");
939         drbd_md_sync(mdev);
940         p.uuid = cpu_to_be64(uuid);
941
942         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID, &p.head, sizeof(p));
943 }
944
945 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
946 {
947         struct p_sizes p;
948         sector_t d_size, u_size;
949         int q_order_type, max_bio_size;
950         int ok;
951
952         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
953                 D_ASSERT(mdev->ldev->backing_bdev);
954                 d_size = drbd_get_max_capacity(mdev->ldev);
955                 u_size = mdev->ldev->dc.disk_size;
956                 q_order_type = drbd_queue_order_type(mdev);
957                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
958                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
959                 put_ldev(mdev);
960         } else {
961                 d_size = 0;
962                 u_size = 0;
963                 q_order_type = QUEUE_ORDERED_NONE;
964                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
965         }
966
967         p.d_size = cpu_to_be64(d_size);
968         p.u_size = cpu_to_be64(u_size);
969         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
970         p.max_bio_size = cpu_to_be32(max_bio_size);
971         p.queue_order_type = cpu_to_be16(q_order_type);
972         p.dds_flags = cpu_to_be16(flags);
973
974         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES, &p.head, sizeof(p));
975         return ok;
976 }
977
978 /**
979  * drbd_send_state() - Sends the drbd state to the peer
980  * @mdev:       DRBD device.
981  */
982 int drbd_send_state(struct drbd_conf *mdev)
983 {
984         struct socket *sock;
985         struct p_state p;
986         int ok = 0;
987
988         mutex_lock(&mdev->tconn->data.mutex);
989
990         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
991         sock = mdev->tconn->data.socket;
992
993         if (likely(sock != NULL)) {
994                 ok = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
995         }
996
997         mutex_unlock(&mdev->tconn->data.mutex);
998
999         return ok;
1000 }
1001
1002 int _conn_send_state_req(struct drbd_tconn *tconn, int vnr, enum drbd_packet cmd,
1003                          union drbd_state mask, union drbd_state val)
1004 {
1005         struct p_req_state p;
1006
1007         p.mask    = cpu_to_be32(mask.i);
1008         p.val     = cpu_to_be32(val.i);
1009
1010         return conn_send_cmd(tconn, vnr, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
1011 }
1012
1013 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1014 {
1015         struct p_req_state_reply p;
1016
1017         p.retcode    = cpu_to_be32(retcode);
1018
1019         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY, &p.head, sizeof(p));
1020 }
1021
1022 int conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1023 {
1024         struct p_req_state_reply p;
1025         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1026
1027         p.retcode    = cpu_to_be32(retcode);
1028
1029         return conn_send_cmd(tconn, 0, USE_META_SOCKET, cmd, &p.head, sizeof(p));
1030 }
1031
1032 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1033         struct p_compressed_bm *p,
1034         struct bm_xfer_ctx *c)
1035 {
1036         struct bitstream bs;
1037         unsigned long plain_bits;
1038         unsigned long tmp;
1039         unsigned long rl;
1040         unsigned len;
1041         unsigned toggle;
1042         int bits;
1043
1044         /* may we use this feature? */
1045         if ((mdev->sync_conf.use_rle == 0) ||
1046                 (mdev->tconn->agreed_pro_version < 90))
1047                         return 0;
1048
1049         if (c->bit_offset >= c->bm_bits)
1050                 return 0; /* nothing to do. */
1051
1052         /* use at most thus many bytes */
1053         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1054         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1055         /* plain bits covered in this code string */
1056         plain_bits = 0;
1057
1058         /* p->encoding & 0x80 stores whether the first run length is set.
1059          * bit offset is implicit.
1060          * start with toggle == 2 to be able to tell the first iteration */
1061         toggle = 2;
1062
1063         /* see how much plain bits we can stuff into one packet
1064          * using RLE and VLI. */
1065         do {
1066                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1067                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1068                 if (tmp == -1UL)
1069                         tmp = c->bm_bits;
1070                 rl = tmp - c->bit_offset;
1071
1072                 if (toggle == 2) { /* first iteration */
1073                         if (rl == 0) {
1074                                 /* the first checked bit was set,
1075                                  * store start value, */
1076                                 DCBP_set_start(p, 1);
1077                                 /* but skip encoding of zero run length */
1078                                 toggle = !toggle;
1079                                 continue;
1080                         }
1081                         DCBP_set_start(p, 0);
1082                 }
1083
1084                 /* paranoia: catch zero runlength.
1085                  * can only happen if bitmap is modified while we scan it. */
1086                 if (rl == 0) {
1087                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1088                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1089                         return -1;
1090                 }
1091
1092                 bits = vli_encode_bits(&bs, rl);
1093                 if (bits == -ENOBUFS) /* buffer full */
1094                         break;
1095                 if (bits <= 0) {
1096                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1097                         return 0;
1098                 }
1099
1100                 toggle = !toggle;
1101                 plain_bits += rl;
1102                 c->bit_offset = tmp;
1103         } while (c->bit_offset < c->bm_bits);
1104
1105         len = bs.cur.b - p->code + !!bs.cur.bit;
1106
1107         if (plain_bits < (len << 3)) {
1108                 /* incompressible with this method.
1109                  * we need to rewind both word and bit position. */
1110                 c->bit_offset -= plain_bits;
1111                 bm_xfer_ctx_bit_to_word_offset(c);
1112                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1113                 return 0;
1114         }
1115
1116         /* RLE + VLI was able to compress it just fine.
1117          * update c->word_offset. */
1118         bm_xfer_ctx_bit_to_word_offset(c);
1119
1120         /* store pad_bits */
1121         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1122
1123         return len;
1124 }
1125
1126 /**
1127  * send_bitmap_rle_or_plain
1128  *
1129  * Return 0 when done, 1 when another iteration is needed, and a negative error
1130  * code upon failure.
1131  */
1132 static int
1133 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1134                          struct p_header *h, struct bm_xfer_ctx *c)
1135 {
1136         struct p_compressed_bm *p = (void*)h;
1137         unsigned long num_words;
1138         int len;
1139         int ok;
1140
1141         len = fill_bitmap_rle_bits(mdev, p, c);
1142
1143         if (len < 0)
1144                 return -EIO;
1145
1146         if (len) {
1147                 DCBP_set_code(p, RLE_VLI_Bits);
1148                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_COMPRESSED_BITMAP, h,
1149                         sizeof(*p) + len, 0);
1150
1151                 c->packets[0]++;
1152                 c->bytes[0] += sizeof(*p) + len;
1153
1154                 if (c->bit_offset >= c->bm_bits)
1155                         len = 0; /* DONE */
1156         } else {
1157                 /* was not compressible.
1158                  * send a buffer full of plain text bits instead. */
1159                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1160                 len = num_words * sizeof(long);
1161                 if (len)
1162                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1163                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BITMAP,
1164                                    h, sizeof(struct p_header80) + len, 0);
1165                 c->word_offset += num_words;
1166                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1167
1168                 c->packets[1]++;
1169                 c->bytes[1] += sizeof(struct p_header80) + len;
1170
1171                 if (c->bit_offset > c->bm_bits)
1172                         c->bit_offset = c->bm_bits;
1173         }
1174         if (ok) {
1175                 if (len == 0) {
1176                         INFO_bm_xfer_stats(mdev, "send", c);
1177                         return 0;
1178                 } else
1179                         return 1;
1180         }
1181         return -EIO;
1182 }
1183
1184 /* See the comment at receive_bitmap() */
1185 int _drbd_send_bitmap(struct drbd_conf *mdev)
1186 {
1187         struct bm_xfer_ctx c;
1188         struct p_header *p;
1189         int err;
1190
1191         if (!expect(mdev->bitmap))
1192                 return false;
1193
1194         /* maybe we should use some per thread scratch page,
1195          * and allocate that during initial device creation? */
1196         p = (struct p_header *) __get_free_page(GFP_NOIO);
1197         if (!p) {
1198                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
1199                 return false;
1200         }
1201
1202         if (get_ldev(mdev)) {
1203                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1204                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1205                         drbd_bm_set_all(mdev);
1206                         if (drbd_bm_write(mdev)) {
1207                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1208                                  * but otherwise process as per normal - need to tell other
1209                                  * side that a full resync is required! */
1210                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1211                         } else {
1212                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1213                                 drbd_md_sync(mdev);
1214                         }
1215                 }
1216                 put_ldev(mdev);
1217         }
1218
1219         c = (struct bm_xfer_ctx) {
1220                 .bm_bits = drbd_bm_bits(mdev),
1221                 .bm_words = drbd_bm_words(mdev),
1222         };
1223
1224         do {
1225                 err = send_bitmap_rle_or_plain(mdev, p, &c);
1226         } while (err > 0);
1227
1228         free_page((unsigned long) p);
1229         return err == 0;
1230 }
1231
1232 int drbd_send_bitmap(struct drbd_conf *mdev)
1233 {
1234         int err;
1235
1236         if (!drbd_get_data_sock(mdev->tconn))
1237                 return -1;
1238         err = !_drbd_send_bitmap(mdev);
1239         drbd_put_data_sock(mdev->tconn);
1240         return err;
1241 }
1242
1243 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1244 {
1245         int ok;
1246         struct p_barrier_ack p;
1247
1248         p.barrier  = barrier_nr;
1249         p.set_size = cpu_to_be32(set_size);
1250
1251         if (mdev->state.conn < C_CONNECTED)
1252                 return false;
1253         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK, &p.head, sizeof(p));
1254         return ok;
1255 }
1256
1257 /**
1258  * _drbd_send_ack() - Sends an ack packet
1259  * @mdev:       DRBD device.
1260  * @cmd:        Packet command code.
1261  * @sector:     sector, needs to be in big endian byte order
1262  * @blksize:    size in byte, needs to be in big endian byte order
1263  * @block_id:   Id, big endian byte order
1264  */
1265 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1266                           u64 sector, u32 blksize, u64 block_id)
1267 {
1268         int ok;
1269         struct p_block_ack p;
1270
1271         p.sector   = sector;
1272         p.block_id = block_id;
1273         p.blksize  = blksize;
1274         p.seq_num  = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1275
1276         if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
1277                 return false;
1278         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd, &p.head, sizeof(p));
1279         return ok;
1280 }
1281
1282 /* dp->sector and dp->block_id already/still in network byte order,
1283  * data_size is payload size according to dp->head,
1284  * and may need to be corrected for digest size. */
1285 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1286                      struct p_data *dp, int data_size)
1287 {
1288         data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1289                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1290         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1291                               dp->block_id);
1292 }
1293
1294 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1295                      struct p_block_req *rp)
1296 {
1297         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1298 }
1299
1300 /**
1301  * drbd_send_ack() - Sends an ack packet
1302  * @mdev:       DRBD device
1303  * @cmd:        packet command code
1304  * @peer_req:   peer request
1305  */
1306 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1307                   struct drbd_peer_request *peer_req)
1308 {
1309         return _drbd_send_ack(mdev, cmd,
1310                               cpu_to_be64(peer_req->i.sector),
1311                               cpu_to_be32(peer_req->i.size),
1312                               peer_req->block_id);
1313 }
1314
1315 /* This function misuses the block_id field to signal if the blocks
1316  * are is sync or not. */
1317 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1318                      sector_t sector, int blksize, u64 block_id)
1319 {
1320         return _drbd_send_ack(mdev, cmd,
1321                               cpu_to_be64(sector),
1322                               cpu_to_be32(blksize),
1323                               cpu_to_be64(block_id));
1324 }
1325
1326 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1327                        sector_t sector, int size, u64 block_id)
1328 {
1329         int ok;
1330         struct p_block_req p;
1331
1332         p.sector   = cpu_to_be64(sector);
1333         p.block_id = block_id;
1334         p.blksize  = cpu_to_be32(size);
1335
1336         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
1337         return ok;
1338 }
1339
1340 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1341                             void *digest, int digest_size, enum drbd_packet cmd)
1342 {
1343         int ok;
1344         struct p_block_req p;
1345
1346         prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
1347         p.sector   = cpu_to_be64(sector);
1348         p.block_id = ID_SYNCER /* unused */;
1349         p.blksize  = cpu_to_be32(size);
1350
1351         mutex_lock(&mdev->tconn->data.mutex);
1352
1353         ok = (sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), 0));
1354         ok = ok && (digest_size == drbd_send(mdev->tconn, mdev->tconn->data.socket, digest, digest_size, 0));
1355
1356         mutex_unlock(&mdev->tconn->data.mutex);
1357
1358         return ok;
1359 }
1360
1361 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1362 {
1363         int ok;
1364         struct p_block_req p;
1365
1366         p.sector   = cpu_to_be64(sector);
1367         p.block_id = ID_SYNCER /* unused */;
1368         p.blksize  = cpu_to_be32(size);
1369
1370         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST, &p.head, sizeof(p));
1371         return ok;
1372 }
1373
1374 /* called on sndtimeo
1375  * returns false if we should retry,
1376  * true if we think connection is dead
1377  */
1378 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1379 {
1380         int drop_it;
1381         /* long elapsed = (long)(jiffies - mdev->last_received); */
1382
1383         drop_it =   tconn->meta.socket == sock
1384                 || !tconn->asender.task
1385                 || get_t_state(&tconn->asender) != RUNNING
1386                 || tconn->cstate < C_WF_REPORT_PARAMS;
1387
1388         if (drop_it)
1389                 return true;
1390
1391         drop_it = !--tconn->ko_count;
1392         if (!drop_it) {
1393                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1394                          current->comm, current->pid, tconn->ko_count);
1395                 request_ping(tconn);
1396         }
1397
1398         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1399 }
1400
1401 static void drbd_update_congested(struct drbd_tconn *tconn)
1402 {
1403         struct sock *sk = tconn->data.socket->sk;
1404         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1405                 set_bit(NET_CONGESTED, &tconn->flags);
1406 }
1407
1408 /* The idea of sendpage seems to be to put some kind of reference
1409  * to the page into the skb, and to hand it over to the NIC. In
1410  * this process get_page() gets called.
1411  *
1412  * As soon as the page was really sent over the network put_page()
1413  * gets called by some part of the network layer. [ NIC driver? ]
1414  *
1415  * [ get_page() / put_page() increment/decrement the count. If count
1416  *   reaches 0 the page will be freed. ]
1417  *
1418  * This works nicely with pages from FSs.
1419  * But this means that in protocol A we might signal IO completion too early!
1420  *
1421  * In order not to corrupt data during a resync we must make sure
1422  * that we do not reuse our own buffer pages (EEs) to early, therefore
1423  * we have the net_ee list.
1424  *
1425  * XFS seems to have problems, still, it submits pages with page_count == 0!
1426  * As a workaround, we disable sendpage on pages
1427  * with page_count == 0 or PageSlab.
1428  */
1429 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1430                    int offset, size_t size, unsigned msg_flags)
1431 {
1432         int sent = drbd_send(mdev->tconn, mdev->tconn->data.socket, kmap(page) + offset, size, msg_flags);
1433         kunmap(page);
1434         if (sent == size)
1435                 mdev->send_cnt += size>>9;
1436         return sent == size;
1437 }
1438
1439 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1440                     int offset, size_t size, unsigned msg_flags)
1441 {
1442         mm_segment_t oldfs = get_fs();
1443         int sent, ok;
1444         int len = size;
1445
1446         /* e.g. XFS meta- & log-data is in slab pages, which have a
1447          * page_count of 0 and/or have PageSlab() set.
1448          * we cannot use send_page for those, as that does get_page();
1449          * put_page(); and would cause either a VM_BUG directly, or
1450          * __page_cache_release a page that would actually still be referenced
1451          * by someone, leading to some obscure delayed Oops somewhere else. */
1452         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1453                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1454
1455         msg_flags |= MSG_NOSIGNAL;
1456         drbd_update_congested(mdev->tconn);
1457         set_fs(KERNEL_DS);
1458         do {
1459                 sent = mdev->tconn->data.socket->ops->sendpage(mdev->tconn->data.socket, page,
1460                                                         offset, len,
1461                                                         msg_flags);
1462                 if (sent == -EAGAIN) {
1463                         if (we_should_drop_the_connection(mdev->tconn,
1464                                                           mdev->tconn->data.socket))
1465                                 break;
1466                         else
1467                                 continue;
1468                 }
1469                 if (sent <= 0) {
1470                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1471                              __func__, (int)size, len, sent);
1472                         break;
1473                 }
1474                 len    -= sent;
1475                 offset += sent;
1476         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1477         set_fs(oldfs);
1478         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1479
1480         ok = (len == 0);
1481         if (likely(ok))
1482                 mdev->send_cnt += size>>9;
1483         return ok;
1484 }
1485
1486 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1487 {
1488         struct bio_vec *bvec;
1489         int i;
1490         /* hint all but last page with MSG_MORE */
1491         __bio_for_each_segment(bvec, bio, i, 0) {
1492                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
1493                                      bvec->bv_offset, bvec->bv_len,
1494                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
1495                         return 0;
1496         }
1497         return 1;
1498 }
1499
1500 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1501 {
1502         struct bio_vec *bvec;
1503         int i;
1504         /* hint all but last page with MSG_MORE */
1505         __bio_for_each_segment(bvec, bio, i, 0) {
1506                 if (!_drbd_send_page(mdev, bvec->bv_page,
1507                                      bvec->bv_offset, bvec->bv_len,
1508                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
1509                         return 0;
1510         }
1511         return 1;
1512 }
1513
1514 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1515                             struct drbd_peer_request *peer_req)
1516 {
1517         struct page *page = peer_req->pages;
1518         unsigned len = peer_req->i.size;
1519
1520         /* hint all but last page with MSG_MORE */
1521         page_chain_for_each(page) {
1522                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1523                 if (!_drbd_send_page(mdev, page, 0, l,
1524                                 page_chain_next(page) ? MSG_MORE : 0))
1525                         return 0;
1526                 len -= l;
1527         }
1528         return 1;
1529 }
1530
1531 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1532 {
1533         if (mdev->tconn->agreed_pro_version >= 95)
1534                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1535                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1536                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1537                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1538         else
1539                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1540 }
1541
1542 /* Used to send write requests
1543  * R_PRIMARY -> Peer    (P_DATA)
1544  */
1545 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1546 {
1547         int ok = 1;
1548         struct p_data p;
1549         unsigned int dp_flags = 0;
1550         void *dgb;
1551         int dgs;
1552
1553         if (!drbd_get_data_sock(mdev->tconn))
1554                 return 0;
1555
1556         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1557                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1558
1559         prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
1560         p.sector   = cpu_to_be64(req->i.sector);
1561         p.block_id = (unsigned long)req;
1562         p.seq_num  = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1563
1564         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1565
1566         if (mdev->state.conn >= C_SYNC_SOURCE &&
1567             mdev->state.conn <= C_PAUSED_SYNC_T)
1568                 dp_flags |= DP_MAY_SET_IN_SYNC;
1569
1570         p.dp_flags = cpu_to_be32(dp_flags);
1571         set_bit(UNPLUG_REMOTE, &mdev->flags);
1572         ok = (sizeof(p) ==
1573                 drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
1574         if (ok && dgs) {
1575                 dgb = mdev->tconn->int_dig_out;
1576                 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
1577                 ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1578         }
1579         if (ok) {
1580                 /* For protocol A, we have to memcpy the payload into
1581                  * socket buffers, as we may complete right away
1582                  * as soon as we handed it over to tcp, at which point the data
1583                  * pages may become invalid.
1584                  *
1585                  * For data-integrity enabled, we copy it as well, so we can be
1586                  * sure that even if the bio pages may still be modified, it
1587                  * won't change the data on the wire, thus if the digest checks
1588                  * out ok after sending on this side, but does not fit on the
1589                  * receiving side, we sure have detected corruption elsewhere.
1590                  */
1591                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
1592                         ok = _drbd_send_bio(mdev, req->master_bio);
1593                 else
1594                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
1595
1596                 /* double check digest, sometimes buffers have been modified in flight. */
1597                 if (dgs > 0 && dgs <= 64) {
1598                         /* 64 byte, 512 bit, is the largest digest size
1599                          * currently supported in kernel crypto. */
1600                         unsigned char digest[64];
1601                         drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
1602                         if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
1603                                 dev_warn(DEV,
1604                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1605                                         (unsigned long long)req->i.sector, req->i.size);
1606                         }
1607                 } /* else if (dgs > 64) {
1608                      ... Be noisy about digest too large ...
1609                 } */
1610         }
1611
1612         drbd_put_data_sock(mdev->tconn);
1613
1614         return ok;
1615 }
1616
1617 /* answer packet, used to send data back for read requests:
1618  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1619  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1620  */
1621 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1622                     struct drbd_peer_request *peer_req)
1623 {
1624         int ok;
1625         struct p_data p;
1626         void *dgb;
1627         int dgs;
1628
1629         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1630                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1631
1632         prepare_header(mdev, &p.head, cmd, sizeof(p) -
1633                                            sizeof(struct p_header80) +
1634                                            dgs + peer_req->i.size);
1635         p.sector   = cpu_to_be64(peer_req->i.sector);
1636         p.block_id = peer_req->block_id;
1637         p.seq_num = 0;  /* unused */
1638
1639         /* Only called by our kernel thread.
1640          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
1641          * in response to admin command or module unload.
1642          */
1643         if (!drbd_get_data_sock(mdev->tconn))
1644                 return 0;
1645
1646         ok = sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
1647         if (ok && dgs) {
1648                 dgb = mdev->tconn->int_dig_out;
1649                 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, dgb);
1650                 ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1651         }
1652         if (ok)
1653                 ok = _drbd_send_zc_ee(mdev, peer_req);
1654
1655         drbd_put_data_sock(mdev->tconn);
1656
1657         return ok;
1658 }
1659
1660 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
1661 {
1662         struct p_block_desc p;
1663
1664         p.sector  = cpu_to_be64(req->i.sector);
1665         p.blksize = cpu_to_be32(req->i.size);
1666
1667         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
1668 }
1669
1670 /*
1671   drbd_send distinguishes two cases:
1672
1673   Packets sent via the data socket "sock"
1674   and packets sent via the meta data socket "msock"
1675
1676                     sock                      msock
1677   -----------------+-------------------------+------------------------------
1678   timeout           conf.timeout / 2          conf.timeout / 2
1679   timeout action    send a ping via msock     Abort communication
1680                                               and close all sockets
1681 */
1682
1683 /*
1684  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1685  */
1686 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1687               void *buf, size_t size, unsigned msg_flags)
1688 {
1689         struct kvec iov;
1690         struct msghdr msg;
1691         int rv, sent = 0;
1692
1693         if (!sock)
1694                 return -1000;
1695
1696         /* THINK  if (signal_pending) return ... ? */
1697
1698         iov.iov_base = buf;
1699         iov.iov_len  = size;
1700
1701         msg.msg_name       = NULL;
1702         msg.msg_namelen    = 0;
1703         msg.msg_control    = NULL;
1704         msg.msg_controllen = 0;
1705         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1706
1707         if (sock == tconn->data.socket) {
1708                 tconn->ko_count = tconn->net_conf->ko_count;
1709                 drbd_update_congested(tconn);
1710         }
1711         do {
1712                 /* STRANGE
1713                  * tcp_sendmsg does _not_ use its size parameter at all ?
1714                  *
1715                  * -EAGAIN on timeout, -EINTR on signal.
1716                  */
1717 /* THINK
1718  * do we need to block DRBD_SIG if sock == &meta.socket ??
1719  * otherwise wake_asender() might interrupt some send_*Ack !
1720  */
1721                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1722                 if (rv == -EAGAIN) {
1723                         if (we_should_drop_the_connection(tconn, sock))
1724                                 break;
1725                         else
1726                                 continue;
1727                 }
1728                 if (rv == -EINTR) {
1729                         flush_signals(current);
1730                         rv = 0;
1731                 }
1732                 if (rv < 0)
1733                         break;
1734                 sent += rv;
1735                 iov.iov_base += rv;
1736                 iov.iov_len  -= rv;
1737         } while (sent < size);
1738
1739         if (sock == tconn->data.socket)
1740                 clear_bit(NET_CONGESTED, &tconn->flags);
1741
1742         if (rv <= 0) {
1743                 if (rv != -EAGAIN) {
1744                         conn_err(tconn, "%s_sendmsg returned %d\n",
1745                                  sock == tconn->meta.socket ? "msock" : "sock",
1746                                  rv);
1747                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1748                 } else
1749                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1750         }
1751
1752         return sent;
1753 }
1754
1755 static int drbd_open(struct block_device *bdev, fmode_t mode)
1756 {
1757         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1758         unsigned long flags;
1759         int rv = 0;
1760
1761         mutex_lock(&drbd_main_mutex);
1762         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1763         /* to have a stable mdev->state.role
1764          * and no race with updating open_cnt */
1765
1766         if (mdev->state.role != R_PRIMARY) {
1767                 if (mode & FMODE_WRITE)
1768                         rv = -EROFS;
1769                 else if (!allow_oos)
1770                         rv = -EMEDIUMTYPE;
1771         }
1772
1773         if (!rv)
1774                 mdev->open_cnt++;
1775         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1776         mutex_unlock(&drbd_main_mutex);
1777
1778         return rv;
1779 }
1780
1781 static int drbd_release(struct gendisk *gd, fmode_t mode)
1782 {
1783         struct drbd_conf *mdev = gd->private_data;
1784         mutex_lock(&drbd_main_mutex);
1785         mdev->open_cnt--;
1786         mutex_unlock(&drbd_main_mutex);
1787         return 0;
1788 }
1789
1790 static void drbd_set_defaults(struct drbd_conf *mdev)
1791 {
1792         /* This way we get a compile error when sync_conf grows,
1793            and we forgot to initialize it here */
1794         mdev->sync_conf = (struct syncer_conf) {
1795                 /* .rate = */           DRBD_RATE_DEF,
1796                 /* .after = */          DRBD_AFTER_DEF,
1797                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
1798                 /* .verify_alg = */     {}, 0,
1799                 /* .cpu_mask = */       {}, 0,
1800                 /* .csums_alg = */      {}, 0,
1801                 /* .use_rle = */        0,
1802                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
1803                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
1804                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
1805                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
1806                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
1807                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
1808         };
1809
1810         /* Have to use that way, because the layout differs between
1811            big endian and little endian */
1812         mdev->state = (union drbd_state) {
1813                 { .role = R_SECONDARY,
1814                   .peer = R_UNKNOWN,
1815                   .conn = C_STANDALONE,
1816                   .disk = D_DISKLESS,
1817                   .pdsk = D_UNKNOWN,
1818                   .susp = 0,
1819                   .susp_nod = 0,
1820                   .susp_fen = 0
1821                 } };
1822 }
1823
1824 void drbd_init_set_defaults(struct drbd_conf *mdev)
1825 {
1826         /* the memset(,0,) did most of this.
1827          * note: only assignments, no allocation in here */
1828
1829         drbd_set_defaults(mdev);
1830
1831         atomic_set(&mdev->ap_bio_cnt, 0);
1832         atomic_set(&mdev->ap_pending_cnt, 0);
1833         atomic_set(&mdev->rs_pending_cnt, 0);
1834         atomic_set(&mdev->unacked_cnt, 0);
1835         atomic_set(&mdev->local_cnt, 0);
1836         atomic_set(&mdev->pp_in_use, 0);
1837         atomic_set(&mdev->pp_in_use_by_net, 0);
1838         atomic_set(&mdev->rs_sect_in, 0);
1839         atomic_set(&mdev->rs_sect_ev, 0);
1840         atomic_set(&mdev->ap_in_flight, 0);
1841
1842         mutex_init(&mdev->md_io_mutex);
1843         mutex_init(&mdev->own_state_mutex);
1844         mdev->state_mutex = &mdev->own_state_mutex;
1845
1846         spin_lock_init(&mdev->al_lock);
1847         spin_lock_init(&mdev->peer_seq_lock);
1848         spin_lock_init(&mdev->epoch_lock);
1849
1850         INIT_LIST_HEAD(&mdev->active_ee);
1851         INIT_LIST_HEAD(&mdev->sync_ee);
1852         INIT_LIST_HEAD(&mdev->done_ee);
1853         INIT_LIST_HEAD(&mdev->read_ee);
1854         INIT_LIST_HEAD(&mdev->net_ee);
1855         INIT_LIST_HEAD(&mdev->resync_reads);
1856         INIT_LIST_HEAD(&mdev->resync_work.list);
1857         INIT_LIST_HEAD(&mdev->unplug_work.list);
1858         INIT_LIST_HEAD(&mdev->go_diskless.list);
1859         INIT_LIST_HEAD(&mdev->md_sync_work.list);
1860         INIT_LIST_HEAD(&mdev->start_resync_work.list);
1861         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1862
1863         mdev->resync_work.cb  = w_resync_timer;
1864         mdev->unplug_work.cb  = w_send_write_hint;
1865         mdev->go_diskless.cb  = w_go_diskless;
1866         mdev->md_sync_work.cb = w_md_sync;
1867         mdev->bm_io_work.w.cb = w_bitmap_io;
1868         mdev->start_resync_work.cb = w_start_resync;
1869
1870         mdev->resync_work.mdev  = mdev;
1871         mdev->unplug_work.mdev  = mdev;
1872         mdev->go_diskless.mdev  = mdev;
1873         mdev->md_sync_work.mdev = mdev;
1874         mdev->bm_io_work.w.mdev = mdev;
1875         mdev->start_resync_work.mdev = mdev;
1876
1877         init_timer(&mdev->resync_timer);
1878         init_timer(&mdev->md_sync_timer);
1879         init_timer(&mdev->start_resync_timer);
1880         init_timer(&mdev->request_timer);
1881         mdev->resync_timer.function = resync_timer_fn;
1882         mdev->resync_timer.data = (unsigned long) mdev;
1883         mdev->md_sync_timer.function = md_sync_timer_fn;
1884         mdev->md_sync_timer.data = (unsigned long) mdev;
1885         mdev->start_resync_timer.function = start_resync_timer_fn;
1886         mdev->start_resync_timer.data = (unsigned long) mdev;
1887         mdev->request_timer.function = request_timer_fn;
1888         mdev->request_timer.data = (unsigned long) mdev;
1889
1890         init_waitqueue_head(&mdev->misc_wait);
1891         init_waitqueue_head(&mdev->state_wait);
1892         init_waitqueue_head(&mdev->ee_wait);
1893         init_waitqueue_head(&mdev->al_wait);
1894         init_waitqueue_head(&mdev->seq_wait);
1895
1896         /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
1897         mdev->write_ordering = WO_bdev_flush;
1898         mdev->resync_wenr = LC_FREE;
1899         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1900         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1901 }
1902
1903 void drbd_mdev_cleanup(struct drbd_conf *mdev)
1904 {
1905         int i;
1906         if (mdev->tconn->receiver.t_state != NONE)
1907                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
1908                                 mdev->tconn->receiver.t_state);
1909
1910         /* no need to lock it, I'm the only thread alive */
1911         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
1912                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
1913         mdev->al_writ_cnt  =
1914         mdev->bm_writ_cnt  =
1915         mdev->read_cnt     =
1916         mdev->recv_cnt     =
1917         mdev->send_cnt     =
1918         mdev->writ_cnt     =
1919         mdev->p_size       =
1920         mdev->rs_start     =
1921         mdev->rs_total     =
1922         mdev->rs_failed    = 0;
1923         mdev->rs_last_events = 0;
1924         mdev->rs_last_sect_ev = 0;
1925         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1926                 mdev->rs_mark_left[i] = 0;
1927                 mdev->rs_mark_time[i] = 0;
1928         }
1929         D_ASSERT(mdev->tconn->net_conf == NULL);
1930
1931         drbd_set_my_capacity(mdev, 0);
1932         if (mdev->bitmap) {
1933                 /* maybe never allocated. */
1934                 drbd_bm_resize(mdev, 0, 1);
1935                 drbd_bm_cleanup(mdev);
1936         }
1937
1938         drbd_free_resources(mdev);
1939         clear_bit(AL_SUSPENDED, &mdev->flags);
1940
1941         /*
1942          * currently we drbd_init_ee only on module load, so
1943          * we may do drbd_release_ee only on module unload!
1944          */
1945         D_ASSERT(list_empty(&mdev->active_ee));
1946         D_ASSERT(list_empty(&mdev->sync_ee));
1947         D_ASSERT(list_empty(&mdev->done_ee));
1948         D_ASSERT(list_empty(&mdev->read_ee));
1949         D_ASSERT(list_empty(&mdev->net_ee));
1950         D_ASSERT(list_empty(&mdev->resync_reads));
1951         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
1952         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
1953         D_ASSERT(list_empty(&mdev->resync_work.list));
1954         D_ASSERT(list_empty(&mdev->unplug_work.list));
1955         D_ASSERT(list_empty(&mdev->go_diskless.list));
1956
1957         drbd_set_defaults(mdev);
1958 }
1959
1960
1961 static void drbd_destroy_mempools(void)
1962 {
1963         struct page *page;
1964
1965         while (drbd_pp_pool) {
1966                 page = drbd_pp_pool;
1967                 drbd_pp_pool = (struct page *)page_private(page);
1968                 __free_page(page);
1969                 drbd_pp_vacant--;
1970         }
1971
1972         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
1973
1974         if (drbd_md_io_bio_set)
1975                 bioset_free(drbd_md_io_bio_set);
1976         if (drbd_md_io_page_pool)
1977                 mempool_destroy(drbd_md_io_page_pool);
1978         if (drbd_ee_mempool)
1979                 mempool_destroy(drbd_ee_mempool);
1980         if (drbd_request_mempool)
1981                 mempool_destroy(drbd_request_mempool);
1982         if (drbd_ee_cache)
1983                 kmem_cache_destroy(drbd_ee_cache);
1984         if (drbd_request_cache)
1985                 kmem_cache_destroy(drbd_request_cache);
1986         if (drbd_bm_ext_cache)
1987                 kmem_cache_destroy(drbd_bm_ext_cache);
1988         if (drbd_al_ext_cache)
1989                 kmem_cache_destroy(drbd_al_ext_cache);
1990
1991         drbd_md_io_bio_set   = NULL;
1992         drbd_md_io_page_pool = NULL;
1993         drbd_ee_mempool      = NULL;
1994         drbd_request_mempool = NULL;
1995         drbd_ee_cache        = NULL;
1996         drbd_request_cache   = NULL;
1997         drbd_bm_ext_cache    = NULL;
1998         drbd_al_ext_cache    = NULL;
1999
2000         return;
2001 }
2002
2003 static int drbd_create_mempools(void)
2004 {
2005         struct page *page;
2006         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2007         int i;
2008
2009         /* prepare our caches and mempools */
2010         drbd_request_mempool = NULL;
2011         drbd_ee_cache        = NULL;
2012         drbd_request_cache   = NULL;
2013         drbd_bm_ext_cache    = NULL;
2014         drbd_al_ext_cache    = NULL;
2015         drbd_pp_pool         = NULL;
2016         drbd_md_io_page_pool = NULL;
2017         drbd_md_io_bio_set   = NULL;
2018
2019         /* caches */
2020         drbd_request_cache = kmem_cache_create(
2021                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2022         if (drbd_request_cache == NULL)
2023                 goto Enomem;
2024
2025         drbd_ee_cache = kmem_cache_create(
2026                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2027         if (drbd_ee_cache == NULL)
2028                 goto Enomem;
2029
2030         drbd_bm_ext_cache = kmem_cache_create(
2031                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2032         if (drbd_bm_ext_cache == NULL)
2033                 goto Enomem;
2034
2035         drbd_al_ext_cache = kmem_cache_create(
2036                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2037         if (drbd_al_ext_cache == NULL)
2038                 goto Enomem;
2039
2040         /* mempools */
2041         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2042         if (drbd_md_io_bio_set == NULL)
2043                 goto Enomem;
2044
2045         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2046         if (drbd_md_io_page_pool == NULL)
2047                 goto Enomem;
2048
2049         drbd_request_mempool = mempool_create(number,
2050                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2051         if (drbd_request_mempool == NULL)
2052                 goto Enomem;
2053
2054         drbd_ee_mempool = mempool_create(number,
2055                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2056         if (drbd_ee_mempool == NULL)
2057                 goto Enomem;
2058
2059         /* drbd's page pool */
2060         spin_lock_init(&drbd_pp_lock);
2061
2062         for (i = 0; i < number; i++) {
2063                 page = alloc_page(GFP_HIGHUSER);
2064                 if (!page)
2065                         goto Enomem;
2066                 set_page_private(page, (unsigned long)drbd_pp_pool);
2067                 drbd_pp_pool = page;
2068         }
2069         drbd_pp_vacant = number;
2070
2071         return 0;
2072
2073 Enomem:
2074         drbd_destroy_mempools(); /* in case we allocated some */
2075         return -ENOMEM;
2076 }
2077
2078 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2079         void *unused)
2080 {
2081         /* just so we have it.  you never know what interesting things we
2082          * might want to do here some day...
2083          */
2084
2085         return NOTIFY_DONE;
2086 }
2087
2088 static struct notifier_block drbd_notifier = {
2089         .notifier_call = drbd_notify_sys,
2090 };
2091
2092 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2093 {
2094         int rr;
2095
2096         rr = drbd_release_ee(mdev, &mdev->active_ee);
2097         if (rr)
2098                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2099
2100         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2101         if (rr)
2102                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2103
2104         rr = drbd_release_ee(mdev, &mdev->read_ee);
2105         if (rr)
2106                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2107
2108         rr = drbd_release_ee(mdev, &mdev->done_ee);
2109         if (rr)
2110                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2111
2112         rr = drbd_release_ee(mdev, &mdev->net_ee);
2113         if (rr)
2114                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2115 }
2116
2117 /* caution. no locking. */
2118 void drbd_delete_device(unsigned int minor)
2119 {
2120         struct drbd_conf *mdev = minor_to_mdev(minor);
2121
2122         if (!mdev)
2123                 return;
2124
2125         idr_remove(&mdev->tconn->volumes, mdev->vnr);
2126         idr_remove(&minors, minor);
2127         synchronize_rcu();
2128
2129         /* paranoia asserts */
2130         D_ASSERT(mdev->open_cnt == 0);
2131         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2132         /* end paranoia asserts */
2133
2134         del_gendisk(mdev->vdisk);
2135
2136         /* cleanup stuff that may have been allocated during
2137          * device (re-)configuration or state changes */
2138
2139         if (mdev->this_bdev)
2140                 bdput(mdev->this_bdev);
2141
2142         drbd_free_resources(mdev);
2143
2144         drbd_release_ee_lists(mdev);
2145
2146         lc_destroy(mdev->act_log);
2147         lc_destroy(mdev->resync);
2148
2149         kfree(mdev->p_uuid);
2150         /* mdev->p_uuid = NULL; */
2151
2152         /* cleanup the rest that has been
2153          * allocated from drbd_new_device
2154          * and actually free the mdev itself */
2155         drbd_free_mdev(mdev);
2156 }
2157
2158 static void drbd_cleanup(void)
2159 {
2160         unsigned int i;
2161         struct drbd_conf *mdev;
2162
2163         unregister_reboot_notifier(&drbd_notifier);
2164
2165         /* first remove proc,
2166          * drbdsetup uses it's presence to detect
2167          * whether DRBD is loaded.
2168          * If we would get stuck in proc removal,
2169          * but have netlink already deregistered,
2170          * some drbdsetup commands may wait forever
2171          * for an answer.
2172          */
2173         if (drbd_proc)
2174                 remove_proc_entry("drbd", NULL);
2175
2176         drbd_genl_unregister();
2177
2178         idr_for_each_entry(&minors, mdev, i)
2179                 drbd_delete_device(i);
2180         drbd_destroy_mempools();
2181         unregister_blkdev(DRBD_MAJOR, "drbd");
2182
2183         idr_destroy(&minors);
2184
2185         printk(KERN_INFO "drbd: module cleanup done.\n");
2186 }
2187
2188 /**
2189  * drbd_congested() - Callback for pdflush
2190  * @congested_data:     User data
2191  * @bdi_bits:           Bits pdflush is currently interested in
2192  *
2193  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2194  */
2195 static int drbd_congested(void *congested_data, int bdi_bits)
2196 {
2197         struct drbd_conf *mdev = congested_data;
2198         struct request_queue *q;
2199         char reason = '-';
2200         int r = 0;
2201
2202         if (!may_inc_ap_bio(mdev)) {
2203                 /* DRBD has frozen IO */
2204                 r = bdi_bits;
2205                 reason = 'd';
2206                 goto out;
2207         }
2208
2209         if (get_ldev(mdev)) {
2210                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2211                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2212                 put_ldev(mdev);
2213                 if (r)
2214                         reason = 'b';
2215         }
2216
2217         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2218                 r |= (1 << BDI_async_congested);
2219                 reason = reason == 'b' ? 'a' : 'n';
2220         }
2221
2222 out:
2223         mdev->congestion_reason = reason;
2224         return r;
2225 }
2226
2227 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2228 {
2229         sema_init(&wq->s, 0);
2230         spin_lock_init(&wq->q_lock);
2231         INIT_LIST_HEAD(&wq->q);
2232 }
2233
2234 struct drbd_tconn *conn_by_name(const char *name)
2235 {
2236         struct drbd_tconn *tconn;
2237
2238         if (!name || !name[0])
2239                 return NULL;
2240
2241         write_lock_irq(&global_state_lock);
2242         list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
2243                 if (!strcmp(tconn->name, name))
2244                         goto found;
2245         }
2246         tconn = NULL;
2247 found:
2248         write_unlock_irq(&global_state_lock);
2249         return tconn;
2250 }
2251
2252 struct drbd_tconn *drbd_new_tconn(const char *name)
2253 {
2254         struct drbd_tconn *tconn;
2255
2256         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2257         if (!tconn)
2258                 return NULL;
2259
2260         tconn->name = kstrdup(name, GFP_KERNEL);
2261         if (!tconn->name)
2262                 goto fail;
2263
2264         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2265                 goto fail;
2266
2267         if (!tl_init(tconn))
2268                 goto fail;
2269
2270         tconn->cstate = C_STANDALONE;
2271         mutex_init(&tconn->cstate_mutex);
2272         spin_lock_init(&tconn->req_lock);
2273         atomic_set(&tconn->net_cnt, 0);
2274         init_waitqueue_head(&tconn->net_cnt_wait);
2275         init_waitqueue_head(&tconn->ping_wait);
2276         idr_init(&tconn->volumes);
2277
2278         drbd_init_workqueue(&tconn->data.work);
2279         mutex_init(&tconn->data.mutex);
2280
2281         drbd_init_workqueue(&tconn->meta.work);
2282         mutex_init(&tconn->meta.mutex);
2283
2284         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2285         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2286         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2287
2288         write_lock_irq(&global_state_lock);
2289         list_add(&tconn->all_tconn, &drbd_tconns);
2290         write_unlock_irq(&global_state_lock);
2291
2292         return tconn;
2293
2294 fail:
2295         tl_cleanup(tconn);
2296         free_cpumask_var(tconn->cpu_mask);
2297         kfree(tconn->name);
2298         kfree(tconn);
2299
2300         return NULL;
2301 }
2302
2303 void drbd_free_tconn(struct drbd_tconn *tconn)
2304 {
2305         write_lock_irq(&global_state_lock);
2306         list_del(&tconn->all_tconn);
2307         write_unlock_irq(&global_state_lock);
2308         idr_destroy(&tconn->volumes);
2309
2310         free_cpumask_var(tconn->cpu_mask);
2311         kfree(tconn->name);
2312         kfree(tconn->int_dig_out);
2313         kfree(tconn->int_dig_in);
2314         kfree(tconn->int_dig_vv);
2315         kfree(tconn);
2316 }
2317
2318 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2319 {
2320         struct drbd_conf *mdev;
2321         struct gendisk *disk;
2322         struct request_queue *q;
2323         int vnr_got = vnr;
2324         int minor_got = minor;
2325
2326         mdev = minor_to_mdev(minor);
2327         if (mdev)
2328                 return ERR_MINOR_EXISTS;
2329
2330         /* GFP_KERNEL, we are outside of all write-out paths */
2331         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2332         if (!mdev)
2333                 return ERR_NOMEM;
2334
2335         mdev->tconn = tconn;
2336         mdev->minor = minor;
2337         mdev->vnr = vnr;
2338
2339         drbd_init_set_defaults(mdev);
2340
2341         q = blk_alloc_queue(GFP_KERNEL);
2342         if (!q)
2343                 goto out_no_q;
2344         mdev->rq_queue = q;
2345         q->queuedata   = mdev;
2346
2347         disk = alloc_disk(1);
2348         if (!disk)
2349                 goto out_no_disk;
2350         mdev->vdisk = disk;
2351
2352         set_disk_ro(disk, true);
2353
2354         disk->queue = q;
2355         disk->major = DRBD_MAJOR;
2356         disk->first_minor = minor;
2357         disk->fops = &drbd_ops;
2358         sprintf(disk->disk_name, "drbd%d", minor);
2359         disk->private_data = mdev;
2360
2361         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2362         /* we have no partitions. we contain only ourselves. */
2363         mdev->this_bdev->bd_contains = mdev->this_bdev;
2364
2365         q->backing_dev_info.congested_fn = drbd_congested;
2366         q->backing_dev_info.congested_data = mdev;
2367
2368         blk_queue_make_request(q, drbd_make_request);
2369         /* Setting the max_hw_sectors to an odd value of 8kibyte here
2370            This triggers a max_bio_size message upon first attach or connect */
2371         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2372         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2373         blk_queue_merge_bvec(q, drbd_merge_bvec);
2374         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2375
2376         mdev->md_io_page = alloc_page(GFP_KERNEL);
2377         if (!mdev->md_io_page)
2378                 goto out_no_io_page;
2379
2380         if (drbd_bm_init(mdev))
2381                 goto out_no_bitmap;
2382         mdev->read_requests = RB_ROOT;
2383         mdev->write_requests = RB_ROOT;
2384
2385         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2386         if (!mdev->current_epoch)
2387                 goto out_no_epoch;
2388
2389         INIT_LIST_HEAD(&mdev->current_epoch->list);
2390         mdev->epochs = 1;
2391
2392         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2393                 goto out_no_vol_idr;
2394         if (idr_get_new(&tconn->volumes, mdev, &vnr_got))
2395                 goto out_no_vol_idr;
2396         if (vnr_got != vnr) {
2397                 dev_err(DEV, "vnr_got (%d) != vnr (%d)\n", vnr_got, vnr);
2398                 goto out_idr_remove_vol;
2399         }
2400
2401         if (!idr_pre_get(&minors, GFP_KERNEL))
2402                 goto out_idr_remove_vol;
2403         if (idr_get_new(&minors, mdev, &minor_got))
2404                 goto out_idr_remove_vol;
2405         if (minor_got != minor) {
2406                 /* minor exists, or other idr strangeness? */
2407                 dev_err(DEV, "available minor (%d) != requested minor (%d)\n",
2408                                 minor_got, minor);
2409                 goto out_idr_remove_minor;
2410         }
2411         add_disk(disk);
2412
2413         return NO_ERROR;
2414
2415 out_idr_remove_minor:
2416         idr_remove(&minors, minor_got);
2417 out_idr_remove_vol:
2418         idr_remove(&tconn->volumes, vnr_got);
2419         synchronize_rcu();
2420 out_no_vol_idr:
2421         kfree(mdev->current_epoch);
2422 out_no_epoch:
2423         drbd_bm_cleanup(mdev);
2424 out_no_bitmap:
2425         __free_page(mdev->md_io_page);
2426 out_no_io_page:
2427         put_disk(disk);
2428 out_no_disk:
2429         blk_cleanup_queue(q);
2430 out_no_q:
2431         kfree(mdev);
2432         return ERR_NOMEM;
2433 }
2434
2435 /* counterpart of drbd_new_device.
2436  * last part of drbd_delete_device. */
2437 void drbd_free_mdev(struct drbd_conf *mdev)
2438 {
2439         kfree(mdev->current_epoch);
2440         if (mdev->bitmap) /* should no longer be there. */
2441                 drbd_bm_cleanup(mdev);
2442         __free_page(mdev->md_io_page);
2443         put_disk(mdev->vdisk);
2444         blk_cleanup_queue(mdev->rq_queue);
2445         kfree(mdev);
2446 }
2447
2448
2449 int __init drbd_init(void)
2450 {
2451         int err;
2452
2453         BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
2454         BUILD_BUG_ON(sizeof(struct p_handshake) != 80);
2455
2456         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2457                 printk(KERN_ERR
2458                        "drbd: invalid minor_count (%d)\n", minor_count);
2459 #ifdef MODULE
2460                 return -EINVAL;
2461 #else
2462                 minor_count = 8;
2463 #endif
2464         }
2465
2466         err = register_blkdev(DRBD_MAJOR, "drbd");
2467         if (err) {
2468                 printk(KERN_ERR
2469                        "drbd: unable to register block device major %d\n",
2470                        DRBD_MAJOR);
2471                 return err;
2472         }
2473
2474         err = drbd_genl_register();
2475         if (err) {
2476                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2477                 goto fail;
2478         }
2479
2480
2481         register_reboot_notifier(&drbd_notifier);
2482
2483         /*
2484          * allocate all necessary structs
2485          */
2486         err = -ENOMEM;
2487
2488         init_waitqueue_head(&drbd_pp_wait);
2489
2490         drbd_proc = NULL; /* play safe for drbd_cleanup */
2491         idr_init(&minors);
2492
2493         err = drbd_create_mempools();
2494         if (err)
2495                 goto fail;
2496
2497         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2498         if (!drbd_proc) {
2499                 printk(KERN_ERR "drbd: unable to register proc file\n");
2500                 goto fail;
2501         }
2502
2503         rwlock_init(&global_state_lock);
2504         INIT_LIST_HEAD(&drbd_tconns);
2505
2506         printk(KERN_INFO "drbd: initialized. "
2507                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2508                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2509         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2510         printk(KERN_INFO "drbd: registered as block device major %d\n",
2511                 DRBD_MAJOR);
2512
2513         return 0; /* Success! */
2514
2515 fail:
2516         drbd_cleanup();
2517         if (err == -ENOMEM)
2518                 /* currently always the case */
2519                 printk(KERN_ERR "drbd: ran out of memory\n");
2520         else
2521                 printk(KERN_ERR "drbd: initialization failure\n");
2522         return err;
2523 }
2524
2525 void drbd_free_bc(struct drbd_backing_dev *ldev)
2526 {
2527         if (ldev == NULL)
2528                 return;
2529
2530         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2531         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2532
2533         kfree(ldev);
2534 }
2535
2536 void drbd_free_sock(struct drbd_tconn *tconn)
2537 {
2538         if (tconn->data.socket) {
2539                 mutex_lock(&tconn->data.mutex);
2540                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2541                 sock_release(tconn->data.socket);
2542                 tconn->data.socket = NULL;
2543                 mutex_unlock(&tconn->data.mutex);
2544         }
2545         if (tconn->meta.socket) {
2546                 mutex_lock(&tconn->meta.mutex);
2547                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2548                 sock_release(tconn->meta.socket);
2549                 tconn->meta.socket = NULL;
2550                 mutex_unlock(&tconn->meta.mutex);
2551         }
2552 }
2553
2554
2555 void drbd_free_resources(struct drbd_conf *mdev)
2556 {
2557         crypto_free_hash(mdev->csums_tfm);
2558         mdev->csums_tfm = NULL;
2559         crypto_free_hash(mdev->verify_tfm);
2560         mdev->verify_tfm = NULL;
2561         crypto_free_hash(mdev->tconn->cram_hmac_tfm);
2562         mdev->tconn->cram_hmac_tfm = NULL;
2563         crypto_free_hash(mdev->tconn->integrity_w_tfm);
2564         mdev->tconn->integrity_w_tfm = NULL;
2565         crypto_free_hash(mdev->tconn->integrity_r_tfm);
2566         mdev->tconn->integrity_r_tfm = NULL;
2567
2568         drbd_free_sock(mdev->tconn);
2569
2570         __no_warn(local,
2571                   drbd_free_bc(mdev->ldev);
2572                   mdev->ldev = NULL;);
2573 }
2574
2575 /* meta data management */
2576
2577 struct meta_data_on_disk {
2578         u64 la_size;           /* last agreed size. */
2579         u64 uuid[UI_SIZE];   /* UUIDs. */
2580         u64 device_uuid;
2581         u64 reserved_u64_1;
2582         u32 flags;             /* MDF */
2583         u32 magic;
2584         u32 md_size_sect;
2585         u32 al_offset;         /* offset to this block */
2586         u32 al_nr_extents;     /* important for restoring the AL */
2587               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
2588         u32 bm_offset;         /* offset to the bitmap, from here */
2589         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2590         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2591         u32 reserved_u32[3];
2592
2593 } __packed;
2594
2595 /**
2596  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2597  * @mdev:       DRBD device.
2598  */
2599 void drbd_md_sync(struct drbd_conf *mdev)
2600 {
2601         struct meta_data_on_disk *buffer;
2602         sector_t sector;
2603         int i;
2604
2605         del_timer(&mdev->md_sync_timer);
2606         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2607         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2608                 return;
2609
2610         /* We use here D_FAILED and not D_ATTACHING because we try to write
2611          * metadata even if we detach due to a disk failure! */
2612         if (!get_ldev_if_state(mdev, D_FAILED))
2613                 return;
2614
2615         mutex_lock(&mdev->md_io_mutex);
2616         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2617         memset(buffer, 0, 512);
2618
2619         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2620         for (i = UI_CURRENT; i < UI_SIZE; i++)
2621                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2622         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2623         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2624
2625         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2626         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2627         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2628         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2629         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2630
2631         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2632         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2633
2634         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2635         sector = mdev->ldev->md.md_offset;
2636
2637         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2638                 /* this was a try anyways ... */
2639                 dev_err(DEV, "meta data update failed!\n");
2640                 drbd_chk_io_error(mdev, 1, true);
2641         }
2642
2643         /* Update mdev->ldev->md.la_size_sect,
2644          * since we updated it on metadata. */
2645         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2646
2647         mutex_unlock(&mdev->md_io_mutex);
2648         put_ldev(mdev);
2649 }
2650
2651 /**
2652  * drbd_md_read() - Reads in the meta data super block
2653  * @mdev:       DRBD device.
2654  * @bdev:       Device from which the meta data should be read in.
2655  *
2656  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2657  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2658  */
2659 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2660 {
2661         struct meta_data_on_disk *buffer;
2662         int i, rv = NO_ERROR;
2663
2664         if (!get_ldev_if_state(mdev, D_ATTACHING))
2665                 return ERR_IO_MD_DISK;
2666
2667         mutex_lock(&mdev->md_io_mutex);
2668         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2669
2670         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2671                 /* NOTE: can't do normal error processing here as this is
2672                    called BEFORE disk is attached */
2673                 dev_err(DEV, "Error while reading metadata.\n");
2674                 rv = ERR_IO_MD_DISK;
2675                 goto err;
2676         }
2677
2678         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2679                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2680                 rv = ERR_MD_INVALID;
2681                 goto err;
2682         }
2683         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2684                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2685                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2686                 rv = ERR_MD_INVALID;
2687                 goto err;
2688         }
2689         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2690                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2691                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2692                 rv = ERR_MD_INVALID;
2693                 goto err;
2694         }
2695         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2696                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2697                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2698                 rv = ERR_MD_INVALID;
2699                 goto err;
2700         }
2701
2702         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2703                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2704                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2705                 rv = ERR_MD_INVALID;
2706                 goto err;
2707         }
2708
2709         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2710         for (i = UI_CURRENT; i < UI_SIZE; i++)
2711                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2712         bdev->md.flags = be32_to_cpu(buffer->flags);
2713         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
2714         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2715
2716         spin_lock_irq(&mdev->tconn->req_lock);
2717         if (mdev->state.conn < C_CONNECTED) {
2718                 int peer;
2719                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2720                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2721                 mdev->peer_max_bio_size = peer;
2722         }
2723         spin_unlock_irq(&mdev->tconn->req_lock);
2724
2725         if (mdev->sync_conf.al_extents < 7)
2726                 mdev->sync_conf.al_extents = 127;
2727
2728  err:
2729         mutex_unlock(&mdev->md_io_mutex);
2730         put_ldev(mdev);
2731
2732         return rv;
2733 }
2734
2735 /**
2736  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2737  * @mdev:       DRBD device.
2738  *
2739  * Call this function if you change anything that should be written to
2740  * the meta-data super block. This function sets MD_DIRTY, and starts a
2741  * timer that ensures that within five seconds you have to call drbd_md_sync().
2742  */
2743 #ifdef DEBUG
2744 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2745 {
2746         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2747                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2748                 mdev->last_md_mark_dirty.line = line;
2749                 mdev->last_md_mark_dirty.func = func;
2750         }
2751 }
2752 #else
2753 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2754 {
2755         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2756                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2757 }
2758 #endif
2759
2760 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2761 {
2762         int i;
2763
2764         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2765                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2766 }
2767
2768 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2769 {
2770         if (idx == UI_CURRENT) {
2771                 if (mdev->state.role == R_PRIMARY)
2772                         val |= 1;
2773                 else
2774                         val &= ~((u64)1);
2775
2776                 drbd_set_ed_uuid(mdev, val);
2777         }
2778
2779         mdev->ldev->md.uuid[idx] = val;
2780         drbd_md_mark_dirty(mdev);
2781 }
2782
2783
2784 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2785 {
2786         if (mdev->ldev->md.uuid[idx]) {
2787                 drbd_uuid_move_history(mdev);
2788                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2789         }
2790         _drbd_uuid_set(mdev, idx, val);
2791 }
2792
2793 /**
2794  * drbd_uuid_new_current() - Creates a new current UUID
2795  * @mdev:       DRBD device.
2796  *
2797  * Creates a new current UUID, and rotates the old current UUID into
2798  * the bitmap slot. Causes an incremental resync upon next connect.
2799  */
2800 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2801 {
2802         u64 val;
2803         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2804
2805         if (bm_uuid)
2806                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2807
2808         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2809
2810         get_random_bytes(&val, sizeof(u64));
2811         _drbd_uuid_set(mdev, UI_CURRENT, val);
2812         drbd_print_uuids(mdev, "new current UUID");
2813         /* get it to stable storage _now_ */
2814         drbd_md_sync(mdev);
2815 }
2816
2817 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
2818 {
2819         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
2820                 return;
2821
2822         if (val == 0) {
2823                 drbd_uuid_move_history(mdev);
2824                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2825                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2826         } else {
2827                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2828                 if (bm_uuid)
2829                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2830
2831                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
2832         }
2833         drbd_md_mark_dirty(mdev);
2834 }
2835
2836 /**
2837  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2838  * @mdev:       DRBD device.
2839  *
2840  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
2841  */
2842 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
2843 {
2844         int rv = -EIO;
2845
2846         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2847                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
2848                 drbd_md_sync(mdev);
2849                 drbd_bm_set_all(mdev);
2850
2851                 rv = drbd_bm_write(mdev);
2852
2853                 if (!rv) {
2854                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2855                         drbd_md_sync(mdev);
2856                 }
2857
2858                 put_ldev(mdev);
2859         }
2860
2861         return rv;
2862 }
2863
2864 /**
2865  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2866  * @mdev:       DRBD device.
2867  *
2868  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
2869  */
2870 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
2871 {
2872         int rv = -EIO;
2873
2874         drbd_resume_al(mdev);
2875         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2876                 drbd_bm_clear_all(mdev);
2877                 rv = drbd_bm_write(mdev);
2878                 put_ldev(mdev);
2879         }
2880
2881         return rv;
2882 }
2883
2884 static int w_bitmap_io(struct drbd_work *w, int unused)
2885 {
2886         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
2887         struct drbd_conf *mdev = w->mdev;
2888         int rv = -EIO;
2889
2890         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
2891
2892         if (get_ldev(mdev)) {
2893                 drbd_bm_lock(mdev, work->why, work->flags);
2894                 rv = work->io_fn(mdev);
2895                 drbd_bm_unlock(mdev);
2896                 put_ldev(mdev);
2897         }
2898
2899         clear_bit_unlock(BITMAP_IO, &mdev->flags);
2900         wake_up(&mdev->misc_wait);
2901
2902         if (work->done)
2903                 work->done(mdev, rv);
2904
2905         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
2906         work->why = NULL;
2907         work->flags = 0;
2908
2909         return 1;
2910 }
2911
2912 void drbd_ldev_destroy(struct drbd_conf *mdev)
2913 {
2914         lc_destroy(mdev->resync);
2915         mdev->resync = NULL;
2916         lc_destroy(mdev->act_log);
2917         mdev->act_log = NULL;
2918         __no_warn(local,
2919                 drbd_free_bc(mdev->ldev);
2920                 mdev->ldev = NULL;);
2921
2922         clear_bit(GO_DISKLESS, &mdev->flags);
2923 }
2924
2925 static int w_go_diskless(struct drbd_work *w, int unused)
2926 {
2927         struct drbd_conf *mdev = w->mdev;
2928
2929         D_ASSERT(mdev->state.disk == D_FAILED);
2930         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
2931          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
2932          * the protected members anymore, though, so once put_ldev reaches zero
2933          * again, it will be safe to free them. */
2934         drbd_force_state(mdev, NS(disk, D_DISKLESS));
2935         return 1;
2936 }
2937
2938 void drbd_go_diskless(struct drbd_conf *mdev)
2939 {
2940         D_ASSERT(mdev->state.disk == D_FAILED);
2941         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
2942                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
2943 }
2944
2945 /**
2946  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
2947  * @mdev:       DRBD device.
2948  * @io_fn:      IO callback to be called when bitmap IO is possible
2949  * @done:       callback to be called after the bitmap IO was performed
2950  * @why:        Descriptive text of the reason for doing the IO
2951  *
2952  * While IO on the bitmap happens we freeze application IO thus we ensure
2953  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
2954  * called from worker context. It MUST NOT be used while a previous such
2955  * work is still pending!
2956  */
2957 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
2958                           int (*io_fn)(struct drbd_conf *),
2959                           void (*done)(struct drbd_conf *, int),
2960                           char *why, enum bm_flag flags)
2961 {
2962         D_ASSERT(current == mdev->tconn->worker.task);
2963
2964         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
2965         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
2966         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
2967         if (mdev->bm_io_work.why)
2968                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
2969                         why, mdev->bm_io_work.why);
2970
2971         mdev->bm_io_work.io_fn = io_fn;
2972         mdev->bm_io_work.done = done;
2973         mdev->bm_io_work.why = why;
2974         mdev->bm_io_work.flags = flags;
2975
2976         spin_lock_irq(&mdev->tconn->req_lock);
2977         set_bit(BITMAP_IO, &mdev->flags);
2978         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
2979                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
2980                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
2981         }
2982         spin_unlock_irq(&mdev->tconn->req_lock);
2983 }
2984
2985 /**
2986  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
2987  * @mdev:       DRBD device.
2988  * @io_fn:      IO callback to be called when bitmap IO is possible
2989  * @why:        Descriptive text of the reason for doing the IO
2990  *
2991  * freezes application IO while that the actual IO operations runs. This
2992  * functions MAY NOT be called from worker context.
2993  */
2994 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
2995                 char *why, enum bm_flag flags)
2996 {
2997         int rv;
2998
2999         D_ASSERT(current != mdev->tconn->worker.task);
3000
3001         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3002                 drbd_suspend_io(mdev);
3003
3004         drbd_bm_lock(mdev, why, flags);
3005         rv = io_fn(mdev);
3006         drbd_bm_unlock(mdev);
3007
3008         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3009                 drbd_resume_io(mdev);
3010
3011         return rv;
3012 }
3013
3014 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3015 {
3016         if ((mdev->ldev->md.flags & flag) != flag) {
3017                 drbd_md_mark_dirty(mdev);
3018                 mdev->ldev->md.flags |= flag;
3019         }
3020 }
3021
3022 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3023 {
3024         if ((mdev->ldev->md.flags & flag) != 0) {
3025                 drbd_md_mark_dirty(mdev);
3026                 mdev->ldev->md.flags &= ~flag;
3027         }
3028 }
3029 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3030 {
3031         return (bdev->md.flags & flag) != 0;
3032 }
3033
3034 static void md_sync_timer_fn(unsigned long data)
3035 {
3036         struct drbd_conf *mdev = (struct drbd_conf *) data;
3037
3038         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3039 }
3040
3041 static int w_md_sync(struct drbd_work *w, int unused)
3042 {
3043         struct drbd_conf *mdev = w->mdev;
3044
3045         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3046 #ifdef DEBUG
3047         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3048                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3049 #endif
3050         drbd_md_sync(mdev);
3051         return 1;
3052 }
3053
3054 const char *cmdname(enum drbd_packet cmd)
3055 {
3056         /* THINK may need to become several global tables
3057          * when we want to support more than
3058          * one PRO_VERSION */
3059         static const char *cmdnames[] = {
3060                 [P_DATA]                = "Data",
3061                 [P_DATA_REPLY]          = "DataReply",
3062                 [P_RS_DATA_REPLY]       = "RSDataReply",
3063                 [P_BARRIER]             = "Barrier",
3064                 [P_BITMAP]              = "ReportBitMap",
3065                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3066                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3067                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3068                 [P_DATA_REQUEST]        = "DataRequest",
3069                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3070                 [P_SYNC_PARAM]          = "SyncParam",
3071                 [P_SYNC_PARAM89]        = "SyncParam89",
3072                 [P_PROTOCOL]            = "ReportProtocol",
3073                 [P_UUIDS]               = "ReportUUIDs",
3074                 [P_SIZES]               = "ReportSizes",
3075                 [P_STATE]               = "ReportState",
3076                 [P_SYNC_UUID]           = "ReportSyncUUID",
3077                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3078                 [P_AUTH_RESPONSE]       = "AuthResponse",
3079                 [P_PING]                = "Ping",
3080                 [P_PING_ACK]            = "PingAck",
3081                 [P_RECV_ACK]            = "RecvAck",
3082                 [P_WRITE_ACK]           = "WriteAck",
3083                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3084                 [P_DISCARD_WRITE]        = "DiscardWrite",
3085                 [P_NEG_ACK]             = "NegAck",
3086                 [P_NEG_DREPLY]          = "NegDReply",
3087                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3088                 [P_BARRIER_ACK]         = "BarrierAck",
3089                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3090                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3091                 [P_OV_REQUEST]          = "OVRequest",
3092                 [P_OV_REPLY]            = "OVReply",
3093                 [P_OV_RESULT]           = "OVResult",
3094                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3095                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3096                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3097                 [P_DELAY_PROBE]         = "DelayProbe",
3098                 [P_OUT_OF_SYNC]         = "OutOfSync",
3099                 [P_RETRY_WRITE]         = "RetryWrite",
3100         };
3101
3102         if (cmd == P_HAND_SHAKE_M)
3103                 return "HandShakeM";
3104         if (cmd == P_HAND_SHAKE_S)
3105                 return "HandShakeS";
3106         if (cmd == P_HAND_SHAKE)
3107                 return "HandShake";
3108         if (cmd >= ARRAY_SIZE(cmdnames))
3109                 return "Unknown";
3110         return cmdnames[cmd];
3111 }
3112
3113 /**
3114  * drbd_wait_misc  -  wait for a request to make progress
3115  * @mdev:       device associated with the request
3116  * @i:          the struct drbd_interval embedded in struct drbd_request or
3117  *              struct drbd_peer_request
3118  */
3119 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3120 {
3121         struct net_conf *net_conf = mdev->tconn->net_conf;
3122         DEFINE_WAIT(wait);
3123         long timeout;
3124
3125         if (!net_conf)
3126                 return -ETIMEDOUT;
3127         timeout = MAX_SCHEDULE_TIMEOUT;
3128         if (net_conf->ko_count)
3129                 timeout = net_conf->timeout * HZ / 10 * net_conf->ko_count;
3130
3131         /* Indicate to wake up mdev->misc_wait on progress.  */
3132         i->waiting = true;
3133         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3134         spin_unlock_irq(&mdev->tconn->req_lock);
3135         timeout = schedule_timeout(timeout);
3136         finish_wait(&mdev->misc_wait, &wait);
3137         spin_lock_irq(&mdev->tconn->req_lock);
3138         if (!timeout || mdev->state.conn < C_CONNECTED)
3139                 return -ETIMEDOUT;
3140         if (signal_pending(current))
3141                 return -ERESTARTSYS;
3142         return 0;
3143 }
3144
3145 #ifdef CONFIG_DRBD_FAULT_INJECTION
3146 /* Fault insertion support including random number generator shamelessly
3147  * stolen from kernel/rcutorture.c */
3148 struct fault_random_state {
3149         unsigned long state;
3150         unsigned long count;
3151 };
3152
3153 #define FAULT_RANDOM_MULT 39916801  /* prime */
3154 #define FAULT_RANDOM_ADD        479001701 /* prime */
3155 #define FAULT_RANDOM_REFRESH 10000
3156
3157 /*
3158  * Crude but fast random-number generator.  Uses a linear congruential
3159  * generator, with occasional help from get_random_bytes().
3160  */
3161 static unsigned long
3162 _drbd_fault_random(struct fault_random_state *rsp)
3163 {
3164         long refresh;
3165
3166         if (!rsp->count--) {
3167                 get_random_bytes(&refresh, sizeof(refresh));
3168                 rsp->state += refresh;
3169                 rsp->count = FAULT_RANDOM_REFRESH;
3170         }
3171         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3172         return swahw32(rsp->state);
3173 }
3174
3175 static char *
3176 _drbd_fault_str(unsigned int type) {
3177         static char *_faults[] = {
3178                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3179                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3180                 [DRBD_FAULT_RS_WR] = "Resync write",
3181                 [DRBD_FAULT_RS_RD] = "Resync read",
3182                 [DRBD_FAULT_DT_WR] = "Data write",
3183                 [DRBD_FAULT_DT_RD] = "Data read",
3184                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3185                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3186                 [DRBD_FAULT_AL_EE] = "EE allocation",
3187                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3188         };
3189
3190         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3191 }
3192
3193 unsigned int
3194 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3195 {
3196         static struct fault_random_state rrs = {0, 0};
3197
3198         unsigned int ret = (
3199                 (fault_devs == 0 ||
3200                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3201                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3202
3203         if (ret) {
3204                 fault_count++;
3205
3206                 if (__ratelimit(&drbd_ratelimit_state))
3207                         dev_warn(DEV, "***Simulating %s failure\n",
3208                                 _drbd_fault_str(type));
3209         }
3210
3211         return ret;
3212 }
3213 #endif
3214
3215 const char *drbd_buildtag(void)
3216 {
3217         /* DRBD built from external sources has here a reference to the
3218            git hash of the source code. */
3219
3220         static char buildtag[38] = "\0uilt-in";
3221
3222         if (buildtag[0] == 0) {
3223 #ifdef CONFIG_MODULES
3224                 if (THIS_MODULE != NULL)
3225                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3226                 else
3227 #endif
3228                         buildtag[0] = 'b';
3229         }
3230
3231         return buildtag;
3232 }
3233
3234 module_init(drbd_init)
3235 module_exit(drbd_cleanup)
3236
3237 EXPORT_SYMBOL(drbd_conn_str);
3238 EXPORT_SYMBOL(drbd_role_str);
3239 EXPORT_SYMBOL(drbd_disk_str);
3240 EXPORT_SYMBOL(drbd_set_st_err_str);