karo-tx-linux.git (git.karo-electronics.de): drivers/block/drbd/drbd_main.c, at commit "drbd: Removing drbd_cfg_rwsem"
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not as a module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
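/*
 * Illustrative example (not in the original source): with drbd built into the
 * kernel, the parameters above are set on the kernel command line, e.g.
 * "drbd.minor_count=8"; when built as a module, the equivalent is
 * "modprobe drbd minor_count=8".
 */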
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameters, defined here */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in /proc/drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
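/*
 * Minimal illustrative sketch (not part of drbd itself) of how a page can be
 * pushed onto / popped from such a singly linked pool, using the page's
 * private field as the "next" pointer.  The real pool handling additionally
 * takes drbd_pp_lock, maintains drbd_pp_vacant and wakes drbd_pp_wait.
 */
static inline void example_pp_push(struct page **pool, struct page *page)
{
        set_page_private(page, (unsigned long)*pool);   /* page->private = old head */
        *pool = page;                                   /* page becomes the new head */
}

static inline struct page *example_pp_pop(struct page **pool)
{
        struct page *page = *pool;

        if (page) {
                *pool = (struct page *)page_private(page);      /* advance head */
                set_page_private(page, 0);
        }
        return page;
}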
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173    give tons of false positives. When this is a real function, sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular double linked list of requests
198  * attached.
199  */
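/*
 * Illustrative picture of the structure described above (added for clarity,
 * not in the original source):
 *
 *   tconn->oldest_tle --> [tl_epoch] --next--> [tl_epoch] --next--> [tl_epoch] <-- tconn->newest_tle
 *                             |                    |                    |
 *                       (circular list       (circular list       (circular list
 *                        of requests)         of requests)         of requests)
 */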
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = 4711;
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218
219         return 1;
220 }
221
222 static void tl_cleanup(struct drbd_tconn *tconn)
223 {
224         if (tconn->oldest_tle != tconn->newest_tle)
225                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
226         if (!list_empty(&tconn->out_of_sequence_requests))
227                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
228         kfree(tconn->oldest_tle);
229         tconn->oldest_tle = NULL;
230         kfree(tconn->unused_spare_tle);
231         tconn->unused_spare_tle = NULL;
232 }
233
234 /**
235  * _tl_add_barrier() - Adds a barrier to the transfer log
236  * @tconn:      DRBD connection.
237  * @new:        Barrier to be added before the current head of the TL.
238  *
239  * The caller must hold the req_lock.
240  */
241 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
242 {
243         struct drbd_tl_epoch *newest_before;
244
245         INIT_LIST_HEAD(&new->requests);
246         INIT_LIST_HEAD(&new->w.list);
247         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
248         new->next = NULL;
249         new->n_writes = 0;
250
251         newest_before = tconn->newest_tle;
252         /* never send a barrier number == 0, because that is special-cased
253          * when using TCQ for our write ordering code */
254         new->br_number = (newest_before->br_number+1) ?: 1;
255         if (tconn->newest_tle != new) {
256                 tconn->newest_tle->next = new;
257                 tconn->newest_tle = new;
258         }
259 }
260
261 /**
262  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
263  * @tconn:      DRBD connection.
264  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
265  * @set_size:   Expected number of requests before that barrier.
266  *
267  * In case the passed barrier_nr or set_size does not match the oldest
268  * &struct drbd_tl_epoch object, this function will cause a termination
269  * of the connection.
270  */
271 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
272                 unsigned int set_size)
273 {
274         struct drbd_conf *mdev;
275         struct drbd_tl_epoch *b, *nob; /* next old barrier */
276         struct list_head *le, *tle;
277         struct drbd_request *r;
278
279         spin_lock_irq(&tconn->req_lock);
280
281         b = tconn->oldest_tle;
282
283         /* first some paranoia code */
284         if (b == NULL) {
285                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
286                          barrier_nr);
287                 goto bail;
288         }
289         if (b->br_number != barrier_nr) {
290                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
291                          barrier_nr, b->br_number);
292                 goto bail;
293         }
294         if (b->n_writes != set_size) {
295                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
296                          barrier_nr, set_size, b->n_writes);
297                 goto bail;
298         }
299
300         /* Clean up list of requests processed during current epoch */
301         list_for_each_safe(le, tle, &b->requests) {
302                 r = list_entry(le, struct drbd_request, tl_requests);
303                 _req_mod(r, BARRIER_ACKED);
304         }
305         /* There could be requests on the list waiting for completion
306            of the write to the local disk. To avoid corrupting the
307            slab's data structures we have to remove the list's head.
308
309            Also there could have been a barrier ack out of sequence, overtaking
310            the write acks - which would be a bug and would violate write ordering.
311            To avoid deadlocking in case we lose the connection while such requests
312            are still pending, we need some way to find them for the
313            _req_mod(CONNECTION_LOST_WHILE_PENDING).
314
315            These have been list_move'd to the out_of_sequence_requests list in
316            _req_mod(, BARRIER_ACKED) above.
317            */
318         list_del_init(&b->requests);
319         mdev = b->w.mdev;
320
321         nob = b->next;
322         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
323                 _tl_add_barrier(tconn, b);
324                 if (nob)
325                         tconn->oldest_tle = nob;
326                 /* if nob == NULL, b was the only barrier, and becomes the new
327                    barrier. Therefore tconn->oldest_tle already points to b */
328         } else {
329                 D_ASSERT(nob != NULL);
330                 tconn->oldest_tle = nob;
331                 kfree(b);
332         }
333
334         spin_unlock_irq(&tconn->req_lock);
335         dec_ap_pending(mdev);
336
337         return;
338
339 bail:
340         spin_unlock_irq(&tconn->req_lock);
341         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
342 }
343
344
345 /**
346  * _tl_restart() - Walks the transfer log, and applies an action to all requests
347  * @tconn:      DRBD connection.
348  * @what:       The action/event to perform with all request objects
349  *
350  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
351  * RESTART_FROZEN_DISK_IO.
352  */
353 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
354 {
355         struct drbd_tl_epoch *b, *tmp, **pn;
356         struct list_head *le, *tle, carry_reads;
357         struct drbd_request *req;
358         int rv, n_writes, n_reads;
359
360         b = tconn->oldest_tle;
361         pn = &tconn->oldest_tle;
362         while (b) {
363                 n_writes = 0;
364                 n_reads = 0;
365                 INIT_LIST_HEAD(&carry_reads);
366                 list_for_each_safe(le, tle, &b->requests) {
367                         req = list_entry(le, struct drbd_request, tl_requests);
368                         rv = _req_mod(req, what);
369
370                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
371                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
372                 }
373                 tmp = b->next;
374
375                 if (n_writes) {
376                         if (what == RESEND) {
377                                 b->n_writes = n_writes;
378                                 if (b->w.cb == NULL) {
379                                         b->w.cb = w_send_barrier;
380                                         inc_ap_pending(b->w.mdev);
381                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
382                                 }
383
384                                 drbd_queue_work(&tconn->data.work, &b->w);
385                         }
386                         pn = &b->next;
387                 } else {
388                         if (n_reads)
389                                 list_add(&carry_reads, &b->requests);
390                         /* there could still be requests on that ring list,
391                          * in case local io is still pending */
392                         list_del(&b->requests);
393
394                         /* dec_ap_pending corresponding to queue_barrier.
395                          * the newest barrier may not have been queued yet,
396                          * in which case w.cb is still NULL. */
397                         if (b->w.cb != NULL)
398                                 dec_ap_pending(b->w.mdev);
399
400                         if (b == tconn->newest_tle) {
401                                 /* recycle, but reinit! */
402                                 if (tmp != NULL)
403                                         conn_err(tconn, "ASSERT FAILED tmp == NULL\n");
404                                 INIT_LIST_HEAD(&b->requests);
405                                 list_splice(&carry_reads, &b->requests);
406                                 INIT_LIST_HEAD(&b->w.list);
407                                 b->w.cb = NULL;
408                                 b->br_number = net_random();
409                                 b->n_writes = 0;
410
411                                 *pn = b;
412                                 break;
413                         }
414                         *pn = tmp;
415                         kfree(b);
416                 }
417                 b = tmp;
418                 list_splice(&carry_reads, &b->requests);
419         }
420 }
421
422
423 /**
424  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
425  * @tconn:      DRBD connection.
426  *
427  * This is called after the connection to the peer was lost. The storage covered
428  * by the requests on the transfer log gets marked as out of sync. Called from the
429  * receiver thread and the worker thread.
430  */
431 void tl_clear(struct drbd_tconn *tconn)
432 {
433         struct drbd_conf *mdev;
434         struct list_head *le, *tle;
435         struct drbd_request *r;
436         int vnr;
437
438         spin_lock_irq(&tconn->req_lock);
439
440         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
441
442         /* we expect this list to be empty. */
443         if (!list_empty(&tconn->out_of_sequence_requests))
444                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
445
446         /* but just in case, clean it up anyways! */
447         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
448                 r = list_entry(le, struct drbd_request, tl_requests);
449                 /* It would be nice to complete outside of spinlock.
450                  * But this is easier for now. */
451                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
452         }
453
454         /* ensure bit indicating barrier is required is clear */
455         rcu_read_lock();
456         idr_for_each_entry(&tconn->volumes, mdev, vnr)
457                 clear_bit(CREATE_BARRIER, &mdev->flags);
458         rcu_read_unlock();
459
460         spin_unlock_irq(&tconn->req_lock);
461 }
462
463 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
464 {
465         spin_lock_irq(&tconn->req_lock);
466         _tl_restart(tconn, what);
467         spin_unlock_irq(&tconn->req_lock);
468 }
469
470 static int drbd_thread_setup(void *arg)
471 {
472         struct drbd_thread *thi = (struct drbd_thread *) arg;
473         struct drbd_tconn *tconn = thi->tconn;
474         unsigned long flags;
475         int retval;
476
477         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
478                  thi->name[0], thi->tconn->name);
479
480 restart:
481         retval = thi->function(thi);
482
483         spin_lock_irqsave(&thi->t_lock, flags);
484
485         /* if the receiver has been "EXITING", the last thing it did
486          * was set the conn state to "StandAlone".
487          * If a re-connect request now comes in, the conn state goes to C_UNCONNECTED,
488          * and the receiver thread will be "started" again.
489          * drbd_thread_start needs to set "RESTARTING" in that case.
490          * t_state check and assignment needs to be within the same spinlock,
491          * so either thread_start sees EXITING, and can remap to RESTARTING,
492          * or thread_start sees NONE, and can proceed as normal.
493          */
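        /*
         * Rough summary of the thi->t_state transitions driven by this code,
         * drbd_thread_start() and _drbd_thread_stop() (added for clarity):
         *
         *   NONE       --drbd_thread_start()-->        RUNNING
         *   RUNNING    --_drbd_thread_stop()-->        EXITING (or RESTARTING)
         *   EXITING    --drbd_thread_start()-->        RESTARTING
         *   RESTARTING --(check below)-->              RUNNING again
         *   EXITING    --(thread function returned)--> NONE
         */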
494
495         if (thi->t_state == RESTARTING) {
496                 conn_info(tconn, "Restarting %s thread\n", thi->name);
497                 thi->t_state = RUNNING;
498                 spin_unlock_irqrestore(&thi->t_lock, flags);
499                 goto restart;
500         }
501
502         thi->task = NULL;
503         thi->t_state = NONE;
504         smp_mb();
505         complete_all(&thi->stop);
506         spin_unlock_irqrestore(&thi->t_lock, flags);
507
508         conn_info(tconn, "Terminating %s\n", current->comm);
509
510         /* Release mod reference taken when thread was started */
511
512         kref_put(&tconn->kref, &conn_destroy);
513         module_put(THIS_MODULE);
514         return retval;
515 }
516
517 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
518                              int (*func) (struct drbd_thread *), char *name)
519 {
520         spin_lock_init(&thi->t_lock);
521         thi->task    = NULL;
522         thi->t_state = NONE;
523         thi->function = func;
524         thi->tconn = tconn;
525         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
526 }
527
528 int drbd_thread_start(struct drbd_thread *thi)
529 {
530         struct drbd_tconn *tconn = thi->tconn;
531         struct task_struct *nt;
532         unsigned long flags;
533
534         /* is used from state engine doing drbd_thread_stop_nowait,
535          * while holding the req lock irqsave */
536         spin_lock_irqsave(&thi->t_lock, flags);
537
538         switch (thi->t_state) {
539         case NONE:
540                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
541                          thi->name, current->comm, current->pid);
542
543                 /* Get ref on module for thread - this is released when thread exits */
544                 if (!try_module_get(THIS_MODULE)) {
545                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
546                         spin_unlock_irqrestore(&thi->t_lock, flags);
547                         return false;
548                 }
549
550                 kref_get(&thi->tconn->kref);
551
552                 init_completion(&thi->stop);
553                 thi->reset_cpu_mask = 1;
554                 thi->t_state = RUNNING;
555                 spin_unlock_irqrestore(&thi->t_lock, flags);
556                 flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
557
558                 nt = kthread_create(drbd_thread_setup, (void *) thi,
559                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
560
561                 if (IS_ERR(nt)) {
562                         conn_err(tconn, "Couldn't start thread\n");
563
564                         kref_put(&tconn->kref, &conn_destroy);
565                         module_put(THIS_MODULE);
566                         return false;
567                 }
568                 spin_lock_irqsave(&thi->t_lock, flags);
569                 thi->task = nt;
570                 thi->t_state = RUNNING;
571                 spin_unlock_irqrestore(&thi->t_lock, flags);
572                 wake_up_process(nt);
573                 break;
574         case EXITING:
575                 thi->t_state = RESTARTING;
576                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
577                                 thi->name, current->comm, current->pid);
578                 /* fall through */
579         case RUNNING:
580         case RESTARTING:
581         default:
582                 spin_unlock_irqrestore(&thi->t_lock, flags);
583                 break;
584         }
585
586         return true;
587 }
588
589
590 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
591 {
592         unsigned long flags;
593
594         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
595
596         /* may be called from state engine, holding the req lock irqsave */
597         spin_lock_irqsave(&thi->t_lock, flags);
598
599         if (thi->t_state == NONE) {
600                 spin_unlock_irqrestore(&thi->t_lock, flags);
601                 if (restart)
602                         drbd_thread_start(thi);
603                 return;
604         }
605
606         if (thi->t_state != ns) {
607                 if (thi->task == NULL) {
608                         spin_unlock_irqrestore(&thi->t_lock, flags);
609                         return;
610                 }
611
612                 thi->t_state = ns;
613                 smp_mb();
614                 init_completion(&thi->stop);
615                 if (thi->task != current)
616                         force_sig(DRBD_SIGKILL, thi->task);
617         }
618
619         spin_unlock_irqrestore(&thi->t_lock, flags);
620
621         if (wait)
622                 wait_for_completion(&thi->stop);
623 }
624
625 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
626 {
627         struct drbd_thread *thi =
628                 task == tconn->receiver.task ? &tconn->receiver :
629                 task == tconn->asender.task  ? &tconn->asender :
630                 task == tconn->worker.task   ? &tconn->worker : NULL;
631
632         return thi;
633 }
634
635 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
636 {
637         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
638         return thi ? thi->name : task->comm;
639 }
640
641 int conn_lowest_minor(struct drbd_tconn *tconn)
642 {
643         struct drbd_conf *mdev;
644         int vnr = 0, m;
645
646         rcu_read_lock();
647         mdev = idr_get_next(&tconn->volumes, &vnr);
648         m = mdev ? mdev_to_minor(mdev) : -1;
649         rcu_read_unlock();
650
651         return m;
652 }
653
654 #ifdef CONFIG_SMP
655 /**
656  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
657  * @tconn:      DRBD connection.
658  *
659  * Forces all threads of a device onto the same CPU. This is beneficial for
660  * DRBD's performance. May be overridden by the user's configuration.
661  */
662 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
663 {
664         int ord, cpu;
665
666         /* user override. */
667         if (cpumask_weight(tconn->cpu_mask))
668                 return;
669
670         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
671         for_each_online_cpu(cpu) {
672                 if (ord-- == 0) {
673                         cpumask_set_cpu(cpu, tconn->cpu_mask);
674                         return;
675                 }
676         }
677         /* should not be reached */
678         cpumask_setall(tconn->cpu_mask);
679 }
680
681 /**
682  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
683  * @thi:        drbd_thread object
685  *
686  * Call this in the "main loop" of _all_ threads; no mutex is needed, since
687  * current won't die prematurely.
688  */
689 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
690 {
691         struct task_struct *p = current;
692
693         if (!thi->reset_cpu_mask)
694                 return;
695         thi->reset_cpu_mask = 0;
696         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
697 }
698 #endif
699
700 /**
701  * drbd_header_size  -  size of a packet header
702  *
703  * The header size is a multiple of 8, so any payload following the header is
704  * word aligned on 64-bit architectures.  (The bitmap send and receive code
705  * relies on this.)
706  */
707 unsigned int drbd_header_size(struct drbd_tconn *tconn)
708 {
709         if (tconn->agreed_pro_version >= 100) {
710                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
711                 return sizeof(struct p_header100);
712         } else {
713                 BUILD_BUG_ON(sizeof(struct p_header80) !=
714                              sizeof(struct p_header95));
715                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
716                 return sizeof(struct p_header80);
717         }
718 }
719
720 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
721 {
722         h->magic   = cpu_to_be32(DRBD_MAGIC);
723         h->command = cpu_to_be16(cmd);
724         h->length  = cpu_to_be16(size);
725         return sizeof(struct p_header80);
726 }
727
728 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
729 {
730         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
731         h->command = cpu_to_be16(cmd);
732         h->length = cpu_to_be32(size);
733         return sizeof(struct p_header95);
734 }
735
736 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
737                                       int size, int vnr)
738 {
739         h->magic = cpu_to_be32(DRBD_MAGIC_100);
740         h->volume = cpu_to_be16(vnr);
741         h->command = cpu_to_be16(cmd);
742         h->length = cpu_to_be32(size);
743         h->pad = 0;
744         return sizeof(struct p_header100);
745 }
746
747 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
748                                    void *buffer, enum drbd_packet cmd, int size)
749 {
750         if (tconn->agreed_pro_version >= 100)
751                 return prepare_header100(buffer, cmd, size, vnr);
752         else if (tconn->agreed_pro_version >= 95 &&
753                  size > DRBD_MAX_SIZE_H80_PACKET)
754                 return prepare_header95(buffer, cmd, size);
755         else
756                 return prepare_header80(buffer, cmd, size);
757 }
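/*
 * Summary of the selection above (derived from the code, added for clarity):
 * agreed protocol version >= 100 always uses p_header100, which also carries
 * the volume number; versions 95..99 fall back to p_header95 (32-bit length
 * field) only when the payload exceeds DRBD_MAX_SIZE_H80_PACKET, i.e. what
 * p_header80's 16-bit length field can express; everything else uses the
 * original p_header80.
 */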
758
759 static void *__conn_prepare_command(struct drbd_tconn *tconn,
760                                     struct drbd_socket *sock)
761 {
762         if (!sock->socket)
763                 return NULL;
764         return sock->sbuf + drbd_header_size(tconn);
765 }
766
767 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
768 {
769         void *p;
770
771         mutex_lock(&sock->mutex);
772         p = __conn_prepare_command(tconn, sock);
773         if (!p)
774                 mutex_unlock(&sock->mutex);
775
776         return p;
777 }
778
779 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
780 {
781         return conn_prepare_command(mdev->tconn, sock);
782 }
783
784 static int __send_command(struct drbd_tconn *tconn, int vnr,
785                           struct drbd_socket *sock, enum drbd_packet cmd,
786                           unsigned int header_size, void *data,
787                           unsigned int size)
788 {
789         int msg_flags;
790         int err;
791
792         /*
793          * Called with @data == NULL and the size of the data blocks in @size
794          * for commands that send data blocks.  For those commands, omit the
795          * MSG_MORE flag: this will increase the likelihood that data blocks
796          * which are page aligned on the sender will end up page aligned on the
797          * receiver.
798          */
799         msg_flags = data ? MSG_MORE : 0;
800
801         header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
802                                       header_size + size);
803         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
804                             msg_flags);
805         if (data && !err)
806                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
807         return err;
808 }
809
810 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
811                                enum drbd_packet cmd, unsigned int header_size,
812                                void *data, unsigned int size)
813 {
814         return __send_command(tconn, 0, sock, cmd, header_size, data, size);
815 }
816
817 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
818                       enum drbd_packet cmd, unsigned int header_size,
819                       void *data, unsigned int size)
820 {
821         int err;
822
823         err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
824         mutex_unlock(&sock->mutex);
825         return err;
826 }
827
828 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
829                       enum drbd_packet cmd, unsigned int header_size,
830                       void *data, unsigned int size)
831 {
832         int err;
833
834         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
835                              data, size);
836         mutex_unlock(&sock->mutex);
837         return err;
838 }
839
840 int drbd_send_ping(struct drbd_tconn *tconn)
841 {
842         struct drbd_socket *sock;
843
844         sock = &tconn->meta;
845         if (!conn_prepare_command(tconn, sock))
846                 return -EIO;
847         return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
848 }
849
850 int drbd_send_ping_ack(struct drbd_tconn *tconn)
851 {
852         struct drbd_socket *sock;
853
854         sock = &tconn->meta;
855         if (!conn_prepare_command(tconn, sock))
856                 return -EIO;
857         return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
858 }
859
860 int drbd_send_sync_param(struct drbd_conf *mdev)
861 {
862         struct drbd_socket *sock;
863         struct p_rs_param_95 *p;
864         int size;
865         const int apv = mdev->tconn->agreed_pro_version;
866         enum drbd_packet cmd;
867         struct net_conf *nc;
868         struct disk_conf *dc;
869
870         sock = &mdev->tconn->data;
871         p = drbd_prepare_command(mdev, sock);
872         if (!p)
873                 return -EIO;
874
875         rcu_read_lock();
876         nc = rcu_dereference(mdev->tconn->net_conf);
877
878         size = apv <= 87 ? sizeof(struct p_rs_param)
879                 : apv == 88 ? sizeof(struct p_rs_param)
880                         + strlen(nc->verify_alg) + 1
881                 : apv <= 94 ? sizeof(struct p_rs_param_89)
882                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
883
884         cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
885
886         /* initialize verify_alg and csums_alg */
887         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
888
889         if (get_ldev(mdev)) {
890                 dc = rcu_dereference(mdev->ldev->disk_conf);
891                 p->rate = cpu_to_be32(dc->resync_rate);
892                 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
893                 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
894                 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
895                 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
896                 put_ldev(mdev);
897         } else {
898                 p->rate = cpu_to_be32(DRBD_RATE_DEF);
899                 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
900                 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
901                 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
902                 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
903         }
904
905         if (apv >= 88)
906                 strcpy(p->verify_alg, nc->verify_alg);
907         if (apv >= 89)
908                 strcpy(p->csums_alg, nc->csums_alg);
909         rcu_read_unlock();
910
911         return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
912 }
913
914 int __drbd_send_protocol(struct drbd_tconn *tconn)
915 {
916         struct drbd_socket *sock;
917         struct p_protocol *p;
918         struct net_conf *nc;
919         int size, cf;
920
921         sock = &tconn->data;
922         p = __conn_prepare_command(tconn, sock);
923         if (!p)
924                 return -EIO;
925
926         rcu_read_lock();
927         nc = rcu_dereference(tconn->net_conf);
928
929         if (nc->dry_run && tconn->agreed_pro_version < 92) {
930                 rcu_read_unlock();
931                 mutex_unlock(&sock->mutex);
932                 conn_err(tconn, "--dry-run is not supported by peer");
933                 return -EOPNOTSUPP;
934         }
935
936         size = sizeof(*p);
937         if (tconn->agreed_pro_version >= 87)
938                 size += strlen(nc->integrity_alg) + 1;
939
940         p->protocol      = cpu_to_be32(nc->wire_protocol);
941         p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
942         p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
943         p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
944         p->two_primaries = cpu_to_be32(nc->two_primaries);
945         cf = 0;
946         if (nc->want_lose)
947                 cf |= CF_WANT_LOSE;
948         if (nc->dry_run)
949                 cf |= CF_DRY_RUN;
950         p->conn_flags    = cpu_to_be32(cf);
951
952         if (tconn->agreed_pro_version >= 87)
953                 strcpy(p->integrity_alg, nc->integrity_alg);
954         rcu_read_unlock();
955
956         return __conn_send_command(tconn, sock, P_PROTOCOL, size, NULL, 0);
957 }
958
959 int drbd_send_protocol(struct drbd_tconn *tconn)
960 {
961         int err;
962
963         mutex_lock(&tconn->data.mutex);
964         err = __drbd_send_protocol(tconn);
965         mutex_unlock(&tconn->data.mutex);
966
967         return err;
968 }
969
970 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
971 {
972         struct drbd_socket *sock;
973         struct p_uuids *p;
974         int i;
975
976         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
977                 return 0;
978
979         sock = &mdev->tconn->data;
980         p = drbd_prepare_command(mdev, sock);
981         if (!p) {
982                 put_ldev(mdev);
983                 return -EIO;
984         }
985         for (i = UI_CURRENT; i < UI_SIZE; i++)
986                 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
987
988         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
989         p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
990         rcu_read_lock();
991         uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->want_lose ? 1 : 0;
992         rcu_read_unlock();
993         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
994         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
995         p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
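        /* uuid_flags bits as assembled above: 1 = want_lose ("discard my data"
         * on connect), 2 = this node was a crashed primary, 4 = the local disk
         * was D_INCONSISTENT while negotiating; callers may pass further bits,
         * e.g. drbd_send_uuids_skip_initial_sync() passes 8. */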
996
997         put_ldev(mdev);
998         return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
999 }
1000
1001 int drbd_send_uuids(struct drbd_conf *mdev)
1002 {
1003         return _drbd_send_uuids(mdev, 0);
1004 }
1005
1006 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1007 {
1008         return _drbd_send_uuids(mdev, 8);
1009 }
1010
1011 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1012 {
1013         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1014                 u64 *uuid = mdev->ldev->md.uuid;
1015                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1016                      text,
1017                      (unsigned long long)uuid[UI_CURRENT],
1018                      (unsigned long long)uuid[UI_BITMAP],
1019                      (unsigned long long)uuid[UI_HISTORY_START],
1020                      (unsigned long long)uuid[UI_HISTORY_END]);
1021                 put_ldev(mdev);
1022         } else {
1023                 dev_info(DEV, "%s effective data uuid: %016llX\n",
1024                                 text,
1025                                 (unsigned long long)mdev->ed_uuid);
1026         }
1027 }
1028
1029 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1030 {
1031         struct drbd_socket *sock;
1032         struct p_rs_uuid *p;
1033         u64 uuid;
1034
1035         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1036
1037         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
1038         drbd_uuid_set(mdev, UI_BITMAP, uuid);
1039         drbd_print_uuids(mdev, "updated sync UUID");
1040         drbd_md_sync(mdev);
1041
1042         sock = &mdev->tconn->data;
1043         p = drbd_prepare_command(mdev, sock);
1044         if (p) {
1045                 p->uuid = cpu_to_be64(uuid);
1046                 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1047         }
1048 }
1049
1050 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1051 {
1052         struct drbd_socket *sock;
1053         struct p_sizes *p;
1054         sector_t d_size, u_size;
1055         int q_order_type, max_bio_size;
1056
1057         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1058                 D_ASSERT(mdev->ldev->backing_bdev);
1059                 d_size = drbd_get_max_capacity(mdev->ldev);
1060                 rcu_read_lock();
1061                 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1062                 rcu_read_unlock();
1063                 q_order_type = drbd_queue_order_type(mdev);
1064                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1065                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1066                 put_ldev(mdev);
1067         } else {
1068                 d_size = 0;
1069                 u_size = 0;
1070                 q_order_type = QUEUE_ORDERED_NONE;
1071                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1072         }
1073
1074         sock = &mdev->tconn->data;
1075         p = drbd_prepare_command(mdev, sock);
1076         if (!p)
1077                 return -EIO;
1078         p->d_size = cpu_to_be64(d_size);
1079         p->u_size = cpu_to_be64(u_size);
1080         p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1081         p->max_bio_size = cpu_to_be32(max_bio_size);
1082         p->queue_order_type = cpu_to_be16(q_order_type);
1083         p->dds_flags = cpu_to_be16(flags);
1084         return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1085 }
1086
1087 /**
1088  * drbd_send_state() - Sends the drbd state to the peer
1089  * @mdev:       DRBD device.
1090  */
1091 int drbd_send_state(struct drbd_conf *mdev)
1092 {
1093         struct drbd_socket *sock;
1094         struct p_state *p;
1095
1096         sock = &mdev->tconn->data;
1097         p = drbd_prepare_command(mdev, sock);
1098         if (!p)
1099                 return -EIO;
1100         p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1101         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1102 }
1103
1104 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1105 {
1106         struct drbd_socket *sock;
1107         struct p_req_state *p;
1108
1109         sock = &mdev->tconn->data;
1110         p = drbd_prepare_command(mdev, sock);
1111         if (!p)
1112                 return -EIO;
1113         p->mask = cpu_to_be32(mask.i);
1114         p->val = cpu_to_be32(val.i);
1115         return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1116
1117 }
1118
1119 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1120 {
1121         enum drbd_packet cmd;
1122         struct drbd_socket *sock;
1123         struct p_req_state *p;
1124
1125         cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1126         sock = &tconn->data;
1127         p = conn_prepare_command(tconn, sock);
1128         if (!p)
1129                 return -EIO;
1130         p->mask = cpu_to_be32(mask.i);
1131         p->val = cpu_to_be32(val.i);
1132         return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1133 }
1134
1135 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1136 {
1137         struct drbd_socket *sock;
1138         struct p_req_state_reply *p;
1139
1140         sock = &mdev->tconn->meta;
1141         p = drbd_prepare_command(mdev, sock);
1142         if (p) {
1143                 p->retcode = cpu_to_be32(retcode);
1144                 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1145         }
1146 }
1147
1148 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1149 {
1150         struct drbd_socket *sock;
1151         struct p_req_state_reply *p;
1152         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1153
1154         sock = &tconn->meta;
1155         p = conn_prepare_command(tconn, sock);
1156         if (p) {
1157                 p->retcode = cpu_to_be32(retcode);
1158                 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1159         }
1160 }
1161
1162 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1163 {
1164         BUG_ON(code & ~0xf);
1165         p->encoding = (p->encoding & ~0xf) | code;
1166 }
1167
1168 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1169 {
1170         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1171 }
1172
1173 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1174 {
1175         BUG_ON(n & ~0x7);
1176         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1177 }
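/*
 * Taken together, the three helpers above pack everything into the single
 * p->encoding byte (summary added for clarity):
 *
 *   bit  7    : value of the first run (dcbp_set_start)
 *   bits 6..4 : number of pad bits at the end of the bit stream (dcbp_set_pad_bits)
 *   bits 3..0 : bitmap encoding code, e.g. RLE_VLI_Bits (dcbp_set_code)
 */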
1178
1179 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1180                          struct p_compressed_bm *p,
1181                          unsigned int size,
1182                          struct bm_xfer_ctx *c)
1183 {
1184         struct bitstream bs;
1185         unsigned long plain_bits;
1186         unsigned long tmp;
1187         unsigned long rl;
1188         unsigned len;
1189         unsigned toggle;
1190         int bits, use_rle;
1191
1192         /* may we use this feature? */
1193         rcu_read_lock();
1194         use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1195         rcu_read_unlock();
1196         if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1197                 return 0;
1198
1199         if (c->bit_offset >= c->bm_bits)
1200                 return 0; /* nothing to do. */
1201
1202         /* use at most this many bytes */
1203         bitstream_init(&bs, p->code, size, 0);
1204         memset(p->code, 0, size);
1205         /* plain bits covered in this code string */
1206         plain_bits = 0;
1207
1208         /* p->encoding & 0x80 stores whether the first run length is set.
1209          * bit offset is implicit.
1210          * start with toggle == 2 to be able to tell the first iteration */
1211         toggle = 2;
1212
1213         /* see how many plain bits we can stuff into one packet
1214          * using RLE and VLI. */
1215         do {
1216                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1217                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1218                 if (tmp == -1UL)
1219                         tmp = c->bm_bits;
1220                 rl = tmp - c->bit_offset;
1221
1222                 if (toggle == 2) { /* first iteration */
1223                         if (rl == 0) {
1224                                 /* the first checked bit was set,
1225                                  * store start value, */
1226                                 dcbp_set_start(p, 1);
1227                                 /* but skip encoding of zero run length */
1228                                 toggle = !toggle;
1229                                 continue;
1230                         }
1231                         dcbp_set_start(p, 0);
1232                 }
1233
1234                 /* paranoia: catch zero runlength.
1235                  * can only happen if bitmap is modified while we scan it. */
1236                 if (rl == 0) {
1237                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1238                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1239                         return -1;
1240                 }
1241
1242                 bits = vli_encode_bits(&bs, rl);
1243                 if (bits == -ENOBUFS) /* buffer full */
1244                         break;
1245                 if (bits <= 0) {
1246                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1247                         return 0;
1248                 }
1249
1250                 toggle = !toggle;
1251                 plain_bits += rl;
1252                 c->bit_offset = tmp;
1253         } while (c->bit_offset < c->bm_bits);
1254
1255         len = bs.cur.b - p->code + !!bs.cur.bit;
1256
1257         if (plain_bits < (len << 3)) {
1258                 /* incompressible with this method.
1259                  * we need to rewind both word and bit position. */
1260                 c->bit_offset -= plain_bits;
1261                 bm_xfer_ctx_bit_to_word_offset(c);
1262                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1263                 return 0;
1264         }
1265
1266         /* RLE + VLI was able to compress it just fine.
1267          * update c->word_offset. */
1268         bm_xfer_ctx_bit_to_word_offset(c);
1269
1270         /* store pad_bits */
1271         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1272
1273         return len;
1274 }
1275
1276 /**
1277  * send_bitmap_rle_or_plain
1278  *
1279  * Return 0 when done, 1 when another iteration is needed, and a negative error
1280  * code upon failure.
1281  */
1282 static int
1283 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1284 {
1285         struct drbd_socket *sock = &mdev->tconn->data;
1286         unsigned int header_size = drbd_header_size(mdev->tconn);
1287         struct p_compressed_bm *p = sock->sbuf + header_size;
1288         int len, err;
1289
1290         len = fill_bitmap_rle_bits(mdev, p,
1291                         DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1292         if (len < 0)
1293                 return -EIO;
1294
1295         if (len) {
1296                 dcbp_set_code(p, RLE_VLI_Bits);
1297                 err = __send_command(mdev->tconn, mdev->vnr, sock,
1298                                      P_COMPRESSED_BITMAP, sizeof(*p) + len,
1299                                      NULL, 0);
1300                 c->packets[0]++;
1301                 c->bytes[0] += header_size + sizeof(*p) + len;
1302
1303                 if (c->bit_offset >= c->bm_bits)
1304                         len = 0; /* DONE */
1305         } else {
1306                 /* was not compressible.
1307                  * send a buffer full of plain text bits instead. */
1308                 unsigned int data_size;
1309                 unsigned long num_words;
1310                 unsigned long *p = sock->sbuf + header_size;
1311
1312                 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1313                 num_words = min_t(size_t, data_size / sizeof(*p),
1314                                   c->bm_words - c->word_offset);
1315                 len = num_words * sizeof(*p);
1316                 if (len)
1317                         drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1318                 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1319                 c->word_offset += num_words;
1320                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1321
1322                 c->packets[1]++;
1323                 c->bytes[1] += header_size + len;
1324
1325                 if (c->bit_offset > c->bm_bits)
1326                         c->bit_offset = c->bm_bits;
1327         }
1328         if (!err) {
1329                 if (len == 0) {
1330                         INFO_bm_xfer_stats(mdev, "send", c);
1331                         return 0;
1332                 } else
1333                         return 1;
1334         }
1335         return -EIO;
1336 }
1337
1338 /* See the comment at receive_bitmap() */
1339 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1340 {
1341         struct bm_xfer_ctx c;
1342         int err;
1343
1344         if (!expect(mdev->bitmap))
1345                 return false;
1346
1347         if (get_ldev(mdev)) {
1348                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1349                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1350                         drbd_bm_set_all(mdev);
1351                         if (drbd_bm_write(mdev)) {
1352                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1353                                  * but otherwise process as per normal - need to tell other
1354                                  * side that a full resync is required! */
1355                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1356                         } else {
1357                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1358                                 drbd_md_sync(mdev);
1359                         }
1360                 }
1361                 put_ldev(mdev);
1362         }
1363
1364         c = (struct bm_xfer_ctx) {
1365                 .bm_bits = drbd_bm_bits(mdev),
1366                 .bm_words = drbd_bm_words(mdev),
1367         };
1368
1369         do {
1370                 err = send_bitmap_rle_or_plain(mdev, &c);
1371         } while (err > 0);
1372
1373         return err == 0;
1374 }
1375
1376 int drbd_send_bitmap(struct drbd_conf *mdev)
1377 {
1378         struct drbd_socket *sock = &mdev->tconn->data;
1379         int err = -1;
1380
1381         mutex_lock(&sock->mutex);
1382         if (sock->socket)
1383                 err = !_drbd_send_bitmap(mdev);
1384         mutex_unlock(&sock->mutex);
1385         return err;
1386 }
1387
1388 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1389 {
1390         struct drbd_socket *sock;
1391         struct p_barrier_ack *p;
1392
1393         if (mdev->state.conn < C_CONNECTED)
1394                 return;
1395
1396         sock = &mdev->tconn->meta;
1397         p = drbd_prepare_command(mdev, sock);
1398         if (!p)
1399                 return;
1400         p->barrier = barrier_nr;
1401         p->set_size = cpu_to_be32(set_size);
1402         drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1403 }
1404
1405 /**
1406  * _drbd_send_ack() - Sends an ack packet
1407  * @mdev:       DRBD device.
1408  * @cmd:        Packet command code.
1409  * @sector:     sector, needs to be in big endian byte order
1410  * @blksize:    size in byte, needs to be in big endian byte order
1411  * @block_id:   Id, big endian byte order
1412  */
1413 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1414                           u64 sector, u32 blksize, u64 block_id)
1415 {
1416         struct drbd_socket *sock;
1417         struct p_block_ack *p;
1418
1419         if (mdev->state.conn < C_CONNECTED)
1420                 return -EIO;
1421
1422         sock = &mdev->tconn->meta;
1423         p = drbd_prepare_command(mdev, sock);
1424         if (!p)
1425                 return -EIO;
1426         p->sector = sector;
1427         p->block_id = block_id;
1428         p->blksize = blksize;
1429         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1430         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1431 }
1432
1433 /* dp->sector and dp->block_id already/still in network byte order,
1434  * data_size is payload size according to dp->head,
1435  * and may need to be corrected for digest size. */
1436 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1437                       struct p_data *dp, int data_size)
1438 {
1439         if (mdev->tconn->peer_integrity_tfm)
1440                 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1441         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1442                        dp->block_id);
1443 }
1444
1445 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1446                       struct p_block_req *rp)
1447 {
1448         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1449 }
1450
1451 /**
1452  * drbd_send_ack() - Sends an ack packet
1453  * @mdev:       DRBD device
1454  * @cmd:        packet command code
1455  * @peer_req:   peer request
1456  */
1457 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1458                   struct drbd_peer_request *peer_req)
1459 {
1460         return _drbd_send_ack(mdev, cmd,
1461                               cpu_to_be64(peer_req->i.sector),
1462                               cpu_to_be32(peer_req->i.size),
1463                               peer_req->block_id);
1464 }
1465
1466 /* This function misuses the block_id field to signal if the blocks
1467  * are in sync or not. */
1468 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1469                      sector_t sector, int blksize, u64 block_id)
1470 {
1471         return _drbd_send_ack(mdev, cmd,
1472                               cpu_to_be64(sector),
1473                               cpu_to_be32(blksize),
1474                               cpu_to_be64(block_id));
1475 }
1476
1477 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1478                        sector_t sector, int size, u64 block_id)
1479 {
1480         struct drbd_socket *sock;
1481         struct p_block_req *p;
1482
1483         sock = &mdev->tconn->data;
1484         p = drbd_prepare_command(mdev, sock);
1485         if (!p)
1486                 return -EIO;
1487         p->sector = cpu_to_be64(sector);
1488         p->block_id = block_id;
1489         p->blksize = cpu_to_be32(size);
1490         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1491 }
1492
1493 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1494                             void *digest, int digest_size, enum drbd_packet cmd)
1495 {
1496         struct drbd_socket *sock;
1497         struct p_block_req *p;
1498
1499         /* FIXME: Put the digest into the preallocated socket buffer.  */
1500
1501         sock = &mdev->tconn->data;
1502         p = drbd_prepare_command(mdev, sock);
1503         if (!p)
1504                 return -EIO;
1505         p->sector = cpu_to_be64(sector);
1506         p->block_id = ID_SYNCER /* unused */;
1507         p->blksize = cpu_to_be32(size);
1508         return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1509                                  digest, digest_size);
1510 }
1511
1512 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1513 {
1514         struct drbd_socket *sock;
1515         struct p_block_req *p;
1516
1517         sock = &mdev->tconn->data;
1518         p = drbd_prepare_command(mdev, sock);
1519         if (!p)
1520                 return -EIO;
1521         p->sector = cpu_to_be64(sector);
1522         p->block_id = ID_SYNCER /* unused */;
1523         p->blksize = cpu_to_be32(size);
1524         return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1525 }
1526
1527 /* called on sndtimeo
1528  * returns false if we should retry,
1529  * true if we think the connection is dead
1530  */
1531 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1532 {
1533         int drop_it;
1534         /* long elapsed = (long)(jiffies - mdev->last_received); */
1535
1536         drop_it =   tconn->meta.socket == sock
1537                 || !tconn->asender.task
1538                 || get_t_state(&tconn->asender) != RUNNING
1539                 || tconn->cstate < C_WF_REPORT_PARAMS;
1540
1541         if (drop_it)
1542                 return true;
1543
1544         drop_it = !--tconn->ko_count;
1545         if (!drop_it) {
1546                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1547                          current->comm, current->pid, tconn->ko_count);
1548                 request_ping(tconn);
1549         }
1550
1551         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1552 }
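/* Editorial note: tconn->ko_count is reloaded from net_conf->ko_count in
 * drbd_send() before data is transmitted; each send timeout that reaches this
 * point decrements it, and as long as it stays non-zero we log, ask the peer
 * for a ping and retry.  Only once it reaches zero do we declare the
 * connection dead. */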
1553
1554 static void drbd_update_congested(struct drbd_tconn *tconn)
1555 {
1556         struct sock *sk = tconn->data.socket->sk;
1557         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1558                 set_bit(NET_CONGESTED, &tconn->flags);
1559 }
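/* Editorial note: NET_CONGESTED is set once more than 4/5 of the socket send
 * buffer is queued; it is cleared again at the end of the send loops in
 * _drbd_send_page() and drbd_send(), and is reported to the block layer via
 * drbd_congested(). */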
1560
1561 /* The idea of sendpage seems to be to put some kind of reference
1562  * to the page into the skb, and to hand it over to the NIC. In
1563  * this process get_page() gets called.
1564  *
1565  * As soon as the page was really sent over the network put_page()
1566  * gets called by some part of the network layer. [ NIC driver? ]
1567  *
1568  * [ get_page() / put_page() increment/decrement the count. If count
1569  *   reaches 0 the page will be freed. ]
1570  *
1571  * This works nicely with pages from FSs.
1572  * But this means that in protocol A we might signal IO completion too early!
1573  *
1574  * In order not to corrupt data during a resync we must make sure
1575  * that we do not reuse our own buffer pages (EEs) too early; therefore
1576  * we have the net_ee list.
1577  *
1578  * XFS still seems to have problems with this: it submits pages with page_count == 0!
1579  * As a workaround, we disable sendpage on pages
1580  * with page_count == 0 or PageSlab.
1581  */
1582 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1583                               int offset, size_t size, unsigned msg_flags)
1584 {
1585         struct socket *socket;
1586         void *addr;
1587         int err;
1588
1589         socket = mdev->tconn->data.socket;
1590         addr = kmap(page) + offset;
1591         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1592         kunmap(page);
1593         if (!err)
1594                 mdev->send_cnt += size >> 9;
1595         return err;
1596 }
1597
1598 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1599                     int offset, size_t size, unsigned msg_flags)
1600 {
1601         struct socket *socket = mdev->tconn->data.socket;
1602         mm_segment_t oldfs = get_fs();
1603         int len = size;
1604         int err = -EIO;
1605
1606         /* e.g. XFS meta- & log-data is in slab pages, which have a
1607          * page_count of 0 and/or have PageSlab() set.
1608          * we cannot use send_page for those, as that does get_page();
1609          * put_page(); and would cause either a VM_BUG directly, or
1610          * __page_cache_release a page that would actually still be referenced
1611          * by someone, leading to some obscure delayed Oops somewhere else. */
1612         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1613                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1614
1615         msg_flags |= MSG_NOSIGNAL;
1616         drbd_update_congested(mdev->tconn);
1617         set_fs(KERNEL_DS);
1618         do {
1619                 int sent;
1620
1621                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1622                 if (sent <= 0) {
1623                         if (sent == -EAGAIN) {
1624                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1625                                         break;
1626                                 continue;
1627                         }
1628                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1629                              __func__, (int)size, len, sent);
1630                         if (sent < 0)
1631                                 err = sent;
1632                         break;
1633                 }
1634                 len    -= sent;
1635                 offset += sent;
1636         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1637         set_fs(oldfs);
1638         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1639
1640         if (len == 0) {
1641                 err = 0;
1642                 mdev->send_cnt += size >> 9;
1643         }
1644         return err;
1645 }
1646
1647 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1648 {
1649         struct bio_vec *bvec;
1650         int i;
1651         /* hint all but last page with MSG_MORE */
1652         __bio_for_each_segment(bvec, bio, i, 0) {
1653                 int err;
1654
1655                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1656                                          bvec->bv_offset, bvec->bv_len,
1657                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1658                 if (err)
1659                         return err;
1660         }
1661         return 0;
1662 }
1663
1664 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1665 {
1666         struct bio_vec *bvec;
1667         int i;
1668         /* hint all but last page with MSG_MORE */
1669         __bio_for_each_segment(bvec, bio, i, 0) {
1670                 int err;
1671
1672                 err = _drbd_send_page(mdev, bvec->bv_page,
1673                                       bvec->bv_offset, bvec->bv_len,
1674                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1675                 if (err)
1676                         return err;
1677         }
1678         return 0;
1679 }
1680
1681 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1682                             struct drbd_peer_request *peer_req)
1683 {
1684         struct page *page = peer_req->pages;
1685         unsigned len = peer_req->i.size;
1686         int err;
1687
1688         /* hint all but last page with MSG_MORE */
1689         page_chain_for_each(page) {
1690                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1691
1692                 err = _drbd_send_page(mdev, page, 0, l,
1693                                       page_chain_next(page) ? MSG_MORE : 0);
1694                 if (err)
1695                         return err;
1696                 len -= l;
1697         }
1698         return 0;
1699 }
1700
1701 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1702 {
1703         if (mdev->tconn->agreed_pro_version >= 95)
1704                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1705                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1706                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1707                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1708         else
1709                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1710 }
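/* Example (editorial): with agreed_pro_version >= 95, a REQ_SYNC | REQ_FUA
 * write request is sent with DP_RW_SYNC | DP_FUA set in dp_flags; peers older
 * than protocol 95 only ever get DP_RW_SYNC. */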
1711
1712 /* Used to send write requests
1713  * R_PRIMARY -> Peer    (P_DATA)
1714  */
1715 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1716 {
1717         struct drbd_socket *sock;
1718         struct p_data *p;
1719         unsigned int dp_flags = 0;
1720         int dgs;
1721         int err;
1722
1723         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1724                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1725
1726         sock = &mdev->tconn->data;
1727         p = drbd_prepare_command(mdev, sock);
1728         if (!p)
1729                 return -EIO;
1730         p->sector = cpu_to_be64(req->i.sector);
1731         p->block_id = (unsigned long)req;
1732         p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1733         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1734         if (mdev->state.conn >= C_SYNC_SOURCE &&
1735             mdev->state.conn <= C_PAUSED_SYNC_T)
1736                 dp_flags |= DP_MAY_SET_IN_SYNC;
1737         if (mdev->tconn->agreed_pro_version >= 100) {
1738                 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1739                         dp_flags |= DP_SEND_RECEIVE_ACK;
1740                 if (req->rq_state & RQ_EXP_WRITE_ACK)
1741                         dp_flags |= DP_SEND_WRITE_ACK;
1742         }
1743         p->dp_flags = cpu_to_be32(dp_flags);
1744         if (dgs)
1745                 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1746         err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1747         if (!err) {
1748                 /* For protocol A, we have to memcpy the payload into
1749                  * socket buffers, as we may complete the request right away,
1750                  * as soon as we have handed it over to tcp, at which point the data
1751                  * pages may become invalid.
1752                  *
1753                  * When data integrity is enabled, we copy it as well, so we can be
1754                  * sure that even if the bio pages are still modified in flight, the
1755                  * data on the wire does not change; thus if the digest checks
1756                  * out ok after sending on this side, but does not match on the
1757                  * receiving side, we have surely detected corruption elsewhere.
1758                  */
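                /* In short (editorial summary): copy the payload into the
                 * socket via _drbd_send_bio() unless the peer will acknowledge
                 * the write and no integrity digest is in use, in which case
                 * zero-copy via _drbd_send_zc_bio() is safe. */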
1759                 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1760                         err = _drbd_send_bio(mdev, req->master_bio);
1761                 else
1762                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1763
1764                 /* double check digest, sometimes buffers have been modified in flight. */
1765                 if (dgs > 0 && dgs <= 64) {
1766                         /* 64 byte, 512 bit, is the largest digest size
1767                          * currently supported in kernel crypto. */
1768                         unsigned char digest[64];
1769                         drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1770                         if (memcmp(p + 1, digest, dgs)) {
1771                                 dev_warn(DEV,
1772                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1773                                         (unsigned long long)req->i.sector, req->i.size);
1774                         }
1775                 } /* else if (dgs > 64) {
1776                      ... Be noisy about digest too large ...
1777                 } */
1778         }
1779         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1780
1781         return err;
1782 }
1783
1784 /* answer packet, used to send data back for read requests:
1785  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1786  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1787  */
1788 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1789                     struct drbd_peer_request *peer_req)
1790 {
1791         struct drbd_socket *sock;
1792         struct p_data *p;
1793         int err;
1794         int dgs;
1795
1796         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1797                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1798
1799         sock = &mdev->tconn->data;
1800         p = drbd_prepare_command(mdev, sock);
1801         if (!p)
1802                 return -EIO;
1803         p->sector = cpu_to_be64(peer_req->i.sector);
1804         p->block_id = peer_req->block_id;
1805         p->seq_num = 0;  /* unused */
1806         if (dgs)
1807                 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1808         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1809         if (!err)
1810                 err = _drbd_send_zc_ee(mdev, peer_req);
1811         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1812
1813         return err;
1814 }
1815
1816 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1817 {
1818         struct drbd_socket *sock;
1819         struct p_block_desc *p;
1820
1821         sock = &mdev->tconn->data;
1822         p = drbd_prepare_command(mdev, sock);
1823         if (!p)
1824                 return -EIO;
1825         p->sector = cpu_to_be64(req->i.sector);
1826         p->blksize = cpu_to_be32(req->i.size);
1827         return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1828 }
1829
1830 /*
1831   drbd_send distinguishes two cases:
1832
1833   Packets sent via the data socket "sock"
1834   and packets sent via the meta data socket "msock"
1835
1836                     sock                      msock
1837   -----------------+-------------------------+------------------------------
1838   timeout           conf.timeout / 2          conf.timeout / 2
1839   timeout action    send a ping via msock     Abort communication
1840                                               and close all sockets
1841 */
1842
1843 /*
1844  * the caller must already hold the appropriate [m]sock mutex!
1845  */
1846 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1847               void *buf, size_t size, unsigned msg_flags)
1848 {
1849         struct kvec iov;
1850         struct msghdr msg;
1851         int rv, sent = 0;
1852
1853         if (!sock)
1854                 return -EBADR;
1855
1856         /* THINK  if (signal_pending) return ... ? */
1857
1858         iov.iov_base = buf;
1859         iov.iov_len  = size;
1860
1861         msg.msg_name       = NULL;
1862         msg.msg_namelen    = 0;
1863         msg.msg_control    = NULL;
1864         msg.msg_controllen = 0;
1865         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1866
1867         if (sock == tconn->data.socket) {
1868                 rcu_read_lock();
1869                 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1870                 rcu_read_unlock();
1871                 drbd_update_congested(tconn);
1872         }
1873         do {
1874                 /* STRANGE
1875                  * tcp_sendmsg does _not_ use its size parameter at all ?
1876                  *
1877                  * -EAGAIN on timeout, -EINTR on signal.
1878                  */
1879 /* THINK
1880  * do we need to block DRBD_SIG if sock == &meta.socket ??
1881  * otherwise wake_asender() might interrupt some send_*Ack !
1882  */
1883                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1884                 if (rv == -EAGAIN) {
1885                         if (we_should_drop_the_connection(tconn, sock))
1886                                 break;
1887                         else
1888                                 continue;
1889                 }
1890                 if (rv == -EINTR) {
1891                         flush_signals(current);
1892                         rv = 0;
1893                 }
1894                 if (rv < 0)
1895                         break;
1896                 sent += rv;
1897                 iov.iov_base += rv;
1898                 iov.iov_len  -= rv;
1899         } while (sent < size);
1900
1901         if (sock == tconn->data.socket)
1902                 clear_bit(NET_CONGESTED, &tconn->flags);
1903
1904         if (rv <= 0) {
1905                 if (rv != -EAGAIN) {
1906                         conn_err(tconn, "%s_sendmsg returned %d\n",
1907                                  sock == tconn->meta.socket ? "msock" : "sock",
1908                                  rv);
1909                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1910                 } else
1911                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1912         }
1913
1914         return sent;
1915 }
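/* Editorial note: drbd_send() returns the number of bytes actually sent, which
 * may be short, or a negative error; callers that need the complete buffer on
 * the wire use drbd_send_all() below, which turns a short send into -EIO. */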
1916
1917 /**
1918  * drbd_send_all() - Send an entire buffer
1919  *
1920  * Returns 0 upon success and a negative error value otherwise.
1921  */
1922 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1923                   size_t size, unsigned msg_flags)
1924 {
1925         int err;
1926
1927         err = drbd_send(tconn, sock, buffer, size, msg_flags);
1928         if (err < 0)
1929                 return err;
1930         if (err != size)
1931                 return -EIO;
1932         return 0;
1933 }
1934
1935 static int drbd_open(struct block_device *bdev, fmode_t mode)
1936 {
1937         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1938         unsigned long flags;
1939         int rv = 0;
1940
1941         mutex_lock(&drbd_main_mutex);
1942         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1943         /* to have a stable mdev->state.role
1944          * and no race with updating open_cnt */
1945
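        /* Editorial note: on a Secondary, opens for writing fail with -EROFS,
         * and read-only opens fail with -EMEDIUMTYPE unless the allow_oos
         * module parameter is set. */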
1946         if (mdev->state.role != R_PRIMARY) {
1947                 if (mode & FMODE_WRITE)
1948                         rv = -EROFS;
1949                 else if (!allow_oos)
1950                         rv = -EMEDIUMTYPE;
1951         }
1952
1953         if (!rv)
1954                 mdev->open_cnt++;
1955         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1956         mutex_unlock(&drbd_main_mutex);
1957
1958         return rv;
1959 }
1960
1961 static int drbd_release(struct gendisk *gd, fmode_t mode)
1962 {
1963         struct drbd_conf *mdev = gd->private_data;
1964         mutex_lock(&drbd_main_mutex);
1965         mdev->open_cnt--;
1966         mutex_unlock(&drbd_main_mutex);
1967         return 0;
1968 }
1969
1970 static void drbd_set_defaults(struct drbd_conf *mdev)
1971 {
1972         /* Beware! The actual layout differs
1973          * between big endian and little endian */
1974         mdev->state = (union drbd_dev_state) {
1975                 { .role = R_SECONDARY,
1976                   .peer = R_UNKNOWN,
1977                   .conn = C_STANDALONE,
1978                   .disk = D_DISKLESS,
1979                   .pdsk = D_UNKNOWN,
1980                 } };
1981 }
1982
1983 void drbd_init_set_defaults(struct drbd_conf *mdev)
1984 {
1985         /* the memset(,0,) did most of this.
1986          * note: only assignments, no allocation in here */
1987
1988         drbd_set_defaults(mdev);
1989
1990         atomic_set(&mdev->ap_bio_cnt, 0);
1991         atomic_set(&mdev->ap_pending_cnt, 0);
1992         atomic_set(&mdev->rs_pending_cnt, 0);
1993         atomic_set(&mdev->unacked_cnt, 0);
1994         atomic_set(&mdev->local_cnt, 0);
1995         atomic_set(&mdev->pp_in_use_by_net, 0);
1996         atomic_set(&mdev->rs_sect_in, 0);
1997         atomic_set(&mdev->rs_sect_ev, 0);
1998         atomic_set(&mdev->ap_in_flight, 0);
1999
2000         mutex_init(&mdev->md_io_mutex);
2001         mutex_init(&mdev->own_state_mutex);
2002         mdev->state_mutex = &mdev->own_state_mutex;
2003
2004         spin_lock_init(&mdev->al_lock);
2005         spin_lock_init(&mdev->peer_seq_lock);
2006         spin_lock_init(&mdev->epoch_lock);
2007
2008         INIT_LIST_HEAD(&mdev->active_ee);
2009         INIT_LIST_HEAD(&mdev->sync_ee);
2010         INIT_LIST_HEAD(&mdev->done_ee);
2011         INIT_LIST_HEAD(&mdev->read_ee);
2012         INIT_LIST_HEAD(&mdev->net_ee);
2013         INIT_LIST_HEAD(&mdev->resync_reads);
2014         INIT_LIST_HEAD(&mdev->resync_work.list);
2015         INIT_LIST_HEAD(&mdev->unplug_work.list);
2016         INIT_LIST_HEAD(&mdev->go_diskless.list);
2017         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2018         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2019         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2020
2021         mdev->resync_work.cb  = w_resync_timer;
2022         mdev->unplug_work.cb  = w_send_write_hint;
2023         mdev->go_diskless.cb  = w_go_diskless;
2024         mdev->md_sync_work.cb = w_md_sync;
2025         mdev->bm_io_work.w.cb = w_bitmap_io;
2026         mdev->start_resync_work.cb = w_start_resync;
2027
2028         mdev->resync_work.mdev  = mdev;
2029         mdev->unplug_work.mdev  = mdev;
2030         mdev->go_diskless.mdev  = mdev;
2031         mdev->md_sync_work.mdev = mdev;
2032         mdev->bm_io_work.w.mdev = mdev;
2033         mdev->start_resync_work.mdev = mdev;
2034
2035         init_timer(&mdev->resync_timer);
2036         init_timer(&mdev->md_sync_timer);
2037         init_timer(&mdev->start_resync_timer);
2038         init_timer(&mdev->request_timer);
2039         mdev->resync_timer.function = resync_timer_fn;
2040         mdev->resync_timer.data = (unsigned long) mdev;
2041         mdev->md_sync_timer.function = md_sync_timer_fn;
2042         mdev->md_sync_timer.data = (unsigned long) mdev;
2043         mdev->start_resync_timer.function = start_resync_timer_fn;
2044         mdev->start_resync_timer.data = (unsigned long) mdev;
2045         mdev->request_timer.function = request_timer_fn;
2046         mdev->request_timer.data = (unsigned long) mdev;
2047
2048         init_waitqueue_head(&mdev->misc_wait);
2049         init_waitqueue_head(&mdev->state_wait);
2050         init_waitqueue_head(&mdev->ee_wait);
2051         init_waitqueue_head(&mdev->al_wait);
2052         init_waitqueue_head(&mdev->seq_wait);
2053
2054         mdev->write_ordering = WO_bdev_flush;
2055         mdev->resync_wenr = LC_FREE;
2056         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2057         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2058 }
2059
2060 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2061 {
2062         int i;
2063         if (mdev->tconn->receiver.t_state != NONE)
2064                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2065                                 mdev->tconn->receiver.t_state);
2066
2067         /* no need to lock it, I'm the only thread alive */
2068         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2069                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2070         mdev->al_writ_cnt  =
2071         mdev->bm_writ_cnt  =
2072         mdev->read_cnt     =
2073         mdev->recv_cnt     =
2074         mdev->send_cnt     =
2075         mdev->writ_cnt     =
2076         mdev->p_size       =
2077         mdev->rs_start     =
2078         mdev->rs_total     =
2079         mdev->rs_failed    = 0;
2080         mdev->rs_last_events = 0;
2081         mdev->rs_last_sect_ev = 0;
2082         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2083                 mdev->rs_mark_left[i] = 0;
2084                 mdev->rs_mark_time[i] = 0;
2085         }
2086         D_ASSERT(mdev->tconn->net_conf == NULL);
2087
2088         drbd_set_my_capacity(mdev, 0);
2089         if (mdev->bitmap) {
2090                 /* maybe never allocated. */
2091                 drbd_bm_resize(mdev, 0, 1);
2092                 drbd_bm_cleanup(mdev);
2093         }
2094
2095         drbd_free_bc(mdev->ldev);
2096         mdev->ldev = NULL;
2097
2098         clear_bit(AL_SUSPENDED, &mdev->flags);
2099
2100         D_ASSERT(list_empty(&mdev->active_ee));
2101         D_ASSERT(list_empty(&mdev->sync_ee));
2102         D_ASSERT(list_empty(&mdev->done_ee));
2103         D_ASSERT(list_empty(&mdev->read_ee));
2104         D_ASSERT(list_empty(&mdev->net_ee));
2105         D_ASSERT(list_empty(&mdev->resync_reads));
2106         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2107         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2108         D_ASSERT(list_empty(&mdev->resync_work.list));
2109         D_ASSERT(list_empty(&mdev->unplug_work.list));
2110         D_ASSERT(list_empty(&mdev->go_diskless.list));
2111
2112         drbd_set_defaults(mdev);
2113 }
2114
2115
2116 static void drbd_destroy_mempools(void)
2117 {
2118         struct page *page;
2119
2120         while (drbd_pp_pool) {
2121                 page = drbd_pp_pool;
2122                 drbd_pp_pool = (struct page *)page_private(page);
2123                 __free_page(page);
2124                 drbd_pp_vacant--;
2125         }
2126
2127         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2128
2129         if (drbd_md_io_bio_set)
2130                 bioset_free(drbd_md_io_bio_set);
2131         if (drbd_md_io_page_pool)
2132                 mempool_destroy(drbd_md_io_page_pool);
2133         if (drbd_ee_mempool)
2134                 mempool_destroy(drbd_ee_mempool);
2135         if (drbd_request_mempool)
2136                 mempool_destroy(drbd_request_mempool);
2137         if (drbd_ee_cache)
2138                 kmem_cache_destroy(drbd_ee_cache);
2139         if (drbd_request_cache)
2140                 kmem_cache_destroy(drbd_request_cache);
2141         if (drbd_bm_ext_cache)
2142                 kmem_cache_destroy(drbd_bm_ext_cache);
2143         if (drbd_al_ext_cache)
2144                 kmem_cache_destroy(drbd_al_ext_cache);
2145
2146         drbd_md_io_bio_set   = NULL;
2147         drbd_md_io_page_pool = NULL;
2148         drbd_ee_mempool      = NULL;
2149         drbd_request_mempool = NULL;
2150         drbd_ee_cache        = NULL;
2151         drbd_request_cache   = NULL;
2152         drbd_bm_ext_cache    = NULL;
2153         drbd_al_ext_cache    = NULL;
2154
2155         return;
2156 }
2157
2158 static int drbd_create_mempools(void)
2159 {
2160         struct page *page;
2161         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
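        /* editorial: i.e. enough pool pages for one maximally sized bio
         * (DRBD_MAX_BIO_SIZE) per configured minor */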
2162         int i;
2163
2164         /* prepare our caches and mempools */
2165         drbd_request_mempool = NULL;
2166         drbd_ee_cache        = NULL;
2167         drbd_request_cache   = NULL;
2168         drbd_bm_ext_cache    = NULL;
2169         drbd_al_ext_cache    = NULL;
2170         drbd_pp_pool         = NULL;
2171         drbd_md_io_page_pool = NULL;
2172         drbd_md_io_bio_set   = NULL;
2173
2174         /* caches */
2175         drbd_request_cache = kmem_cache_create(
2176                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2177         if (drbd_request_cache == NULL)
2178                 goto Enomem;
2179
2180         drbd_ee_cache = kmem_cache_create(
2181                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2182         if (drbd_ee_cache == NULL)
2183                 goto Enomem;
2184
2185         drbd_bm_ext_cache = kmem_cache_create(
2186                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2187         if (drbd_bm_ext_cache == NULL)
2188                 goto Enomem;
2189
2190         drbd_al_ext_cache = kmem_cache_create(
2191                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2192         if (drbd_al_ext_cache == NULL)
2193                 goto Enomem;
2194
2195         /* mempools */
2196         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2197         if (drbd_md_io_bio_set == NULL)
2198                 goto Enomem;
2199
2200         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2201         if (drbd_md_io_page_pool == NULL)
2202                 goto Enomem;
2203
2204         drbd_request_mempool = mempool_create(number,
2205                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2206         if (drbd_request_mempool == NULL)
2207                 goto Enomem;
2208
2209         drbd_ee_mempool = mempool_create(number,
2210                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2211         if (drbd_ee_mempool == NULL)
2212                 goto Enomem;
2213
2214         /* drbd's page pool */
2215         spin_lock_init(&drbd_pp_lock);
2216
2217         for (i = 0; i < number; i++) {
2218                 page = alloc_page(GFP_HIGHUSER);
2219                 if (!page)
2220                         goto Enomem;
2221                 set_page_private(page, (unsigned long)drbd_pp_pool);
2222                 drbd_pp_pool = page;
2223         }
2224         drbd_pp_vacant = number;
2225
2226         return 0;
2227
2228 Enomem:
2229         drbd_destroy_mempools(); /* in case we allocated some */
2230         return -ENOMEM;
2231 }
2232
2233 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2234         void *unused)
2235 {
2236         /* just so we have it.  you never know what interesting things we
2237          * might want to do here some day...
2238          */
2239
2240         return NOTIFY_DONE;
2241 }
2242
2243 static struct notifier_block drbd_notifier = {
2244         .notifier_call = drbd_notify_sys,
2245 };
2246
2247 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2248 {
2249         int rr;
2250
2251         rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2252         if (rr)
2253                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2254
2255         rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2256         if (rr)
2257                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2258
2259         rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2260         if (rr)
2261                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2262
2263         rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2264         if (rr)
2265                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2266
2267         rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2268         if (rr)
2269                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2270 }
2271
2272 /* caution. no locking. */
2273 void drbd_minor_destroy(struct kref *kref)
2274 {
2275         struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2276         struct drbd_tconn *tconn = mdev->tconn;
2277
2278         /* paranoia asserts */
2279         D_ASSERT(mdev->open_cnt == 0);
2280         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2281         /* end paranoia asserts */
2282
2283         /* cleanup stuff that may have been allocated during
2284          * device (re-)configuration or state changes */
2285
2286         if (mdev->this_bdev)
2287                 bdput(mdev->this_bdev);
2288
2289         drbd_free_bc(mdev->ldev);
2290         mdev->ldev = NULL;
2291
2292         drbd_release_all_peer_reqs(mdev);
2293
2294         lc_destroy(mdev->act_log);
2295         lc_destroy(mdev->resync);
2296
2297         kfree(mdev->p_uuid);
2298         /* mdev->p_uuid = NULL; */
2299
2300         kfree(mdev->current_epoch);
2301         if (mdev->bitmap) /* should no longer be there. */
2302                 drbd_bm_cleanup(mdev);
2303         __free_page(mdev->md_io_page);
2304         put_disk(mdev->vdisk);
2305         blk_cleanup_queue(mdev->rq_queue);
2306         kfree(mdev->rs_plan_s);
2307         kfree(mdev);
2308
2309         kref_put(&tconn->kref, &conn_destroy);
2310 }
2311
2312 static void drbd_cleanup(void)
2313 {
2314         unsigned int i;
2315         struct drbd_conf *mdev;
2316         struct drbd_tconn *tconn, *tmp;
2317
2318         unregister_reboot_notifier(&drbd_notifier);
2319
2320         /* first remove proc,
2321          * drbdsetup uses its presence to detect
2322          * whether DRBD is loaded.
2323          * If we got stuck in proc removal,
2324          * but have netlink already deregistered,
2325          * some drbdsetup commands may wait forever
2326          * for an answer.
2327          */
2328         if (drbd_proc)
2329                 remove_proc_entry("drbd", NULL);
2330
2331         drbd_genl_unregister();
2332
2333         idr_for_each_entry(&minors, mdev, i) {
2334                 idr_remove(&minors, mdev_to_minor(mdev));
2335                 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2336                 del_gendisk(mdev->vdisk);
2337                 /* synchronize_rcu(); No other threads running at this point */
2338                 kref_put(&mdev->kref, &drbd_minor_destroy);
2339         }
2340
2341         /* not _rcu, since there is no other updater anymore; genl already unregistered */
2342         list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2343                 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2344                 /* synchronize_rcu(); */
2345                 kref_put(&tconn->kref, &conn_destroy);
2346         }
2347
2348         drbd_destroy_mempools();
2349         unregister_blkdev(DRBD_MAJOR, "drbd");
2350
2351         idr_destroy(&minors);
2352
2353         printk(KERN_INFO "drbd: module cleanup done.\n");
2354 }
2355
2356 /**
2357  * drbd_congested() - Callback for pdflush
2358  * @congested_data:     User data
2359  * @bdi_bits:           Bits pdflush is currently interested in
2360  *
2361  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
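 *
 * As a side effect, mdev->congestion_reason is set to 'd' (IO frozen by DRBD),
 * 'b' (backing device congested), 'n' (network congested), 'a' (both backing
 * device and network congested) or '-' (not congested).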
2362  */
2363 static int drbd_congested(void *congested_data, int bdi_bits)
2364 {
2365         struct drbd_conf *mdev = congested_data;
2366         struct request_queue *q;
2367         char reason = '-';
2368         int r = 0;
2369
2370         if (!may_inc_ap_bio(mdev)) {
2371                 /* DRBD has frozen IO */
2372                 r = bdi_bits;
2373                 reason = 'd';
2374                 goto out;
2375         }
2376
2377         if (get_ldev(mdev)) {
2378                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2379                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2380                 put_ldev(mdev);
2381                 if (r)
2382                         reason = 'b';
2383         }
2384
2385         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2386                 r |= (1 << BDI_async_congested);
2387                 reason = reason == 'b' ? 'a' : 'n';
2388         }
2389
2390 out:
2391         mdev->congestion_reason = reason;
2392         return r;
2393 }
2394
2395 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2396 {
2397         sema_init(&wq->s, 0);
2398         spin_lock_init(&wq->q_lock);
2399         INIT_LIST_HEAD(&wq->q);
2400 }
2401
2402 struct drbd_tconn *conn_get_by_name(const char *name)
2403 {
2404         struct drbd_tconn *tconn;
2405
2406         if (!name || !name[0])
2407                 return NULL;
2408
2409         rcu_read_lock();
2410         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2411                 if (!strcmp(tconn->name, name)) {
2412                         kref_get(&tconn->kref);
2413                         goto found;
2414                 }
2415         }
2416         tconn = NULL;
2417 found:
2418         rcu_read_unlock();
2419         return tconn;
2420 }
2421
2422 static int drbd_alloc_socket(struct drbd_socket *socket)
2423 {
2424         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2425         if (!socket->rbuf)
2426                 return -ENOMEM;
2427         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2428         if (!socket->sbuf)
2429                 return -ENOMEM;
2430         return 0;
2431 }
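/* Editorial note: on failure the caller (conn_create()) unwinds through
 * drbd_free_socket(); since free_page(0) is a no-op, passing a partially
 * allocated, zero-initialized drbd_socket there is safe. */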
2432
2433 static void drbd_free_socket(struct drbd_socket *socket)
2434 {
2435         free_page((unsigned long) socket->sbuf);
2436         free_page((unsigned long) socket->rbuf);
2437 }
2438
2439 void conn_free_crypto(struct drbd_tconn *tconn)
2440 {
2441         drbd_free_sock(tconn);
2442
2443         crypto_free_hash(tconn->csums_tfm);
2444         crypto_free_hash(tconn->verify_tfm);
2445         crypto_free_hash(tconn->cram_hmac_tfm);
2446         crypto_free_hash(tconn->integrity_tfm);
2447         crypto_free_hash(tconn->peer_integrity_tfm);
2448         kfree(tconn->int_dig_in);
2449         kfree(tconn->int_dig_vv);
2450
2451         tconn->csums_tfm = NULL;
2452         tconn->verify_tfm = NULL;
2453         tconn->cram_hmac_tfm = NULL;
2454         tconn->integrity_tfm = NULL;
2455         tconn->peer_integrity_tfm = NULL;
2456         tconn->int_dig_in = NULL;
2457         tconn->int_dig_vv = NULL;
2458 }
2459
2460 /* caller must be under genl_lock() */
2461 struct drbd_tconn *conn_create(const char *name)
2462 {
2463         struct drbd_tconn *tconn;
2464
2465         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2466         if (!tconn)
2467                 return NULL;
2468
2469         tconn->name = kstrdup(name, GFP_KERNEL);
2470         if (!tconn->name)
2471                 goto fail;
2472
2473         if (drbd_alloc_socket(&tconn->data))
2474                 goto fail;
2475         if (drbd_alloc_socket(&tconn->meta))
2476                 goto fail;
2477
2478         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2479                 goto fail;
2480
2481         if (!tl_init(tconn))
2482                 goto fail;
2483
2484         tconn->cstate = C_STANDALONE;
2485         mutex_init(&tconn->cstate_mutex);
2486         spin_lock_init(&tconn->req_lock);
2487         mutex_init(&tconn->conf_update);
2488         init_waitqueue_head(&tconn->ping_wait);
2489         idr_init(&tconn->volumes);
2490
2491         drbd_init_workqueue(&tconn->data.work);
2492         mutex_init(&tconn->data.mutex);
2493
2494         drbd_init_workqueue(&tconn->meta.work);
2495         mutex_init(&tconn->meta.mutex);
2496
2497         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2498         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2499         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2500
2501         drbd_set_res_opts_defaults(&tconn->res_opts);
2502
2503         kref_init(&tconn->kref);
2504         list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2505
2506         return tconn;
2507
2508 fail:
2509         tl_cleanup(tconn);
2510         free_cpumask_var(tconn->cpu_mask);
2511         drbd_free_socket(&tconn->meta);
2512         drbd_free_socket(&tconn->data);
2513         kfree(tconn->name);
2514         kfree(tconn);
2515
2516         return NULL;
2517 }
2518
2519 void conn_destroy(struct kref *kref)
2520 {
2521         struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2522
2523         idr_destroy(&tconn->volumes);
2524
2525         free_cpumask_var(tconn->cpu_mask);
2526         drbd_free_socket(&tconn->meta);
2527         drbd_free_socket(&tconn->data);
2528         kfree(tconn->name);
2529         kfree(tconn->int_dig_in);
2530         kfree(tconn->int_dig_vv);
2531         kfree(tconn);
2532 }
2533
2534 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2535 {
2536         struct drbd_conf *mdev;
2537         struct gendisk *disk;
2538         struct request_queue *q;
2539         int vnr_got = vnr;
2540         int minor_got = minor;
2541         enum drbd_ret_code err = ERR_NOMEM;
2542
2543         mdev = minor_to_mdev(minor);
2544         if (mdev)
2545                 return ERR_MINOR_EXISTS;
2546
2547         /* GFP_KERNEL, we are outside of all write-out paths */
2548         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2549         if (!mdev)
2550                 return ERR_NOMEM;
2551
2552         kref_get(&tconn->kref);
2553         mdev->tconn = tconn;
2554
2555         mdev->minor = minor;
2556         mdev->vnr = vnr;
2557
2558         drbd_init_set_defaults(mdev);
2559
2560         q = blk_alloc_queue(GFP_KERNEL);
2561         if (!q)
2562                 goto out_no_q;
2563         mdev->rq_queue = q;
2564         q->queuedata   = mdev;
2565
2566         disk = alloc_disk(1);
2567         if (!disk)
2568                 goto out_no_disk;
2569         mdev->vdisk = disk;
2570
2571         set_disk_ro(disk, true);
2572
2573         disk->queue = q;
2574         disk->major = DRBD_MAJOR;
2575         disk->first_minor = minor;
2576         disk->fops = &drbd_ops;
2577         sprintf(disk->disk_name, "drbd%d", minor);
2578         disk->private_data = mdev;
2579
2580         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2581         /* we have no partitions. we contain only ourselves. */
2582         mdev->this_bdev->bd_contains = mdev->this_bdev;
2583
2584         q->backing_dev_info.congested_fn = drbd_congested;
2585         q->backing_dev_info.congested_data = mdev;
2586
2587         blk_queue_make_request(q, drbd_make_request);
2588         /* Setting the max_hw_sectors to an odd value of 8 KiB here;
2589            this triggers a max_bio_size message upon first attach or connect */
2590         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2591         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2592         blk_queue_merge_bvec(q, drbd_merge_bvec);
2593         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2594
2595         mdev->md_io_page = alloc_page(GFP_KERNEL);
2596         if (!mdev->md_io_page)
2597                 goto out_no_io_page;
2598
2599         if (drbd_bm_init(mdev))
2600                 goto out_no_bitmap;
2601         mdev->read_requests = RB_ROOT;
2602         mdev->write_requests = RB_ROOT;
2603
2604         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2605         if (!mdev->current_epoch)
2606                 goto out_no_epoch;
2607
2608         INIT_LIST_HEAD(&mdev->current_epoch->list);
2609         mdev->epochs = 1;
2610
2611         if (!idr_pre_get(&minors, GFP_KERNEL))
2612                 goto out_no_minor_idr;
2613         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2614                 goto out_no_minor_idr;
2615         if (minor_got != minor) {
2616                 err = ERR_MINOR_EXISTS;
2617                 drbd_msg_put_info("requested minor exists already");
2618                 goto out_idr_remove_minor;
2619         }
2620
2621         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2622                 goto out_idr_remove_minor;
2623         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2624                 goto out_idr_remove_minor;
2625         if (vnr_got != vnr) {
2626                 err = ERR_INVALID_REQUEST;
2627                 drbd_msg_put_info("requested volume exists already");
2628                 goto out_idr_remove_vol;
2629         }
2630         add_disk(disk);
2631         kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2632
2633         /* inherit the connection state */
2634         mdev->state.conn = tconn->cstate;
2635         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2636                 drbd_connected(mdev);
2637
2638         return NO_ERROR;
2639
2640 out_idr_remove_vol:
2641         idr_remove(&tconn->volumes, vnr_got);
2642 out_idr_remove_minor:
2643         idr_remove(&minors, minor_got);
2644         synchronize_rcu();
2645 out_no_minor_idr:
2646         kfree(mdev->current_epoch);
2647 out_no_epoch:
2648         drbd_bm_cleanup(mdev);
2649 out_no_bitmap:
2650         __free_page(mdev->md_io_page);
2651 out_no_io_page:
2652         put_disk(disk);
2653 out_no_disk:
2654         blk_cleanup_queue(q);
2655 out_no_q:
2656         kfree(mdev);
2657         kref_put(&tconn->kref, &conn_destroy);
2658         return err;
2659 }
2660
2661 int __init drbd_init(void)
2662 {
2663         int err;
2664
2665         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2666                 printk(KERN_ERR
2667                        "drbd: invalid minor_count (%d)\n", minor_count);
2668 #ifdef MODULE
2669                 return -EINVAL;
2670 #else
2671                 minor_count = 8;
2672 #endif
2673         }
2674
2675         err = register_blkdev(DRBD_MAJOR, "drbd");
2676         if (err) {
2677                 printk(KERN_ERR
2678                        "drbd: unable to register block device major %d\n",
2679                        DRBD_MAJOR);
2680                 return err;
2681         }
2682
2683         err = drbd_genl_register();
2684         if (err) {
2685                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2686                 goto fail;
2687         }
2688
2689
2690         register_reboot_notifier(&drbd_notifier);
2691
2692         /*
2693          * allocate all necessary structs
2694          */
2695         err = -ENOMEM;
2696
2697         init_waitqueue_head(&drbd_pp_wait);
2698
2699         drbd_proc = NULL; /* play safe for drbd_cleanup */
2700         idr_init(&minors);
2701
2702         err = drbd_create_mempools();
2703         if (err)
2704                 goto fail;
2705
2706         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2707         if (!drbd_proc) {
2708                 printk(KERN_ERR "drbd: unable to register proc file\n");
2709                 goto fail;
2710         }
2711
2712         rwlock_init(&global_state_lock);
2713         INIT_LIST_HEAD(&drbd_tconns);
2714
2715         printk(KERN_INFO "drbd: initialized. "
2716                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2717                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2718         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2719         printk(KERN_INFO "drbd: registered as block device major %d\n",
2720                 DRBD_MAJOR);
2721
2722         return 0; /* Success! */
2723
2724 fail:
2725         drbd_cleanup();
2726         if (err == -ENOMEM)
2727                 /* currently always the case */
2728                 printk(KERN_ERR "drbd: ran out of memory\n");
2729         else
2730                 printk(KERN_ERR "drbd: initialization failure\n");
2731         return err;
2732 }
2733
2734 void drbd_free_bc(struct drbd_backing_dev *ldev)
2735 {
2736         if (ldev == NULL)
2737                 return;
2738
2739         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2740         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2741
2742         kfree(ldev);
2743 }
2744
2745 void drbd_free_sock(struct drbd_tconn *tconn)
2746 {
2747         if (tconn->data.socket) {
2748                 mutex_lock(&tconn->data.mutex);
2749                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2750                 sock_release(tconn->data.socket);
2751                 tconn->data.socket = NULL;
2752                 mutex_unlock(&tconn->data.mutex);
2753         }
2754         if (tconn->meta.socket) {
2755                 mutex_lock(&tconn->meta.mutex);
2756                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2757                 sock_release(tconn->meta.socket);
2758                 tconn->meta.socket = NULL;
2759                 mutex_unlock(&tconn->meta.mutex);
2760         }
2761 }
2762
2763 /* meta data management */
2764
2765 struct meta_data_on_disk {
2766         u64 la_size;           /* last agreed size. */
2767         u64 uuid[UI_SIZE];   /* UUIDs. */
2768         u64 device_uuid;
2769         u64 reserved_u64_1;
2770         u32 flags;             /* MDF */
2771         u32 magic;
2772         u32 md_size_sect;
2773         u32 al_offset;         /* offset to this block */
2774         u32 al_nr_extents;     /* important for restoring the AL */
2775               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2776         u32 bm_offset;         /* offset to the bitmap, from here */
2777         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2778         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2779         u32 reserved_u32[3];
2780
2781 } __packed;
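/* Editorial note: all multi-byte fields are stored big-endian on disk (see the
 * cpu_to_be*() conversions in drbd_md_sync() and the be*_to_cpu() conversions
 * in drbd_md_read()); drbd_md_sync() zeroes a full 512 byte buffer first, so
 * the reserved fields end up as zero on disk. */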
2782
2783 /**
2784  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2785  * @mdev:       DRBD device.
2786  */
2787 void drbd_md_sync(struct drbd_conf *mdev)
2788 {
2789         struct meta_data_on_disk *buffer;
2790         sector_t sector;
2791         int i;
2792
2793         del_timer(&mdev->md_sync_timer);
2794         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2795         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2796                 return;
2797
2798         /* We use D_FAILED here and not D_ATTACHING because we try to write
2799          * metadata even if we detach due to a disk failure! */
2800         if (!get_ldev_if_state(mdev, D_FAILED))
2801                 return;
2802
2803         mutex_lock(&mdev->md_io_mutex);
2804         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2805         memset(buffer, 0, 512);
2806
2807         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2808         for (i = UI_CURRENT; i < UI_SIZE; i++)
2809                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2810         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2811         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2812
2813         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2814         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2815         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2816         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2817         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2818
2819         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2820         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2821
2822         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2823         sector = mdev->ldev->md.md_offset;
2824
2825         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2826                 /* this was a try anyway ... */
2827                 dev_err(DEV, "meta data update failed!\n");
2828                 drbd_chk_io_error(mdev, 1, true);
2829         }
2830
2831         /* Update mdev->ldev->md.la_size_sect,
2832          * since we just wrote it to the metadata. */
2833         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2834
2835         mutex_unlock(&mdev->md_io_mutex);
2836         put_ldev(mdev);
2837 }
2838
2839 /**
2840  * drbd_md_read() - Reads in the meta data super block
2841  * @mdev:       DRBD device.
2842  * @bdev:       Device from which the meta data should be read in.
2843  *
2844  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2845  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2846  */
2847 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2848 {
2849         struct meta_data_on_disk *buffer;
2850         int i, rv = NO_ERROR;
2851
2852         if (!get_ldev_if_state(mdev, D_ATTACHING))
2853                 return ERR_IO_MD_DISK;
2854
2855         mutex_lock(&mdev->md_io_mutex);
2856         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2857
2858         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2859                 /* NOTE: can't do normal error processing here as this is
2860                    called BEFORE disk is attached */
2861                 dev_err(DEV, "Error while reading metadata.\n");
2862                 rv = ERR_IO_MD_DISK;
2863                 goto err;
2864         }
2865
2866         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2867                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2868                 rv = ERR_MD_INVALID;
2869                 goto err;
2870         }
2871         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2872                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2873                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2874                 rv = ERR_MD_INVALID;
2875                 goto err;
2876         }
2877         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2878                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2879                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2880                 rv = ERR_MD_INVALID;
2881                 goto err;
2882         }
2883         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2884                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2885                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2886                 rv = ERR_MD_INVALID;
2887                 goto err;
2888         }
2889
2890         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2891                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2892                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2893                 rv = ERR_MD_INVALID;
2894                 goto err;
2895         }
2896
2897         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2898         for (i = UI_CURRENT; i < UI_SIZE; i++)
2899                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2900         bdev->md.flags = be32_to_cpu(buffer->flags);
2901         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2902
2903         spin_lock_irq(&mdev->tconn->req_lock);
2904         if (mdev->state.conn < C_CONNECTED) {
2905                 int peer;
2906                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2907                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2908                 mdev->peer_max_bio_size = peer;
2909         }
2910         spin_unlock_irq(&mdev->tconn->req_lock);
2911
2912         /* This block wants to get removed... */
2913         bdev->disk_conf->al_extents = be32_to_cpu(buffer->al_nr_extents);
2914         if (bdev->disk_conf->al_extents < DRBD_AL_EXTENTS_MIN)
2915                 bdev->disk_conf->al_extents = DRBD_AL_EXTENTS_DEF;
2916
2917  err:
2918         mutex_unlock(&mdev->md_io_mutex);
2919         put_ldev(mdev);
2920
2921         return rv;
2922 }
2923
2924 /**
2925  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2926  * @mdev:       DRBD device.
2927  *
2928  * Call this function if you change anything that should be written to
2929  * the meta-data super block. This function sets MD_DIRTY, and starts a
2930  * timer that ensures that drbd_md_sync() gets called within five seconds.
2931  */
2932 #ifdef DEBUG
2933 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2934 {
2935         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2936                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2937                 mdev->last_md_mark_dirty.line = line;
2938                 mdev->last_md_mark_dirty.func = func;
2939         }
2940 }
2941 #else
2942 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2943 {
2944         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2945                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2946 }
2947 #endif
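/*
 * Hedged usage sketch (illustrative only, not a call site in this file):
 * an in-memory change to the meta data is followed by drbd_md_mark_dirty(),
 * and the caller is expected to reach drbd_md_sync() itself; the timer armed
 * above is only a safety net (w_md_sync() warns when it fires).
 * drbd_md_set_flag() further down combines the first two steps:
 *
 *        mdev->ldev->md.flags |= MDF_FULL_SYNC;
 *        drbd_md_mark_dirty(mdev);
 *        ...
 *        drbd_md_sync(mdev);
 */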
2948
2949 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2950 {
2951         int i;
2952
2953         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2954                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2955 }
2956
2957 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2958 {
2959         if (idx == UI_CURRENT) {
2960                 if (mdev->state.role == R_PRIMARY)
2961                         val |= 1;
2962                 else
2963                         val &= ~((u64)1);
2964
2965                 drbd_set_ed_uuid(mdev, val);
2966         }
2967
2968         mdev->ldev->md.uuid[idx] = val;
2969         drbd_md_mark_dirty(mdev);
2970 }
2971
2972
2973 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2974 {
2975         if (mdev->ldev->md.uuid[idx]) {
2976                 drbd_uuid_move_history(mdev);
2977                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2978         }
2979         _drbd_uuid_set(mdev, idx, val);
2980 }
2981
2982 /**
2983  * drbd_uuid_new_current() - Creates a new current UUID
2984  * @mdev:       DRBD device.
2985  *
2986  * Creates a new current UUID, and rotates the old current UUID into
2987  * the bitmap slot. Causes an incremental resync upon next connect.
2988  */
2989 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2990 {
2991         u64 val;
2992         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2993
2994         if (bm_uuid)
2995                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2996
2997         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2998
2999         get_random_bytes(&val, sizeof(u64));
3000         _drbd_uuid_set(mdev, UI_CURRENT, val);
3001         drbd_print_uuids(mdev, "new current UUID");
3002         /* get it to stable storage _now_ */
3003         drbd_md_sync(mdev);
3004 }
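/*
 * Illustrative effect (UUID values are symbolic):
 *
 *        before:  UI_CURRENT == A,        UI_BITMAP == 0
 *        after:   UI_CURRENT == <random>, UI_BITMAP == A
 *
 * On the next connect the UUID comparison notices this rotation, which is
 * what triggers the incremental resync mentioned in the kernel-doc above.
 */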
3005
3006 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3007 {
3008         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3009                 return;
3010
3011         if (val == 0) {
3012                 drbd_uuid_move_history(mdev);
3013                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3014                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3015         } else {
3016                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3017                 if (bm_uuid)
3018                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3019
3020                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3021         }
3022         drbd_md_mark_dirty(mdev);
3023 }
3024
3025 /**
3026  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3027  * @mdev:       DRBD device.
3028  *
3029  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3030  */
3031 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3032 {
3033         int rv = -EIO;
3034
3035         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3036                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3037                 drbd_md_sync(mdev);
3038                 drbd_bm_set_all(mdev);
3039
3040                 rv = drbd_bm_write(mdev);
3041
3042                 if (!rv) {
3043                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3044                         drbd_md_sync(mdev);
3045                 }
3046
3047                 put_ldev(mdev);
3048         }
3049
3050         return rv;
3051 }
3052
3053 /**
3054  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3055  * @mdev:       DRBD device.
3056  *
3057  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3058  */
3059 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3060 {
3061         int rv = -EIO;
3062
3063         drbd_resume_al(mdev);
3064         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3065                 drbd_bm_clear_all(mdev);
3066                 rv = drbd_bm_write(mdev);
3067                 put_ldev(mdev);
3068         }
3069
3070         return rv;
3071 }
3072
3073 static int w_bitmap_io(struct drbd_work *w, int unused)
3074 {
3075         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3076         struct drbd_conf *mdev = w->mdev;
3077         int rv = -EIO;
3078
3079         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3080
3081         if (get_ldev(mdev)) {
3082                 drbd_bm_lock(mdev, work->why, work->flags);
3083                 rv = work->io_fn(mdev);
3084                 drbd_bm_unlock(mdev);
3085                 put_ldev(mdev);
3086         }
3087
3088         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3089         wake_up(&mdev->misc_wait);
3090
3091         if (work->done)
3092                 work->done(mdev, rv);
3093
3094         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3095         work->why = NULL;
3096         work->flags = 0;
3097
3098         return 0;
3099 }
3100
3101 void drbd_ldev_destroy(struct drbd_conf *mdev)
3102 {
3103         lc_destroy(mdev->resync);
3104         mdev->resync = NULL;
3105         lc_destroy(mdev->act_log);
3106         mdev->act_log = NULL;
3107         __no_warn(local,
3108                 drbd_free_bc(mdev->ldev);
3109                 mdev->ldev = NULL;);
3110
3111         clear_bit(GO_DISKLESS, &mdev->flags);
3112 }
3113
3114 static int w_go_diskless(struct drbd_work *w, int unused)
3115 {
3116         struct drbd_conf *mdev = w->mdev;
3117
3118         D_ASSERT(mdev->state.disk == D_FAILED);
3119         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3120          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3121          * the protected members anymore, though, so once put_ldev reaches zero
3122          * again, it will be safe to free them. */
3123         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3124         return 0;
3125 }
3126
3127 void drbd_go_diskless(struct drbd_conf *mdev)
3128 {
3129         D_ASSERT(mdev->state.disk == D_FAILED);
3130         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3131                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3132 }
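/*
 * Detach sequence as implemented above: drbd_go_diskless() queues the
 * go_diskless work exactly once (guarded by the GO_DISKLESS bit),
 * w_go_diskless() then forces the disk state to D_DISKLESS, and
 * drbd_ldev_destroy() finally releases the activity log, the resync LRU
 * and the backing device; as the comment in w_go_diskless() notes, that
 * is only safe once the local reference count has dropped to zero.
 */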
3133
3134 /**
3135  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3136  * @mdev:       DRBD device.
3137  * @io_fn:      IO callback to be called when bitmap IO is possible
3138  * @done:       callback to be called after the bitmap IO was performed
3139  * @why:        Descriptive text of the reason for doing the IO
 * @flags:      Bitmap locking flags (enum bm_flag), passed on to drbd_bm_lock()
3140  *
3141  * While IO on the bitmap happens, application IO is frozen; this ensures
3142  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3143  * called from worker context. It MUST NOT be used while a previous such
3144  * work is still pending!
3145  */
3146 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3147                           int (*io_fn)(struct drbd_conf *),
3148                           void (*done)(struct drbd_conf *, int),
3149                           char *why, enum bm_flag flags)
3150 {
3151         D_ASSERT(current == mdev->tconn->worker.task);
3152
3153         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3154         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3155         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3156         if (mdev->bm_io_work.why)
3157                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3158                         why, mdev->bm_io_work.why);
3159
3160         mdev->bm_io_work.io_fn = io_fn;
3161         mdev->bm_io_work.done = done;
3162         mdev->bm_io_work.why = why;
3163         mdev->bm_io_work.flags = flags;
3164
3165         spin_lock_irq(&mdev->tconn->req_lock);
3166         set_bit(BITMAP_IO, &mdev->flags);
3167         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3168                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3169                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3170         }
3171         spin_unlock_irq(&mdev->tconn->req_lock);
3172 }
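/*
 * Hedged example (parameters are illustrative; real call sites live in
 * other drbd files): from worker context, queue a full "set all bits and
 * write out" pass using the io_fn defined earlier in this file:
 *
 *        drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
 *                             "set_n_write sketch", BM_LOCKED_SET_ALLOWED);
 */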
3173
3174 /**
3175  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3176  * @mdev:       DRBD device.
3177  * @io_fn:      IO callback to be called when bitmap IO is possible
3178  * @why:        Descriptive text of the reason for doing the IO
 * @flags:      Bitmap locking flags (enum bm_flag)
3179  *
3180  * Freezes application IO while the actual IO operation runs. This
3181  * function MAY NOT be called from worker context.
3182  */
3183 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3184                 char *why, enum bm_flag flags)
3185 {
3186         int rv;
3187
3188         D_ASSERT(current != mdev->tconn->worker.task);
3189
3190         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3191                 drbd_suspend_io(mdev);
3192
3193         drbd_bm_lock(mdev, why, flags);
3194         rv = io_fn(mdev);
3195         drbd_bm_unlock(mdev);
3196
3197         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3198                 drbd_resume_io(mdev);
3199
3200         return rv;
3201 }
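/*
 * Hedged counterpart for non-worker context (parameters are illustrative):
 *
 *        rv = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
 *                            "clear_n_write sketch", BM_LOCKED_SET_ALLOWED);
 *
 * Without BM_LOCKED_SET_ALLOWED in @flags, application IO would additionally
 * be suspended around the bitmap write, as implemented above.
 */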
3202
3203 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3204 {
3205         if ((mdev->ldev->md.flags & flag) != flag) {
3206                 drbd_md_mark_dirty(mdev);
3207                 mdev->ldev->md.flags |= flag;
3208         }
3209 }
3210
3211 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3212 {
3213         if ((mdev->ldev->md.flags & flag) != 0) {
3214                 drbd_md_mark_dirty(mdev);
3215                 mdev->ldev->md.flags &= ~flag;
3216         }
3217 }
3218 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3219 {
3220         return (bdev->md.flags & flag) != 0;
3221 }
3222
3223 static void md_sync_timer_fn(unsigned long data)
3224 {
3225         struct drbd_conf *mdev = (struct drbd_conf *) data;
3226
3227         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3228 }
3229
3230 static int w_md_sync(struct drbd_work *w, int unused)
3231 {
3232         struct drbd_conf *mdev = w->mdev;
3233
3234         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3235 #ifdef DEBUG
3236         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3237                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3238 #endif
3239         drbd_md_sync(mdev);
3240         return 0;
3241 }
3242
3243 const char *cmdname(enum drbd_packet cmd)
3244 {
3245         /* THINK may need to become several global tables
3246          * when we want to support more than
3247          * one PRO_VERSION */
3248         static const char *cmdnames[] = {
3249                 [P_DATA]                = "Data",
3250                 [P_DATA_REPLY]          = "DataReply",
3251                 [P_RS_DATA_REPLY]       = "RSDataReply",
3252                 [P_BARRIER]             = "Barrier",
3253                 [P_BITMAP]              = "ReportBitMap",
3254                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3255                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3256                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3257                 [P_DATA_REQUEST]        = "DataRequest",
3258                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3259                 [P_SYNC_PARAM]          = "SyncParam",
3260                 [P_SYNC_PARAM89]        = "SyncParam89",
3261                 [P_PROTOCOL]            = "ReportProtocol",
3262                 [P_UUIDS]               = "ReportUUIDs",
3263                 [P_SIZES]               = "ReportSizes",
3264                 [P_STATE]               = "ReportState",
3265                 [P_SYNC_UUID]           = "ReportSyncUUID",
3266                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3267                 [P_AUTH_RESPONSE]       = "AuthResponse",
3268                 [P_PING]                = "Ping",
3269                 [P_PING_ACK]            = "PingAck",
3270                 [P_RECV_ACK]            = "RecvAck",
3271                 [P_WRITE_ACK]           = "WriteAck",
3272                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3273                 [P_DISCARD_WRITE]       = "DiscardWrite",
3274                 [P_NEG_ACK]             = "NegAck",
3275                 [P_NEG_DREPLY]          = "NegDReply",
3276                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3277                 [P_BARRIER_ACK]         = "BarrierAck",
3278                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3279                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3280                 [P_OV_REQUEST]          = "OVRequest",
3281                 [P_OV_REPLY]            = "OVReply",
3282                 [P_OV_RESULT]           = "OVResult",
3283                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3284                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3285                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3286                 [P_DELAY_PROBE]         = "DelayProbe",
3287                 [P_OUT_OF_SYNC]         = "OutOfSync",
3288                 [P_RETRY_WRITE]         = "RetryWrite",
3289                 [P_RS_CANCEL]           = "RSCancel",
3290                 [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
3291                 [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
3292
3293                 /* enum drbd_packet, but not commands - obsoleted flags:
3294                  *      P_MAY_IGNORE
3295                  *      P_MAX_OPT_CMD
3296                  */
3297         };
3298
3299         /* too big for the array: 0xfffX */
3300         if (cmd == P_INITIAL_META)
3301                 return "InitialMeta";
3302         if (cmd == P_INITIAL_DATA)
3303                 return "InitialData";
3304         if (cmd == P_CONNECTION_FEATURES)
3305                 return "ConnectionFeatures";
3306         if (cmd >= ARRAY_SIZE(cmdnames))
3307                 return "Unknown";
3308         return cmdnames[cmd];
3309 }
3310
3311 /**
3312  * drbd_wait_misc  -  wait for a request to make progress
3313  * @mdev:       device associated with the request
3314  * @i:          the struct drbd_interval embedded in struct drbd_request or
3315  *              struct drbd_peer_request
3316  */
3317 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3318 {
3319         struct net_conf *nc;
3320         DEFINE_WAIT(wait);
3321         long timeout;
3322
3323         rcu_read_lock();
3324         nc = rcu_dereference(mdev->tconn->net_conf);
3325         if (!nc) {
3326                 rcu_read_unlock();
3327                 return -ETIMEDOUT;
3328         }
3329         timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3330         rcu_read_unlock();
3331
3332         /* Indicate that progress on this interval should wake up mdev->misc_wait. */
3333         i->waiting = true;
3334         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3335         spin_unlock_irq(&mdev->tconn->req_lock);
3336         timeout = schedule_timeout(timeout);
3337         finish_wait(&mdev->misc_wait, &wait);
3338         spin_lock_irq(&mdev->tconn->req_lock);
3339         if (!timeout || mdev->state.conn < C_CONNECTED)
3340                 return -ETIMEDOUT;
3341         if (signal_pending(current))
3342                 return -ERESTARTSYS;
3343         return 0;
3344 }
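/*
 * Calling convention, derived from the lock handling above: the caller holds
 * mdev->tconn->req_lock; drbd_wait_misc() drops it while sleeping and
 * re-acquires it before returning.  A hedged sketch, with "req" standing in
 * for some struct drbd_request the caller is waiting on:
 *
 *        spin_lock_irq(&mdev->tconn->req_lock);
 *        while (!request_has_made_progress(req))    // hypothetical predicate
 *                if (drbd_wait_misc(mdev, &req->i))
 *                        break;
 *        spin_unlock_irq(&mdev->tconn->req_lock);
 */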
3345
3346 #ifdef CONFIG_DRBD_FAULT_INJECTION
3347 /* Fault insertion support including random number generator shamelessly
3348  * stolen from kernel/rcutorture.c */
3349 struct fault_random_state {
3350         unsigned long state;
3351         unsigned long count;
3352 };
3353
3354 #define FAULT_RANDOM_MULT 39916801  /* prime */
3355 #define FAULT_RANDOM_ADD        479001701 /* prime */
3356 #define FAULT_RANDOM_REFRESH 10000
3357
3358 /*
3359  * Crude but fast random-number generator.  Uses a linear congruential
3360  * generator, with occasional help from get_random_bytes().
3361  */
3362 static unsigned long
3363 _drbd_fault_random(struct fault_random_state *rsp)
3364 {
3365         long refresh;
3366
3367         if (!rsp->count--) {
3368                 get_random_bytes(&refresh, sizeof(refresh));
3369                 rsp->state += refresh;
3370                 rsp->count = FAULT_RANDOM_REFRESH;
3371         }
3372         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3373         return swahw32(rsp->state);
3374 }
3375
3376 static char *_drbd_fault_str(unsigned int type)
3377 {
3378         static char *_faults[] = {
3379                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3380                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3381                 [DRBD_FAULT_RS_WR] = "Resync write",
3382                 [DRBD_FAULT_RS_RD] = "Resync read",
3383                 [DRBD_FAULT_DT_WR] = "Data write",
3384                 [DRBD_FAULT_DT_RD] = "Data read",
3385                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3386                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3387                 [DRBD_FAULT_AL_EE] = "EE allocation",
3388                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3389         };
3390
3391         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3392 }
3393
3394 unsigned int
3395 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3396 {
3397         static struct fault_random_state rrs = {0, 0};
3398
3399         unsigned int ret = (
3400                 (fault_devs == 0 ||
3401                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3402                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3403
3404         if (ret) {
3405                 fault_count++;
3406
3407                 if (__ratelimit(&drbd_ratelimit_state))
3408                         dev_warn(DEV, "***Simulating %s failure\n",
3409                                 _drbd_fault_str(type));
3410         }
3411
3412         return ret;
3413 }
3414 #endif
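/*
 * Hedged example of a fault injection point (the drbd_insert_fault()
 * wrapper and the bio handling are assumptions based on other drbd files,
 * not code from this one):
 *
 *        if (drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))
 *                bio_endio(bio, -EIO);
 *        else
 *                submit_bio(WRITE, bio);
 */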
3415
3416 const char *drbd_buildtag(void)
3417 {
3418         /* When DRBD is built from external sources, this holds a reference
3419            to the git hash of the source code. */
3420
        /* "\0uilt-in": if no srcversion gets filled in below, buildtag[0] is
         * simply set to 'b' and the string reads "built-in". */
3421         static char buildtag[38] = "\0uilt-in";
3422
3423         if (buildtag[0] == 0) {
3424 #ifdef CONFIG_MODULES
3425                 if (THIS_MODULE != NULL)
3426                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3427                 else
3428 #endif
3429                         buildtag[0] = 'b';
3430         }
3431
3432         return buildtag;
3433 }
3434
3435 module_init(drbd_init)
3436 module_exit(drbd_cleanup)
3437
3438 EXPORT_SYMBOL(drbd_conn_str);
3439 EXPORT_SYMBOL(drbd_role_str);
3440 EXPORT_SYMBOL(drbd_disk_str);
3441 EXPORT_SYMBOL(drbd_set_st_err_str);