]> git.karo-electronics.de Git - mv-sheeva.git/blob - drivers/block/drbd/drbd_receiver.c
drbd: remove unused #include <linux/version.h>
[mv-sheeva.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/smp_lock.h>
40 #include <linux/pkt_sched.h>
41 #define __KERNEL_SYSCALLS__
42 #include <linux/unistd.h>
43 #include <linux/vmalloc.h>
44 #include <linux/random.h>
45 #include <linux/mm.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_req.h"
50
51 #include "drbd_vli.h"
52
53 struct flush_work {
54         struct drbd_work w;
55         struct drbd_epoch *epoch;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_handshake(struct drbd_conf *mdev);
65 static int drbd_do_auth(struct drbd_conf *mdev);
66
67 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69
70 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71 {
72         struct drbd_epoch *prev;
73         spin_lock(&mdev->epoch_lock);
74         prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75         if (prev == epoch || prev == mdev->current_epoch)
76                 prev = NULL;
77         spin_unlock(&mdev->epoch_lock);
78         return prev;
79 }
80
81 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82
83 static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
84 {
85         struct page *page = NULL;
86
87         /* Yes, testing drbd_pp_vacant outside the lock is racy.
88          * So what. It saves a spin_lock. */
89         if (drbd_pp_vacant > 0) {
90                 spin_lock(&drbd_pp_lock);
91                 page = drbd_pp_pool;
92                 if (page) {
93                         drbd_pp_pool = (struct page *)page_private(page);
94                         set_page_private(page, 0); /* just to be polite */
95                         drbd_pp_vacant--;
96                 }
97                 spin_unlock(&drbd_pp_lock);
98         }
99         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
100          * "criss-cross" setup, that might cause write-out on some other DRBD,
101          * which in turn might block on the other node at this very place.  */
102         if (!page)
103                 page = alloc_page(GFP_TRY);
104         if (page)
105                 atomic_inc(&mdev->pp_in_use);
106         return page;
107 }
108
109 /* kick lower level device, if we have more than (arbitrary number)
110  * reference counts on it, which typically are locally submitted io
111  * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
112 static void maybe_kick_lo(struct drbd_conf *mdev)
113 {
114         if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
115                 drbd_kick_lo(mdev);
116 }
117
118 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
119 {
120         struct drbd_epoch_entry *e;
121         struct list_head *le, *tle;
122
123         /* The EEs are always appended to the end of the list. Since
124            they are sent in order over the wire, they have to finish
125            in order. As soon as we see the first not finished we can
126            stop to examine the list... */
127
128         list_for_each_safe(le, tle, &mdev->net_ee) {
129                 e = list_entry(le, struct drbd_epoch_entry, w.list);
130                 if (drbd_bio_has_active_page(e->private_bio))
131                         break;
132                 list_move(le, to_be_freed);
133         }
134 }
135
136 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
137 {
138         LIST_HEAD(reclaimed);
139         struct drbd_epoch_entry *e, *t;
140
141         maybe_kick_lo(mdev);
142         spin_lock_irq(&mdev->req_lock);
143         reclaim_net_ee(mdev, &reclaimed);
144         spin_unlock_irq(&mdev->req_lock);
145
146         list_for_each_entry_safe(e, t, &reclaimed, w.list)
147                 drbd_free_ee(mdev, e);
148 }
149
150 /**
151  * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
152  * @mdev:       DRBD device.
153  * @retry:      whether or not to retry allocation forever (or until signalled)
154  *
155  * Tries to allocate a page, first from our own page pool, then from the
156  * kernel, unless this allocation would exceed the max_buffers setting.
157  * If @retry is non-zero, retry until DRBD frees a page somewhere else.
158  */
159 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
160 {
161         struct page *page = NULL;
162         DEFINE_WAIT(wait);
163
164         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
165                 page = drbd_pp_first_page_or_try_alloc(mdev);
166                 if (page)
167                         return page;
168         }
169
170         for (;;) {
171                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
172
173                 drbd_kick_lo_and_reclaim_net(mdev);
174
175                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
176                         page = drbd_pp_first_page_or_try_alloc(mdev);
177                         if (page)
178                                 break;
179                 }
180
181                 if (!retry)
182                         break;
183
184                 if (signal_pending(current)) {
185                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
186                         break;
187                 }
188
189                 schedule();
190         }
191         finish_wait(&drbd_pp_wait, &wait);
192
193         return page;
194 }
195
196 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
197  * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */
198 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
199 {
200         int free_it;
201
202         spin_lock(&drbd_pp_lock);
203         if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
204                 free_it = 1;
205         } else {
206                 set_page_private(page, (unsigned long)drbd_pp_pool);
207                 drbd_pp_pool = page;
208                 drbd_pp_vacant++;
209                 free_it = 0;
210         }
211         spin_unlock(&drbd_pp_lock);
212
213         atomic_dec(&mdev->pp_in_use);
214
215         if (free_it)
216                 __free_page(page);
217
218         wake_up(&drbd_pp_wait);
219 }
220
221 static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
222 {
223         struct page *p_to_be_freed = NULL;
224         struct page *page;
225         struct bio_vec *bvec;
226         int i;
227
228         spin_lock(&drbd_pp_lock);
229         __bio_for_each_segment(bvec, bio, i, 0) {
230                 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
231                         set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
232                         p_to_be_freed = bvec->bv_page;
233                 } else {
234                         set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
235                         drbd_pp_pool = bvec->bv_page;
236                         drbd_pp_vacant++;
237                 }
238         }
239         spin_unlock(&drbd_pp_lock);
240         atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
241
242         while (p_to_be_freed) {
243                 page = p_to_be_freed;
244                 p_to_be_freed = (struct page *)page_private(page);
245                 set_page_private(page, 0); /* just to be polite */
246                 put_page(page);
247         }
248
249         wake_up(&drbd_pp_wait);
250 }
251
252 /*
253 You need to hold the req_lock:
254  _drbd_wait_ee_list_empty()
255
256 You must not have the req_lock:
257  drbd_free_ee()
258  drbd_alloc_ee()
259  drbd_init_ee()
260  drbd_release_ee()
261  drbd_ee_fix_bhs()
262  drbd_process_done_ee()
263  drbd_clear_done_ee()
264  drbd_wait_ee_list_empty()
265 */
266
267 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
268                                      u64 id,
269                                      sector_t sector,
270                                      unsigned int data_size,
271                                      gfp_t gfp_mask) __must_hold(local)
272 {
273         struct request_queue *q;
274         struct drbd_epoch_entry *e;
275         struct page *page;
276         struct bio *bio;
277         unsigned int ds;
278
279         if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
280                 return NULL;
281
282         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
283         if (!e) {
284                 if (!(gfp_mask & __GFP_NOWARN))
285                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
286                 return NULL;
287         }
288
289         bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
290         if (!bio) {
291                 if (!(gfp_mask & __GFP_NOWARN))
292                         dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
293                 goto fail1;
294         }
295
296         bio->bi_bdev = mdev->ldev->backing_bdev;
297         bio->bi_sector = sector;
298
299         ds = data_size;
300         while (ds) {
301                 page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
302                 if (!page) {
303                         if (!(gfp_mask & __GFP_NOWARN))
304                                 dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
305                         goto fail2;
306                 }
307                 if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
308                         drbd_pp_free(mdev, page);
309                         dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
310                             "data_size=%u,ds=%u) failed\n",
311                             (unsigned long long)sector, data_size, ds);
312
313                         q = bdev_get_queue(bio->bi_bdev);
314                         if (q->merge_bvec_fn) {
315                                 struct bvec_merge_data bvm = {
316                                         .bi_bdev = bio->bi_bdev,
317                                         .bi_sector = bio->bi_sector,
318                                         .bi_size = bio->bi_size,
319                                         .bi_rw = bio->bi_rw,
320                                 };
321                                 int l = q->merge_bvec_fn(q, &bvm,
322                                                 &bio->bi_io_vec[bio->bi_vcnt]);
323                                 dev_err(DEV, "merge_bvec_fn() = %d\n", l);
324                         }
325
326                         /* dump more of the bio. */
327                         dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
328                         dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
329                         dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
330                         dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
331
332                         goto fail2;
333                         break;
334                 }
335                 ds -= min_t(int, ds, PAGE_SIZE);
336         }
337
338         D_ASSERT(data_size == bio->bi_size);
339
340         bio->bi_private = e;
341         e->mdev = mdev;
342         e->sector = sector;
343         e->size = bio->bi_size;
344
345         e->private_bio = bio;
346         e->block_id = id;
347         INIT_HLIST_NODE(&e->colision);
348         e->epoch = NULL;
349         e->flags = 0;
350
351         return e;
352
353  fail2:
354         drbd_pp_free_bio_pages(mdev, bio);
355         bio_put(bio);
356  fail1:
357         mempool_free(e, drbd_ee_mempool);
358
359         return NULL;
360 }
361
362 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
363 {
364         struct bio *bio = e->private_bio;
365         drbd_pp_free_bio_pages(mdev, bio);
366         bio_put(bio);
367         D_ASSERT(hlist_unhashed(&e->colision));
368         mempool_free(e, drbd_ee_mempool);
369 }
370
371 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
372 {
373         LIST_HEAD(work_list);
374         struct drbd_epoch_entry *e, *t;
375         int count = 0;
376
377         spin_lock_irq(&mdev->req_lock);
378         list_splice_init(list, &work_list);
379         spin_unlock_irq(&mdev->req_lock);
380
381         list_for_each_entry_safe(e, t, &work_list, w.list) {
382                 drbd_free_ee(mdev, e);
383                 count++;
384         }
385         return count;
386 }
387
388
389 /*
390  * This function is called from _asender only_
391  * but see also comments in _req_mod(,barrier_acked)
392  * and receive_Barrier.
393  *
394  * Move entries from net_ee to done_ee, if ready.
395  * Grab done_ee, call all callbacks, free the entries.
396  * The callbacks typically send out ACKs.
397  */
398 static int drbd_process_done_ee(struct drbd_conf *mdev)
399 {
400         LIST_HEAD(work_list);
401         LIST_HEAD(reclaimed);
402         struct drbd_epoch_entry *e, *t;
403         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
404
405         spin_lock_irq(&mdev->req_lock);
406         reclaim_net_ee(mdev, &reclaimed);
407         list_splice_init(&mdev->done_ee, &work_list);
408         spin_unlock_irq(&mdev->req_lock);
409
410         list_for_each_entry_safe(e, t, &reclaimed, w.list)
411                 drbd_free_ee(mdev, e);
412
413         /* possible callbacks here:
414          * e_end_block, and e_end_resync_block, e_send_discard_ack.
415          * all ignore the last argument.
416          */
417         list_for_each_entry_safe(e, t, &work_list, w.list) {
418                 /* list_del not necessary, next/prev members not touched */
419                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
420                 drbd_free_ee(mdev, e);
421         }
422         wake_up(&mdev->ee_wait);
423
424         return ok;
425 }
426
427 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
428 {
429         DEFINE_WAIT(wait);
430
431         /* avoids spin_lock/unlock
432          * and calling prepare_to_wait in the fast path */
433         while (!list_empty(head)) {
434                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
435                 spin_unlock_irq(&mdev->req_lock);
436                 drbd_kick_lo(mdev);
437                 schedule();
438                 finish_wait(&mdev->ee_wait, &wait);
439                 spin_lock_irq(&mdev->req_lock);
440         }
441 }
442
443 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
444 {
445         spin_lock_irq(&mdev->req_lock);
446         _drbd_wait_ee_list_empty(mdev, head);
447         spin_unlock_irq(&mdev->req_lock);
448 }
449
450 /* see also kernel_accept; which is only present since 2.6.18.
451  * also we want to log which part of it failed, exactly */
452 static int drbd_accept(struct drbd_conf *mdev, const char **what,
453                 struct socket *sock, struct socket **newsock)
454 {
455         struct sock *sk = sock->sk;
456         int err = 0;
457
458         *what = "listen";
459         err = sock->ops->listen(sock, 5);
460         if (err < 0)
461                 goto out;
462
463         *what = "sock_create_lite";
464         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
465                                newsock);
466         if (err < 0)
467                 goto out;
468
469         *what = "accept";
470         err = sock->ops->accept(sock, *newsock, 0);
471         if (err < 0) {
472                 sock_release(*newsock);
473                 *newsock = NULL;
474                 goto out;
475         }
476         (*newsock)->ops  = sock->ops;
477
478 out:
479         return err;
480 }
481
482 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
483                     void *buf, size_t size, int flags)
484 {
485         mm_segment_t oldfs;
486         struct kvec iov = {
487                 .iov_base = buf,
488                 .iov_len = size,
489         };
490         struct msghdr msg = {
491                 .msg_iovlen = 1,
492                 .msg_iov = (struct iovec *)&iov,
493                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
494         };
495         int rv;
496
497         oldfs = get_fs();
498         set_fs(KERNEL_DS);
499         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
500         set_fs(oldfs);
501
502         return rv;
503 }
504
505 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
506 {
507         mm_segment_t oldfs;
508         struct kvec iov = {
509                 .iov_base = buf,
510                 .iov_len = size,
511         };
512         struct msghdr msg = {
513                 .msg_iovlen = 1,
514                 .msg_iov = (struct iovec *)&iov,
515                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
516         };
517         int rv;
518
519         oldfs = get_fs();
520         set_fs(KERNEL_DS);
521
522         for (;;) {
523                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
524                 if (rv == size)
525                         break;
526
527                 /* Note:
528                  * ECONNRESET   other side closed the connection
529                  * ERESTARTSYS  (on  sock) we got a signal
530                  */
531
532                 if (rv < 0) {
533                         if (rv == -ECONNRESET)
534                                 dev_info(DEV, "sock was reset by peer\n");
535                         else if (rv != -ERESTARTSYS)
536                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
537                         break;
538                 } else if (rv == 0) {
539                         dev_info(DEV, "sock was shut down by peer\n");
540                         break;
541                 } else  {
542                         /* signal came in, or peer/link went down,
543                          * after we read a partial message
544                          */
545                         /* D_ASSERT(signal_pending(current)); */
546                         break;
547                 }
548         };
549
550         set_fs(oldfs);
551
552         if (rv != size)
553                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
554
555         return rv;
556 }
557
558 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
559 {
560         const char *what;
561         struct socket *sock;
562         struct sockaddr_in6 src_in6;
563         int err;
564         int disconnect_on_error = 1;
565
566         if (!get_net_conf(mdev))
567                 return NULL;
568
569         what = "sock_create_kern";
570         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
571                 SOCK_STREAM, IPPROTO_TCP, &sock);
572         if (err < 0) {
573                 sock = NULL;
574                 goto out;
575         }
576
577         sock->sk->sk_rcvtimeo =
578         sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
579
580        /* explicitly bind to the configured IP as source IP
581         *  for the outgoing connections.
582         *  This is needed for multihomed hosts and to be
583         *  able to use lo: interfaces for drbd.
584         * Make sure to use 0 as port number, so linux selects
585         *  a free one dynamically.
586         */
587         memcpy(&src_in6, mdev->net_conf->my_addr,
588                min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
589         if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
590                 src_in6.sin6_port = 0;
591         else
592                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
593
594         what = "bind before connect";
595         err = sock->ops->bind(sock,
596                               (struct sockaddr *) &src_in6,
597                               mdev->net_conf->my_addr_len);
598         if (err < 0)
599                 goto out;
600
601         /* connect may fail, peer not yet available.
602          * stay C_WF_CONNECTION, don't go Disconnecting! */
603         disconnect_on_error = 0;
604         what = "connect";
605         err = sock->ops->connect(sock,
606                                  (struct sockaddr *)mdev->net_conf->peer_addr,
607                                  mdev->net_conf->peer_addr_len, 0);
608
609 out:
610         if (err < 0) {
611                 if (sock) {
612                         sock_release(sock);
613                         sock = NULL;
614                 }
615                 switch (-err) {
616                         /* timeout, busy, signal pending */
617                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
618                 case EINTR: case ERESTARTSYS:
619                         /* peer not (yet) available, network problem */
620                 case ECONNREFUSED: case ENETUNREACH:
621                 case EHOSTDOWN:    case EHOSTUNREACH:
622                         disconnect_on_error = 0;
623                         break;
624                 default:
625                         dev_err(DEV, "%s failed, err = %d\n", what, err);
626                 }
627                 if (disconnect_on_error)
628                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
629         }
630         put_net_conf(mdev);
631         return sock;
632 }
633
634 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
635 {
636         int timeo, err;
637         struct socket *s_estab = NULL, *s_listen;
638         const char *what;
639
640         if (!get_net_conf(mdev))
641                 return NULL;
642
643         what = "sock_create_kern";
644         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
645                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
646         if (err) {
647                 s_listen = NULL;
648                 goto out;
649         }
650
651         timeo = mdev->net_conf->try_connect_int * HZ;
652         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
653
654         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
655         s_listen->sk->sk_rcvtimeo = timeo;
656         s_listen->sk->sk_sndtimeo = timeo;
657
658         what = "bind before listen";
659         err = s_listen->ops->bind(s_listen,
660                               (struct sockaddr *) mdev->net_conf->my_addr,
661                               mdev->net_conf->my_addr_len);
662         if (err < 0)
663                 goto out;
664
665         err = drbd_accept(mdev, &what, s_listen, &s_estab);
666
667 out:
668         if (s_listen)
669                 sock_release(s_listen);
670         if (err < 0) {
671                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
672                         dev_err(DEV, "%s failed, err = %d\n", what, err);
673                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
674                 }
675         }
676         put_net_conf(mdev);
677
678         return s_estab;
679 }
680
681 static int drbd_send_fp(struct drbd_conf *mdev,
682         struct socket *sock, enum drbd_packets cmd)
683 {
684         struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
685
686         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
687 }
688
689 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
690 {
691         struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
692         int rr;
693
694         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
695
696         if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
697                 return be16_to_cpu(h->command);
698
699         return 0xffff;
700 }
701
702 /**
703  * drbd_socket_okay() - Free the socket if its connection is not okay
704  * @mdev:       DRBD device.
705  * @sock:       pointer to the pointer to the socket.
706  */
707 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
708 {
709         int rr;
710         char tb[4];
711
712         if (!*sock)
713                 return FALSE;
714
715         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
716
717         if (rr > 0 || rr == -EAGAIN) {
718                 return TRUE;
719         } else {
720                 sock_release(*sock);
721                 *sock = NULL;
722                 return FALSE;
723         }
724 }
725
726 /*
727  * return values:
728  *   1 yes, we have a valid connection
729  *   0 oops, did not work out, please try again
730  *  -1 peer talks different language,
731  *     no point in trying again, please go standalone.
732  *  -2 We do not have a network config...
733  */
734 static int drbd_connect(struct drbd_conf *mdev)
735 {
736         struct socket *s, *sock, *msock;
737         int try, h, ok;
738
739         D_ASSERT(!mdev->data.socket);
740
741         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
742                 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
743
744         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
745                 return -2;
746
747         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
748
749         sock  = NULL;
750         msock = NULL;
751
752         do {
753                 for (try = 0;;) {
754                         /* 3 tries, this should take less than a second! */
755                         s = drbd_try_connect(mdev);
756                         if (s || ++try >= 3)
757                                 break;
758                         /* give the other side time to call bind() & listen() */
759                         __set_current_state(TASK_INTERRUPTIBLE);
760                         schedule_timeout(HZ / 10);
761                 }
762
763                 if (s) {
764                         if (!sock) {
765                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
766                                 sock = s;
767                                 s = NULL;
768                         } else if (!msock) {
769                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
770                                 msock = s;
771                                 s = NULL;
772                         } else {
773                                 dev_err(DEV, "Logic error in drbd_connect()\n");
774                                 goto out_release_sockets;
775                         }
776                 }
777
778                 if (sock && msock) {
779                         __set_current_state(TASK_INTERRUPTIBLE);
780                         schedule_timeout(HZ / 10);
781                         ok = drbd_socket_okay(mdev, &sock);
782                         ok = drbd_socket_okay(mdev, &msock) && ok;
783                         if (ok)
784                                 break;
785                 }
786
787 retry:
788                 s = drbd_wait_for_connect(mdev);
789                 if (s) {
790                         try = drbd_recv_fp(mdev, s);
791                         drbd_socket_okay(mdev, &sock);
792                         drbd_socket_okay(mdev, &msock);
793                         switch (try) {
794                         case P_HAND_SHAKE_S:
795                                 if (sock) {
796                                         dev_warn(DEV, "initial packet S crossed\n");
797                                         sock_release(sock);
798                                 }
799                                 sock = s;
800                                 break;
801                         case P_HAND_SHAKE_M:
802                                 if (msock) {
803                                         dev_warn(DEV, "initial packet M crossed\n");
804                                         sock_release(msock);
805                                 }
806                                 msock = s;
807                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
808                                 break;
809                         default:
810                                 dev_warn(DEV, "Error receiving initial packet\n");
811                                 sock_release(s);
812                                 if (random32() & 1)
813                                         goto retry;
814                         }
815                 }
816
817                 if (mdev->state.conn <= C_DISCONNECTING)
818                         goto out_release_sockets;
819                 if (signal_pending(current)) {
820                         flush_signals(current);
821                         smp_rmb();
822                         if (get_t_state(&mdev->receiver) == Exiting)
823                                 goto out_release_sockets;
824                 }
825
826                 if (sock && msock) {
827                         ok = drbd_socket_okay(mdev, &sock);
828                         ok = drbd_socket_okay(mdev, &msock) && ok;
829                         if (ok)
830                                 break;
831                 }
832         } while (1);
833
834         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
835         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
836
837         sock->sk->sk_allocation = GFP_NOIO;
838         msock->sk->sk_allocation = GFP_NOIO;
839
840         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
841         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
842
843         if (mdev->net_conf->sndbuf_size) {
844                 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
845                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
846         }
847
848         if (mdev->net_conf->rcvbuf_size) {
849                 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
850                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
851         }
852
853         /* NOT YET ...
854          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856          * first set it to the P_HAND_SHAKE timeout,
857          * which we set to 4x the configured ping_timeout. */
858         sock->sk->sk_sndtimeo =
859         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860
861         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863
864         /* we don't want delays.
865          * we use TCP_CORK where apropriate, though */
866         drbd_tcp_nodelay(sock);
867         drbd_tcp_nodelay(msock);
868
869         mdev->data.socket = sock;
870         mdev->meta.socket = msock;
871         mdev->last_received = jiffies;
872
873         D_ASSERT(mdev->asender.task == NULL);
874
875         h = drbd_do_handshake(mdev);
876         if (h <= 0)
877                 return h;
878
879         if (mdev->cram_hmac_tfm) {
880                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
881                 if (!drbd_do_auth(mdev)) {
882                         dev_err(DEV, "Authentication of peer failed\n");
883                         return -1;
884                 }
885         }
886
887         if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
888                 return 0;
889
890         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
891         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
892
893         atomic_set(&mdev->packet_seq, 0);
894         mdev->peer_seq = 0;
895
896         drbd_thread_start(&mdev->asender);
897
898         drbd_send_protocol(mdev);
899         drbd_send_sync_param(mdev, &mdev->sync_conf);
900         drbd_send_sizes(mdev, 0);
901         drbd_send_uuids(mdev);
902         drbd_send_state(mdev);
903         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
904         clear_bit(RESIZE_PENDING, &mdev->flags);
905
906         return 1;
907
908 out_release_sockets:
909         if (sock)
910                 sock_release(sock);
911         if (msock)
912                 sock_release(msock);
913         return -1;
914 }
915
916 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
917 {
918         int r;
919
920         r = drbd_recv(mdev, h, sizeof(*h));
921
922         if (unlikely(r != sizeof(*h))) {
923                 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
924                 return FALSE;
925         };
926         h->command = be16_to_cpu(h->command);
927         h->length  = be16_to_cpu(h->length);
928         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
929                 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
930                     (long)be32_to_cpu(h->magic),
931                     h->command, h->length);
932                 return FALSE;
933         }
934         mdev->last_received = jiffies;
935
936         return TRUE;
937 }
938
939 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
940 {
941         int rv;
942
943         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
944                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL);
945                 if (rv) {
946                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
947                         /* would rather check on EOPNOTSUPP, but that is not reliable.
948                          * don't try again for ANY return value != 0
949                          * if (rv == -EOPNOTSUPP) */
950                         drbd_bump_write_ordering(mdev, WO_drain_io);
951                 }
952                 put_ldev(mdev);
953         }
954
955         return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
956 }
957
958 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
959 {
960         struct flush_work *fw = (struct flush_work *)w;
961         struct drbd_epoch *epoch = fw->epoch;
962
963         kfree(w);
964
965         if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
966                 drbd_flush_after_epoch(mdev, epoch);
967
968         drbd_may_finish_epoch(mdev, epoch, EV_PUT |
969                               (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
970
971         return 1;
972 }
973
974 /**
975  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
976  * @mdev:       DRBD device.
977  * @epoch:      Epoch object.
978  * @ev:         Epoch event.
979  */
980 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
981                                                struct drbd_epoch *epoch,
982                                                enum epoch_event ev)
983 {
984         int finish, epoch_size;
985         struct drbd_epoch *next_epoch;
986         int schedule_flush = 0;
987         enum finish_epoch rv = FE_STILL_LIVE;
988
989         spin_lock(&mdev->epoch_lock);
990         do {
991                 next_epoch = NULL;
992                 finish = 0;
993
994                 epoch_size = atomic_read(&epoch->epoch_size);
995
996                 switch (ev & ~EV_CLEANUP) {
997                 case EV_PUT:
998                         atomic_dec(&epoch->active);
999                         break;
1000                 case EV_GOT_BARRIER_NR:
1001                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1002
1003                         /* Special case: If we just switched from WO_bio_barrier to
1004                            WO_bdev_flush we should not finish the current epoch */
1005                         if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1006                             mdev->write_ordering != WO_bio_barrier &&
1007                             epoch == mdev->current_epoch)
1008                                 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1009                         break;
1010                 case EV_BARRIER_DONE:
1011                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1012                         break;
1013                 case EV_BECAME_LAST:
1014                         /* nothing to do*/
1015                         break;
1016                 }
1017
1018                 if (epoch_size != 0 &&
1019                     atomic_read(&epoch->active) == 0 &&
1020                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1021                     epoch->list.prev == &mdev->current_epoch->list &&
1022                     !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1023                         /* Nearly all conditions are met to finish that epoch... */
1024                         if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1025                             mdev->write_ordering == WO_none ||
1026                             (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1027                             ev & EV_CLEANUP) {
1028                                 finish = 1;
1029                                 set_bit(DE_IS_FINISHING, &epoch->flags);
1030                         } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1031                                  mdev->write_ordering == WO_bio_barrier) {
1032                                 atomic_inc(&epoch->active);
1033                                 schedule_flush = 1;
1034                         }
1035                 }
1036                 if (finish) {
1037                         if (!(ev & EV_CLEANUP)) {
1038                                 spin_unlock(&mdev->epoch_lock);
1039                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1040                                 spin_lock(&mdev->epoch_lock);
1041                         }
1042                         dec_unacked(mdev);
1043
1044                         if (mdev->current_epoch != epoch) {
1045                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1046                                 list_del(&epoch->list);
1047                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1048                                 mdev->epochs--;
1049                                 kfree(epoch);
1050
1051                                 if (rv == FE_STILL_LIVE)
1052                                         rv = FE_DESTROYED;
1053                         } else {
1054                                 epoch->flags = 0;
1055                                 atomic_set(&epoch->epoch_size, 0);
1056                                 /* atomic_set(&epoch->active, 0); is alrady zero */
1057                                 if (rv == FE_STILL_LIVE)
1058                                         rv = FE_RECYCLED;
1059                         }
1060                 }
1061
1062                 if (!next_epoch)
1063                         break;
1064
1065                 epoch = next_epoch;
1066         } while (1);
1067
1068         spin_unlock(&mdev->epoch_lock);
1069
1070         if (schedule_flush) {
1071                 struct flush_work *fw;
1072                 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1073                 if (fw) {
1074                         fw->w.cb = w_flush;
1075                         fw->epoch = epoch;
1076                         drbd_queue_work(&mdev->data.work, &fw->w);
1077                 } else {
1078                         dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1079                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1080                         /* That is not a recursion, only one level */
1081                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1082                         drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1083                 }
1084         }
1085
1086         return rv;
1087 }
1088
1089 /**
1090  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1091  * @mdev:       DRBD device.
1092  * @wo:         Write ordering method to try.
1093  */
1094 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1095 {
1096         enum write_ordering_e pwo;
1097         static char *write_ordering_str[] = {
1098                 [WO_none] = "none",
1099                 [WO_drain_io] = "drain",
1100                 [WO_bdev_flush] = "flush",
1101                 [WO_bio_barrier] = "barrier",
1102         };
1103
1104         pwo = mdev->write_ordering;
1105         wo = min(pwo, wo);
1106         if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1107                 wo = WO_bdev_flush;
1108         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1109                 wo = WO_drain_io;
1110         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1111                 wo = WO_none;
1112         mdev->write_ordering = wo;
1113         if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1114                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1115 }
1116
1117 /**
1118  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1119  * @mdev:       DRBD device.
1120  * @w:          work object.
1121  * @cancel:     The connection will be closed anyways (unused in this callback)
1122  */
1123 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1124 {
1125         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1126         struct bio *bio = e->private_bio;
1127
1128         /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1129            (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1130            so that we can finish that epoch in drbd_may_finish_epoch().
1131            That is necessary if we already have a long chain of Epochs, before
1132            we realize that BIO_RW_BARRIER is actually not supported */
1133
1134         /* As long as the -ENOTSUPP on the barrier is reported immediately
1135            that will never trigger. If it is reported late, we will just
1136            print that warning and continue correctly for all future requests
1137            with WO_bdev_flush */
1138         if (previous_epoch(mdev, e->epoch))
1139                 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1140
1141         /* prepare bio for re-submit,
1142          * re-init volatile members */
1143         /* we still have a local reference,
1144          * get_ldev was done in receive_Data. */
1145         bio->bi_bdev = mdev->ldev->backing_bdev;
1146         bio->bi_sector = e->sector;
1147         bio->bi_size = e->size;
1148         bio->bi_idx = 0;
1149
1150         bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1151         bio->bi_flags |= 1 << BIO_UPTODATE;
1152
1153         /* don't know whether this is necessary: */
1154         bio->bi_phys_segments = 0;
1155         bio->bi_next = NULL;
1156
1157         /* these should be unchanged: */
1158         /* bio->bi_end_io = drbd_endio_write_sec; */
1159         /* bio->bi_vcnt = whatever; */
1160
1161         e->w.cb = e_end_block;
1162
1163         /* This is no longer a barrier request. */
1164         bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1165
1166         drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1167
1168         return 1;
1169 }
1170
1171 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1172 {
1173         int rv, issue_flush;
1174         struct p_barrier *p = (struct p_barrier *)h;
1175         struct drbd_epoch *epoch;
1176
1177         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1178
1179         rv = drbd_recv(mdev, h->payload, h->length);
1180         ERR_IF(rv != h->length) return FALSE;
1181
1182         inc_unacked(mdev);
1183
1184         if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1185                 drbd_kick_lo(mdev);
1186
1187         mdev->current_epoch->barrier_nr = p->barrier;
1188         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1189
1190         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1191          * the activity log, which means it would not be resynced in case the
1192          * R_PRIMARY crashes now.
1193          * Therefore we must send the barrier_ack after the barrier request was
1194          * completed. */
1195         switch (mdev->write_ordering) {
1196         case WO_bio_barrier:
1197         case WO_none:
1198                 if (rv == FE_RECYCLED)
1199                         return TRUE;
1200                 break;
1201
1202         case WO_bdev_flush:
1203         case WO_drain_io:
1204                 D_ASSERT(rv == FE_STILL_LIVE);
1205                 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1206                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1207                 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1208                 if (rv == FE_RECYCLED)
1209                         return TRUE;
1210
1211                 /* The asender will send all the ACKs and barrier ACKs out, since
1212                    all EEs moved from the active_ee to the done_ee. We need to
1213                    provide a new epoch object for the EEs that come in soon */
1214                 break;
1215         }
1216
1217         /* receiver context, in the writeout path of the other node.
1218          * avoid potential distributed deadlock */
1219         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1220         if (!epoch) {
1221                 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1222                 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1223                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1224                 if (issue_flush) {
1225                         rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1226                         if (rv == FE_RECYCLED)
1227                                 return TRUE;
1228                 }
1229
1230                 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1231
1232                 return TRUE;
1233         }
1234
1235         epoch->flags = 0;
1236         atomic_set(&epoch->epoch_size, 0);
1237         atomic_set(&epoch->active, 0);
1238
1239         spin_lock(&mdev->epoch_lock);
1240         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1241                 list_add(&epoch->list, &mdev->current_epoch->list);
1242                 mdev->current_epoch = epoch;
1243                 mdev->epochs++;
1244         } else {
1245                 /* The current_epoch got recycled while we allocated this one... */
1246                 kfree(epoch);
1247         }
1248         spin_unlock(&mdev->epoch_lock);
1249
1250         return TRUE;
1251 }
1252
1253 /* used from receive_RSDataReply (recv_resync_read)
1254  * and from receive_Data */
1255 static struct drbd_epoch_entry *
1256 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1257 {
1258         struct drbd_epoch_entry *e;
1259         struct bio_vec *bvec;
1260         struct page *page;
1261         struct bio *bio;
1262         int dgs, ds, i, rr;
1263         void *dig_in = mdev->int_dig_in;
1264         void *dig_vv = mdev->int_dig_vv;
1265
1266         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1267                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1268
1269         if (dgs) {
1270                 rr = drbd_recv(mdev, dig_in, dgs);
1271                 if (rr != dgs) {
1272                         dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1273                              rr, dgs);
1274                         return NULL;
1275                 }
1276         }
1277
1278         data_size -= dgs;
1279
1280         ERR_IF(data_size &  0x1ff) return NULL;
1281         ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1282
1283         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1284          * "criss-cross" setup, that might cause write-out on some other DRBD,
1285          * which in turn might block on the other node at this very place.  */
1286         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1287         if (!e)
1288                 return NULL;
1289         bio = e->private_bio;
1290         ds = data_size;
1291         bio_for_each_segment(bvec, bio, i) {
1292                 page = bvec->bv_page;
1293                 rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE));
1294                 kunmap(page);
1295                 if (rr != min_t(int, ds, PAGE_SIZE)) {
1296                         drbd_free_ee(mdev, e);
1297                         dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1298                              rr, min_t(int, ds, PAGE_SIZE));
1299                         return NULL;
1300                 }
1301                 ds -= rr;
1302         }
1303
1304         if (dgs) {
1305                 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1306                 if (memcmp(dig_in, dig_vv, dgs)) {
1307                         dev_err(DEV, "Digest integrity check FAILED.\n");
1308                         drbd_bcast_ee(mdev, "digest failed",
1309                                         dgs, dig_in, dig_vv, e);
1310                         drbd_free_ee(mdev, e);
1311                         return NULL;
1312                 }
1313         }
1314         mdev->recv_cnt += data_size>>9;
1315         return e;
1316 }
1317
1318 /* drbd_drain_block() just takes a data block
1319  * out of the socket input buffer, and discards it.
1320  */
1321 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1322 {
1323         struct page *page;
1324         int rr, rv = 1;
1325         void *data;
1326
1327         page = drbd_pp_alloc(mdev, 1);
1328
1329         data = kmap(page);
1330         while (data_size) {
1331                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1332                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1333                         rv = 0;
1334                         dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1335                              rr, min_t(int, data_size, PAGE_SIZE));
1336                         break;
1337                 }
1338                 data_size -= rr;
1339         }
1340         kunmap(page);
1341         drbd_pp_free(mdev, page);
1342         return rv;
1343 }
1344
1345 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1346                            sector_t sector, int data_size)
1347 {
1348         struct bio_vec *bvec;
1349         struct bio *bio;
1350         int dgs, rr, i, expect;
1351         void *dig_in = mdev->int_dig_in;
1352         void *dig_vv = mdev->int_dig_vv;
1353
1354         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1355                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1356
1357         if (dgs) {
1358                 rr = drbd_recv(mdev, dig_in, dgs);
1359                 if (rr != dgs) {
1360                         dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1361                              rr, dgs);
1362                         return 0;
1363                 }
1364         }
1365
1366         data_size -= dgs;
1367
1368         /* optimistically update recv_cnt.  if receiving fails below,
1369          * we disconnect anyways, and counters will be reset. */
1370         mdev->recv_cnt += data_size>>9;
1371
1372         bio = req->master_bio;
1373         D_ASSERT(sector == bio->bi_sector);
1374
1375         bio_for_each_segment(bvec, bio, i) {
1376                 expect = min_t(int, data_size, bvec->bv_len);
1377                 rr = drbd_recv(mdev,
1378                              kmap(bvec->bv_page)+bvec->bv_offset,
1379                              expect);
1380                 kunmap(bvec->bv_page);
1381                 if (rr != expect) {
1382                         dev_warn(DEV, "short read receiving data reply: "
1383                              "read %d expected %d\n",
1384                              rr, expect);
1385                         return 0;
1386                 }
1387                 data_size -= rr;
1388         }
1389
1390         if (dgs) {
1391                 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1392                 if (memcmp(dig_in, dig_vv, dgs)) {
1393                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1394                         return 0;
1395                 }
1396         }
1397
1398         D_ASSERT(data_size == 0);
1399         return 1;
1400 }
1401
1402 /* e_end_resync_block() is called via
1403  * drbd_process_done_ee() by asender only */
1404 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1405 {
1406         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1407         sector_t sector = e->sector;
1408         int ok;
1409
1410         D_ASSERT(hlist_unhashed(&e->colision));
1411
1412         if (likely(drbd_bio_uptodate(e->private_bio))) {
1413                 drbd_set_in_sync(mdev, sector, e->size);
1414                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1415         } else {
1416                 /* Record failure to sync */
1417                 drbd_rs_failed_io(mdev, sector, e->size);
1418
1419                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1420         }
1421         dec_unacked(mdev);
1422
1423         return ok;
1424 }
1425
1426 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1427 {
1428         struct drbd_epoch_entry *e;
1429
1430         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1431         if (!e) {
1432                 put_ldev(mdev);
1433                 return FALSE;
1434         }
1435
1436         dec_rs_pending(mdev);
1437
1438         e->private_bio->bi_end_io = drbd_endio_write_sec;
1439         e->private_bio->bi_rw = WRITE;
1440         e->w.cb = e_end_resync_block;
1441
1442         inc_unacked(mdev);
1443         /* corresponding dec_unacked() in e_end_resync_block()
1444          * respective _drbd_clear_done_ee */
1445
1446         spin_lock_irq(&mdev->req_lock);
1447         list_add(&e->w.list, &mdev->sync_ee);
1448         spin_unlock_irq(&mdev->req_lock);
1449
1450         drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1451         /* accounting done in endio */
1452
1453         maybe_kick_lo(mdev);
1454         return TRUE;
1455 }
1456
1457 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1458 {
1459         struct drbd_request *req;
1460         sector_t sector;
1461         unsigned int header_size, data_size;
1462         int ok;
1463         struct p_data *p = (struct p_data *)h;
1464
1465         header_size = sizeof(*p) - sizeof(*h);
1466         data_size   = h->length  - header_size;
1467
1468         ERR_IF(data_size == 0) return FALSE;
1469
1470         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1471                 return FALSE;
1472
1473         sector = be64_to_cpu(p->sector);
1474
1475         spin_lock_irq(&mdev->req_lock);
1476         req = _ar_id_to_req(mdev, p->block_id, sector);
1477         spin_unlock_irq(&mdev->req_lock);
1478         if (unlikely(!req)) {
1479                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1480                 return FALSE;
1481         }
1482
1483         /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1484          * special casing it there for the various failure cases.
1485          * still no race with drbd_fail_pending_reads */
1486         ok = recv_dless_read(mdev, req, sector, data_size);
1487
1488         if (ok)
1489                 req_mod(req, data_received);
1490         /* else: nothing. handled from drbd_disconnect...
1491          * I don't think we may complete this just yet
1492          * in case we are "on-disconnect: freeze" */
1493
1494         return ok;
1495 }
1496
1497 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1498 {
1499         sector_t sector;
1500         unsigned int header_size, data_size;
1501         int ok;
1502         struct p_data *p = (struct p_data *)h;
1503
1504         header_size = sizeof(*p) - sizeof(*h);
1505         data_size   = h->length  - header_size;
1506
1507         ERR_IF(data_size == 0) return FALSE;
1508
1509         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1510                 return FALSE;
1511
1512         sector = be64_to_cpu(p->sector);
1513         D_ASSERT(p->block_id == ID_SYNCER);
1514
1515         if (get_ldev(mdev)) {
1516                 /* data is submitted to disk within recv_resync_read.
1517                  * corresponding put_ldev done below on error,
1518                  * or in drbd_endio_write_sec. */
1519                 ok = recv_resync_read(mdev, sector, data_size);
1520         } else {
1521                 if (__ratelimit(&drbd_ratelimit_state))
1522                         dev_err(DEV, "Can not write resync data to local disk.\n");
1523
1524                 ok = drbd_drain_block(mdev, data_size);
1525
1526                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1527         }
1528
1529         return ok;
1530 }
1531
1532 /* e_end_block() is called via drbd_process_done_ee().
1533  * this means this function only runs in the asender thread
1534  */
1535 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1536 {
1537         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1538         sector_t sector = e->sector;
1539         struct drbd_epoch *epoch;
1540         int ok = 1, pcmd;
1541
1542         if (e->flags & EE_IS_BARRIER) {
1543                 epoch = previous_epoch(mdev, e->epoch);
1544                 if (epoch)
1545                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1546         }
1547
1548         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1549                 if (likely(drbd_bio_uptodate(e->private_bio))) {
1550                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1551                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1552                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1553                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1554                         ok &= drbd_send_ack(mdev, pcmd, e);
1555                         if (pcmd == P_RS_WRITE_ACK)
1556                                 drbd_set_in_sync(mdev, sector, e->size);
1557                 } else {
1558                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1559                         /* we expect it to be marked out of sync anyways...
1560                          * maybe assert this?  */
1561                 }
1562                 dec_unacked(mdev);
1563         }
1564         /* we delete from the conflict detection hash _after_ we sent out the
1565          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1566         if (mdev->net_conf->two_primaries) {
1567                 spin_lock_irq(&mdev->req_lock);
1568                 D_ASSERT(!hlist_unhashed(&e->colision));
1569                 hlist_del_init(&e->colision);
1570                 spin_unlock_irq(&mdev->req_lock);
1571         } else {
1572                 D_ASSERT(hlist_unhashed(&e->colision));
1573         }
1574
1575         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1576
1577         return ok;
1578 }
1579
1580 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1581 {
1582         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1583         int ok = 1;
1584
1585         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1586         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1587
1588         spin_lock_irq(&mdev->req_lock);
1589         D_ASSERT(!hlist_unhashed(&e->colision));
1590         hlist_del_init(&e->colision);
1591         spin_unlock_irq(&mdev->req_lock);
1592
1593         dec_unacked(mdev);
1594
1595         return ok;
1596 }
1597
1598 /* Called from receive_Data.
1599  * Synchronize packets on sock with packets on msock.
1600  *
1601  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1602  * packet traveling on msock, they are still processed in the order they have
1603  * been sent.
1604  *
1605  * Note: we don't care for Ack packets overtaking P_DATA packets.
1606  *
1607  * In case packet_seq is larger than mdev->peer_seq number, there are
1608  * outstanding packets on the msock. We wait for them to arrive.
1609  * In case we are the logically next packet, we update mdev->peer_seq
1610  * ourselves. Correctly handles 32bit wrap around.
1611  *
1612  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1613  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1614  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1615  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1616  *
1617  * returns 0 if we may process the packet,
1618  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1619 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1620 {
1621         DEFINE_WAIT(wait);
1622         unsigned int p_seq;
1623         long timeout;
1624         int ret = 0;
1625         spin_lock(&mdev->peer_seq_lock);
1626         for (;;) {
1627                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1628                 if (seq_le(packet_seq, mdev->peer_seq+1))
1629                         break;
1630                 if (signal_pending(current)) {
1631                         ret = -ERESTARTSYS;
1632                         break;
1633                 }
1634                 p_seq = mdev->peer_seq;
1635                 spin_unlock(&mdev->peer_seq_lock);
1636                 timeout = schedule_timeout(30*HZ);
1637                 spin_lock(&mdev->peer_seq_lock);
1638                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1639                         ret = -ETIMEDOUT;
1640                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1641                         break;
1642                 }
1643         }
1644         finish_wait(&mdev->seq_wait, &wait);
1645         if (mdev->peer_seq+1 == packet_seq)
1646                 mdev->peer_seq++;
1647         spin_unlock(&mdev->peer_seq_lock);
1648         return ret;
1649 }
1650
1651 /* mirrored write */
1652 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1653 {
1654         sector_t sector;
1655         struct drbd_epoch_entry *e;
1656         struct p_data *p = (struct p_data *)h;
1657         int header_size, data_size;
1658         int rw = WRITE;
1659         u32 dp_flags;
1660
1661         header_size = sizeof(*p) - sizeof(*h);
1662         data_size   = h->length  - header_size;
1663
1664         ERR_IF(data_size == 0) return FALSE;
1665
1666         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1667                 return FALSE;
1668
1669         if (!get_ldev(mdev)) {
1670                 if (__ratelimit(&drbd_ratelimit_state))
1671                         dev_err(DEV, "Can not write mirrored data block "
1672                             "to local disk.\n");
1673                 spin_lock(&mdev->peer_seq_lock);
1674                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1675                         mdev->peer_seq++;
1676                 spin_unlock(&mdev->peer_seq_lock);
1677
1678                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1679                 atomic_inc(&mdev->current_epoch->epoch_size);
1680                 return drbd_drain_block(mdev, data_size);
1681         }
1682
1683         /* get_ldev(mdev) successful.
1684          * Corresponding put_ldev done either below (on various errors),
1685          * or in drbd_endio_write_sec, if we successfully submit the data at
1686          * the end of this function. */
1687
1688         sector = be64_to_cpu(p->sector);
1689         e = read_in_block(mdev, p->block_id, sector, data_size);
1690         if (!e) {
1691                 put_ldev(mdev);
1692                 return FALSE;
1693         }
1694
1695         e->private_bio->bi_end_io = drbd_endio_write_sec;
1696         e->w.cb = e_end_block;
1697
1698         spin_lock(&mdev->epoch_lock);
1699         e->epoch = mdev->current_epoch;
1700         atomic_inc(&e->epoch->epoch_size);
1701         atomic_inc(&e->epoch->active);
1702
1703         if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1704                 struct drbd_epoch *epoch;
1705                 /* Issue a barrier if we start a new epoch, and the previous epoch
1706                    was not a epoch containing a single request which already was
1707                    a Barrier. */
1708                 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1709                 if (epoch == e->epoch) {
1710                         set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1711                         rw |= (1<<BIO_RW_BARRIER);
1712                         e->flags |= EE_IS_BARRIER;
1713                 } else {
1714                         if (atomic_read(&epoch->epoch_size) > 1 ||
1715                             !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1716                                 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1717                                 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1718                                 rw |= (1<<BIO_RW_BARRIER);
1719                                 e->flags |= EE_IS_BARRIER;
1720                         }
1721                 }
1722         }
1723         spin_unlock(&mdev->epoch_lock);
1724
1725         dp_flags = be32_to_cpu(p->dp_flags);
1726         if (dp_flags & DP_HARDBARRIER) {
1727                 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1728                 /* rw |= (1<<BIO_RW_BARRIER); */
1729         }
1730         if (dp_flags & DP_RW_SYNC)
1731                 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1732         if (dp_flags & DP_MAY_SET_IN_SYNC)
1733                 e->flags |= EE_MAY_SET_IN_SYNC;
1734
1735         /* I'm the receiver, I do hold a net_cnt reference. */
1736         if (!mdev->net_conf->two_primaries) {
1737                 spin_lock_irq(&mdev->req_lock);
1738         } else {
1739                 /* don't get the req_lock yet,
1740                  * we may sleep in drbd_wait_peer_seq */
1741                 const int size = e->size;
1742                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1743                 DEFINE_WAIT(wait);
1744                 struct drbd_request *i;
1745                 struct hlist_node *n;
1746                 struct hlist_head *slot;
1747                 int first;
1748
1749                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1750                 BUG_ON(mdev->ee_hash == NULL);
1751                 BUG_ON(mdev->tl_hash == NULL);
1752
1753                 /* conflict detection and handling:
1754                  * 1. wait on the sequence number,
1755                  *    in case this data packet overtook ACK packets.
1756                  * 2. check our hash tables for conflicting requests.
1757                  *    we only need to walk the tl_hash, since an ee can not
1758                  *    have a conflict with an other ee: on the submitting
1759                  *    node, the corresponding req had already been conflicting,
1760                  *    and a conflicting req is never sent.
1761                  *
1762                  * Note: for two_primaries, we are protocol C,
1763                  * so there cannot be any request that is DONE
1764                  * but still on the transfer log.
1765                  *
1766                  * unconditionally add to the ee_hash.
1767                  *
1768                  * if no conflicting request is found:
1769                  *    submit.
1770                  *
1771                  * if any conflicting request is found
1772                  * that has not yet been acked,
1773                  * AND I have the "discard concurrent writes" flag:
1774                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1775                  *
1776                  * if any conflicting request is found:
1777                  *       block the receiver, waiting on misc_wait
1778                  *       until no more conflicting requests are there,
1779                  *       or we get interrupted (disconnect).
1780                  *
1781                  *       we do not just write after local io completion of those
1782                  *       requests, but only after req is done completely, i.e.
1783                  *       we wait for the P_DISCARD_ACK to arrive!
1784                  *
1785                  *       then proceed normally, i.e. submit.
1786                  */
1787                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1788                         goto out_interrupted;
1789
1790                 spin_lock_irq(&mdev->req_lock);
1791
1792                 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1793
1794 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1795                 slot = tl_hash_slot(mdev, sector);
1796                 first = 1;
1797                 for (;;) {
1798                         int have_unacked = 0;
1799                         int have_conflict = 0;
1800                         prepare_to_wait(&mdev->misc_wait, &wait,
1801                                 TASK_INTERRUPTIBLE);
1802                         hlist_for_each_entry(i, n, slot, colision) {
1803                                 if (OVERLAPS) {
1804                                         /* only ALERT on first iteration,
1805                                          * we may be woken up early... */
1806                                         if (first)
1807                                                 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1808                                                       " new: %llus +%u; pending: %llus +%u\n",
1809                                                       current->comm, current->pid,
1810                                                       (unsigned long long)sector, size,
1811                                                       (unsigned long long)i->sector, i->size);
1812                                         if (i->rq_state & RQ_NET_PENDING)
1813                                                 ++have_unacked;
1814                                         ++have_conflict;
1815                                 }
1816                         }
1817 #undef OVERLAPS
1818                         if (!have_conflict)
1819                                 break;
1820
1821                         /* Discard Ack only for the _first_ iteration */
1822                         if (first && discard && have_unacked) {
1823                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1824                                      (unsigned long long)sector);
1825                                 inc_unacked(mdev);
1826                                 e->w.cb = e_send_discard_ack;
1827                                 list_add_tail(&e->w.list, &mdev->done_ee);
1828
1829                                 spin_unlock_irq(&mdev->req_lock);
1830
1831                                 /* we could probably send that P_DISCARD_ACK ourselves,
1832                                  * but I don't like the receiver using the msock */
1833
1834                                 put_ldev(mdev);
1835                                 wake_asender(mdev);
1836                                 finish_wait(&mdev->misc_wait, &wait);
1837                                 return TRUE;
1838                         }
1839
1840                         if (signal_pending(current)) {
1841                                 hlist_del_init(&e->colision);
1842
1843                                 spin_unlock_irq(&mdev->req_lock);
1844
1845                                 finish_wait(&mdev->misc_wait, &wait);
1846                                 goto out_interrupted;
1847                         }
1848
1849                         spin_unlock_irq(&mdev->req_lock);
1850                         if (first) {
1851                                 first = 0;
1852                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1853                                      "sec=%llus\n", (unsigned long long)sector);
1854                         } else if (discard) {
1855                                 /* we had none on the first iteration.
1856                                  * there must be none now. */
1857                                 D_ASSERT(have_unacked == 0);
1858                         }
1859                         schedule();
1860                         spin_lock_irq(&mdev->req_lock);
1861                 }
1862                 finish_wait(&mdev->misc_wait, &wait);
1863         }
1864
1865         list_add(&e->w.list, &mdev->active_ee);
1866         spin_unlock_irq(&mdev->req_lock);
1867
1868         switch (mdev->net_conf->wire_protocol) {
1869         case DRBD_PROT_C:
1870                 inc_unacked(mdev);
1871                 /* corresponding dec_unacked() in e_end_block()
1872                  * respective _drbd_clear_done_ee */
1873                 break;
1874         case DRBD_PROT_B:
1875                 /* I really don't like it that the receiver thread
1876                  * sends on the msock, but anyways */
1877                 drbd_send_ack(mdev, P_RECV_ACK, e);
1878                 break;
1879         case DRBD_PROT_A:
1880                 /* nothing to do */
1881                 break;
1882         }
1883
1884         if (mdev->state.pdsk == D_DISKLESS) {
1885                 /* In case we have the only disk of the cluster, */
1886                 drbd_set_out_of_sync(mdev, e->sector, e->size);
1887                 e->flags |= EE_CALL_AL_COMPLETE_IO;
1888                 drbd_al_begin_io(mdev, e->sector);
1889         }
1890
1891         e->private_bio->bi_rw = rw;
1892         drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1893         /* accounting done in endio */
1894
1895         maybe_kick_lo(mdev);
1896         return TRUE;
1897
1898 out_interrupted:
1899         /* yes, the epoch_size now is imbalanced.
1900          * but we drop the connection anyways, so we don't have a chance to
1901          * receive a barrier... atomic_inc(&mdev->epoch_size); */
1902         put_ldev(mdev);
1903         drbd_free_ee(mdev, e);
1904         return FALSE;
1905 }
1906
1907 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1908 {
1909         sector_t sector;
1910         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1911         struct drbd_epoch_entry *e;
1912         struct digest_info *di = NULL;
1913         int size, digest_size;
1914         unsigned int fault_type;
1915         struct p_block_req *p =
1916                 (struct p_block_req *)h;
1917         const int brps = sizeof(*p)-sizeof(*h);
1918
1919         if (drbd_recv(mdev, h->payload, brps) != brps)
1920                 return FALSE;
1921
1922         sector = be64_to_cpu(p->sector);
1923         size   = be32_to_cpu(p->blksize);
1924
1925         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1926                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1927                                 (unsigned long long)sector, size);
1928                 return FALSE;
1929         }
1930         if (sector + (size>>9) > capacity) {
1931                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1932                                 (unsigned long long)sector, size);
1933                 return FALSE;
1934         }
1935
1936         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1937                 if (__ratelimit(&drbd_ratelimit_state))
1938                         dev_err(DEV, "Can not satisfy peer's read request, "
1939                             "no local data.\n");
1940                 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1941                                  P_NEG_RS_DREPLY , p);
1942                 return TRUE;
1943         }
1944
1945         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1946          * "criss-cross" setup, that might cause write-out on some other DRBD,
1947          * which in turn might block on the other node at this very place.  */
1948         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1949         if (!e) {
1950                 put_ldev(mdev);
1951                 return FALSE;
1952         }
1953
1954         e->private_bio->bi_rw = READ;
1955         e->private_bio->bi_end_io = drbd_endio_read_sec;
1956
1957         switch (h->command) {
1958         case P_DATA_REQUEST:
1959                 e->w.cb = w_e_end_data_req;
1960                 fault_type = DRBD_FAULT_DT_RD;
1961                 break;
1962         case P_RS_DATA_REQUEST:
1963                 e->w.cb = w_e_end_rsdata_req;
1964                 fault_type = DRBD_FAULT_RS_RD;
1965                 /* Eventually this should become asynchronously. Currently it
1966                  * blocks the whole receiver just to delay the reading of a
1967                  * resync data block.
1968                  * the drbd_work_queue mechanism is made for this...
1969                  */
1970                 if (!drbd_rs_begin_io(mdev, sector)) {
1971                         /* we have been interrupted,
1972                          * probably connection lost! */
1973                         D_ASSERT(signal_pending(current));
1974                         goto out_free_e;
1975                 }
1976                 break;
1977
1978         case P_OV_REPLY:
1979         case P_CSUM_RS_REQUEST:
1980                 fault_type = DRBD_FAULT_RS_RD;
1981                 digest_size = h->length - brps ;
1982                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
1983                 if (!di)
1984                         goto out_free_e;
1985
1986                 di->digest_size = digest_size;
1987                 di->digest = (((char *)di)+sizeof(struct digest_info));
1988
1989                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
1990                         goto out_free_e;
1991
1992                 e->block_id = (u64)(unsigned long)di;
1993                 if (h->command == P_CSUM_RS_REQUEST) {
1994                         D_ASSERT(mdev->agreed_pro_version >= 89);
1995                         e->w.cb = w_e_end_csum_rs_req;
1996                 } else if (h->command == P_OV_REPLY) {
1997                         e->w.cb = w_e_end_ov_reply;
1998                         dec_rs_pending(mdev);
1999                         break;
2000                 }
2001
2002                 if (!drbd_rs_begin_io(mdev, sector)) {
2003                         /* we have been interrupted, probably connection lost! */
2004                         D_ASSERT(signal_pending(current));
2005                         goto out_free_e;
2006                 }
2007                 break;
2008
2009         case P_OV_REQUEST:
2010                 if (mdev->state.conn >= C_CONNECTED &&
2011                     mdev->state.conn != C_VERIFY_T)
2012                         dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2013                                 drbd_conn_str(mdev->state.conn));
2014                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2015                     mdev->agreed_pro_version >= 90) {
2016                         mdev->ov_start_sector = sector;
2017                         mdev->ov_position = sector;
2018                         mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2019                         dev_info(DEV, "Online Verify start sector: %llu\n",
2020                                         (unsigned long long)sector);
2021                 }
2022                 e->w.cb = w_e_end_ov_req;
2023                 fault_type = DRBD_FAULT_RS_RD;
2024                 /* Eventually this should become asynchronous. Currently it
2025                  * blocks the whole receiver just to delay the reading of a
2026                  * resync data block.
2027                  * the drbd_work_queue mechanism is made for this...
2028                  */
2029                 if (!drbd_rs_begin_io(mdev, sector)) {
2030                         /* we have been interrupted,
2031                          * probably connection lost! */
2032                         D_ASSERT(signal_pending(current));
2033                         goto out_free_e;
2034                 }
2035                 break;
2036
2037
2038         default:
2039                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2040                     cmdname(h->command));
2041                 fault_type = DRBD_FAULT_MAX;
2042         }
2043
2044         spin_lock_irq(&mdev->req_lock);
2045         list_add(&e->w.list, &mdev->read_ee);
2046         spin_unlock_irq(&mdev->req_lock);
2047
2048         inc_unacked(mdev);
2049
2050         drbd_generic_make_request(mdev, fault_type, e->private_bio);
2051         maybe_kick_lo(mdev);
2052
2053         return TRUE;
2054
2055 out_free_e:
2056         kfree(di);
2057         put_ldev(mdev);
2058         drbd_free_ee(mdev, e);
2059         return FALSE;
2060 }
2061
2062 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2063 {
2064         int self, peer, rv = -100;
2065         unsigned long ch_self, ch_peer;
2066
2067         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2068         peer = mdev->p_uuid[UI_BITMAP] & 1;
2069
2070         ch_peer = mdev->p_uuid[UI_SIZE];
2071         ch_self = mdev->comm_bm_set;
2072
2073         switch (mdev->net_conf->after_sb_0p) {
2074         case ASB_CONSENSUS:
2075         case ASB_DISCARD_SECONDARY:
2076         case ASB_CALL_HELPER:
2077                 dev_err(DEV, "Configuration error.\n");
2078                 break;
2079         case ASB_DISCONNECT:
2080                 break;
2081         case ASB_DISCARD_YOUNGER_PRI:
2082                 if (self == 0 && peer == 1) {
2083                         rv = -1;
2084                         break;
2085                 }
2086                 if (self == 1 && peer == 0) {
2087                         rv =  1;
2088                         break;
2089                 }
2090                 /* Else fall through to one of the other strategies... */
2091         case ASB_DISCARD_OLDER_PRI:
2092                 if (self == 0 && peer == 1) {
2093                         rv = 1;
2094                         break;
2095                 }
2096                 if (self == 1 && peer == 0) {
2097                         rv = -1;
2098                         break;
2099                 }
2100                 /* Else fall through to one of the other strategies... */
2101                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2102                      "Using discard-least-changes instead\n");
2103         case ASB_DISCARD_ZERO_CHG:
2104                 if (ch_peer == 0 && ch_self == 0) {
2105                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2106                                 ? -1 : 1;
2107                         break;
2108                 } else {
2109                         if (ch_peer == 0) { rv =  1; break; }
2110                         if (ch_self == 0) { rv = -1; break; }
2111                 }
2112                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2113                         break;
2114         case ASB_DISCARD_LEAST_CHG:
2115                 if      (ch_self < ch_peer)
2116                         rv = -1;
2117                 else if (ch_self > ch_peer)
2118                         rv =  1;
2119                 else /* ( ch_self == ch_peer ) */
2120                      /* Well, then use something else. */
2121                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2122                                 ? -1 : 1;
2123                 break;
2124         case ASB_DISCARD_LOCAL:
2125                 rv = -1;
2126                 break;
2127         case ASB_DISCARD_REMOTE:
2128                 rv =  1;
2129         }
2130
2131         return rv;
2132 }
2133
2134 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2135 {
2136         int self, peer, hg, rv = -100;
2137
2138         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2139         peer = mdev->p_uuid[UI_BITMAP] & 1;
2140
2141         switch (mdev->net_conf->after_sb_1p) {
2142         case ASB_DISCARD_YOUNGER_PRI:
2143         case ASB_DISCARD_OLDER_PRI:
2144         case ASB_DISCARD_LEAST_CHG:
2145         case ASB_DISCARD_LOCAL:
2146         case ASB_DISCARD_REMOTE:
2147                 dev_err(DEV, "Configuration error.\n");
2148                 break;
2149         case ASB_DISCONNECT:
2150                 break;
2151         case ASB_CONSENSUS:
2152                 hg = drbd_asb_recover_0p(mdev);
2153                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2154                         rv = hg;
2155                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2156                         rv = hg;
2157                 break;
2158         case ASB_VIOLENTLY:
2159                 rv = drbd_asb_recover_0p(mdev);
2160                 break;
2161         case ASB_DISCARD_SECONDARY:
2162                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2163         case ASB_CALL_HELPER:
2164                 hg = drbd_asb_recover_0p(mdev);
2165                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2166                         self = drbd_set_role(mdev, R_SECONDARY, 0);
2167                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2168                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2169                           * we do not need to wait for the after state change work either. */
2170                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2171                         if (self != SS_SUCCESS) {
2172                                 drbd_khelper(mdev, "pri-lost-after-sb");
2173                         } else {
2174                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2175                                 rv = hg;
2176                         }
2177                 } else
2178                         rv = hg;
2179         }
2180
2181         return rv;
2182 }
2183
2184 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2185 {
2186         int self, peer, hg, rv = -100;
2187
2188         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2189         peer = mdev->p_uuid[UI_BITMAP] & 1;
2190
2191         switch (mdev->net_conf->after_sb_2p) {
2192         case ASB_DISCARD_YOUNGER_PRI:
2193         case ASB_DISCARD_OLDER_PRI:
2194         case ASB_DISCARD_LEAST_CHG:
2195         case ASB_DISCARD_LOCAL:
2196         case ASB_DISCARD_REMOTE:
2197         case ASB_CONSENSUS:
2198         case ASB_DISCARD_SECONDARY:
2199                 dev_err(DEV, "Configuration error.\n");
2200                 break;
2201         case ASB_VIOLENTLY:
2202                 rv = drbd_asb_recover_0p(mdev);
2203                 break;
2204         case ASB_DISCONNECT:
2205                 break;
2206         case ASB_CALL_HELPER:
2207                 hg = drbd_asb_recover_0p(mdev);
2208                 if (hg == -1) {
2209                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2210                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2211                           * we do not need to wait for the after state change work either. */
2212                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2213                         if (self != SS_SUCCESS) {
2214                                 drbd_khelper(mdev, "pri-lost-after-sb");
2215                         } else {
2216                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2217                                 rv = hg;
2218                         }
2219                 } else
2220                         rv = hg;
2221         }
2222
2223         return rv;
2224 }
2225
2226 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2227                            u64 bits, u64 flags)
2228 {
2229         if (!uuid) {
2230                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2231                 return;
2232         }
2233         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2234              text,
2235              (unsigned long long)uuid[UI_CURRENT],
2236              (unsigned long long)uuid[UI_BITMAP],
2237              (unsigned long long)uuid[UI_HISTORY_START],
2238              (unsigned long long)uuid[UI_HISTORY_END],
2239              (unsigned long long)bits,
2240              (unsigned long long)flags);
2241 }
2242
2243 /*
2244   100   after split brain try auto recover
2245     2   C_SYNC_SOURCE set BitMap
2246     1   C_SYNC_SOURCE use BitMap
2247     0   no Sync
2248    -1   C_SYNC_TARGET use BitMap
2249    -2   C_SYNC_TARGET set BitMap
2250  -100   after split brain, disconnect
2251 -1000   unrelated data
2252  */
2253 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2254 {
2255         u64 self, peer;
2256         int i, j;
2257
2258         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2259         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2260
2261         *rule_nr = 10;
2262         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2263                 return 0;
2264
2265         *rule_nr = 20;
2266         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2267              peer != UUID_JUST_CREATED)
2268                 return -2;
2269
2270         *rule_nr = 30;
2271         if (self != UUID_JUST_CREATED &&
2272             (peer == UUID_JUST_CREATED || peer == (u64)0))
2273                 return 2;
2274
2275         if (self == peer) {
2276                 int rct, dc; /* roles at crash time */
2277
2278                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2279
2280                         if (mdev->agreed_pro_version < 91)
2281                                 return -1001;
2282
2283                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2284                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2285                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2286                                 drbd_uuid_set_bm(mdev, 0UL);
2287
2288                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2289                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2290                                 *rule_nr = 34;
2291                         } else {
2292                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2293                                 *rule_nr = 36;
2294                         }
2295
2296                         return 1;
2297                 }
2298
2299                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2300
2301                         if (mdev->agreed_pro_version < 91)
2302                                 return -1001;
2303
2304                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2305                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2306                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2307
2308                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2309                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2310                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2311
2312                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2313                                 *rule_nr = 35;
2314                         } else {
2315                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2316                                 *rule_nr = 37;
2317                         }
2318
2319                         return -1;
2320                 }
2321
2322                 /* Common power [off|failure] */
2323                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2324                         (mdev->p_uuid[UI_FLAGS] & 2);
2325                 /* lowest bit is set when we were primary,
2326                  * next bit (weight 2) is set when peer was primary */
2327                 *rule_nr = 40;
2328
2329                 switch (rct) {
2330                 case 0: /* !self_pri && !peer_pri */ return 0;
2331                 case 1: /*  self_pri && !peer_pri */ return 1;
2332                 case 2: /* !self_pri &&  peer_pri */ return -1;
2333                 case 3: /*  self_pri &&  peer_pri */
2334                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2335                         return dc ? -1 : 1;
2336                 }
2337         }
2338
2339         *rule_nr = 50;
2340         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2341         if (self == peer)
2342                 return -1;
2343
2344         *rule_nr = 51;
2345         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2346         if (self == peer) {
2347                 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2348                 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2349                 if (self == peer) {
2350                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2351                            resync as sync source modifications of the peer's UUIDs. */
2352
2353                         if (mdev->agreed_pro_version < 91)
2354                                 return -1001;
2355
2356                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2357                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2358                         return -1;
2359                 }
2360         }
2361
2362         *rule_nr = 60;
2363         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2364         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2365                 peer = mdev->p_uuid[i] & ~((u64)1);
2366                 if (self == peer)
2367                         return -2;
2368         }
2369
2370         *rule_nr = 70;
2371         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2372         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2373         if (self == peer)
2374                 return 1;
2375
2376         *rule_nr = 71;
2377         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2378         if (self == peer) {
2379                 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2380                 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2381                 if (self == peer) {
2382                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2383                            resync as sync source modifications of our UUIDs. */
2384
2385                         if (mdev->agreed_pro_version < 91)
2386                                 return -1001;
2387
2388                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2389                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2390
2391                         dev_info(DEV, "Undid last start of resync:\n");
2392
2393                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2394                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2395
2396                         return 1;
2397                 }
2398         }
2399
2400
2401         *rule_nr = 80;
2402         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2403         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2404                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2405                 if (self == peer)
2406                         return 2;
2407         }
2408
2409         *rule_nr = 90;
2410         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2411         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2412         if (self == peer && self != ((u64)0))
2413                 return 100;
2414
2415         *rule_nr = 100;
2416         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2417                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2418                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2419                         peer = mdev->p_uuid[j] & ~((u64)1);
2420                         if (self == peer)
2421                                 return -100;
2422                 }
2423         }
2424
2425         return -1000;
2426 }
2427
2428 /* drbd_sync_handshake() returns the new conn state on success, or
2429    CONN_MASK (-1) on failure.
2430  */
2431 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2432                                            enum drbd_disk_state peer_disk) __must_hold(local)
2433 {
2434         int hg, rule_nr;
2435         enum drbd_conns rv = C_MASK;
2436         enum drbd_disk_state mydisk;
2437
2438         mydisk = mdev->state.disk;
2439         if (mydisk == D_NEGOTIATING)
2440                 mydisk = mdev->new_state_tmp.disk;
2441
2442         dev_info(DEV, "drbd_sync_handshake:\n");
2443         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2444         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2445                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2446
2447         hg = drbd_uuid_compare(mdev, &rule_nr);
2448
2449         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2450
2451         if (hg == -1000) {
2452                 dev_alert(DEV, "Unrelated data, aborting!\n");
2453                 return C_MASK;
2454         }
2455         if (hg == -1001) {
2456                 dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2457                 return C_MASK;
2458         }
2459
2460         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2461             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2462                 int f = (hg == -100) || abs(hg) == 2;
2463                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2464                 if (f)
2465                         hg = hg*2;
2466                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2467                      hg > 0 ? "source" : "target");
2468         }
2469
2470         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2471                 int pcount = (mdev->state.role == R_PRIMARY)
2472                            + (peer_role == R_PRIMARY);
2473                 int forced = (hg == -100);
2474
2475                 switch (pcount) {
2476                 case 0:
2477                         hg = drbd_asb_recover_0p(mdev);
2478                         break;
2479                 case 1:
2480                         hg = drbd_asb_recover_1p(mdev);
2481                         break;
2482                 case 2:
2483                         hg = drbd_asb_recover_2p(mdev);
2484                         break;
2485                 }
2486                 if (abs(hg) < 100) {
2487                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2488                              "automatically solved. Sync from %s node\n",
2489                              pcount, (hg < 0) ? "peer" : "this");
2490                         if (forced) {
2491                                 dev_warn(DEV, "Doing a full sync, since"
2492                                      " UUIDs where ambiguous.\n");
2493                                 hg = hg*2;
2494                         }
2495                 }
2496         }
2497
2498         if (hg == -100) {
2499                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2500                         hg = -1;
2501                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2502                         hg = 1;
2503
2504                 if (abs(hg) < 100)
2505                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2506                              "Sync from %s node\n",
2507                              (hg < 0) ? "peer" : "this");
2508         }
2509
2510         if (hg == -100) {
2511                 dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2512                 drbd_khelper(mdev, "split-brain");
2513                 return C_MASK;
2514         }
2515
2516         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2517                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2518                 return C_MASK;
2519         }
2520
2521         if (hg < 0 && /* by intention we do not use mydisk here. */
2522             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2523                 switch (mdev->net_conf->rr_conflict) {
2524                 case ASB_CALL_HELPER:
2525                         drbd_khelper(mdev, "pri-lost");
2526                         /* fall through */
2527                 case ASB_DISCONNECT:
2528                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2529                         return C_MASK;
2530                 case ASB_VIOLENTLY:
2531                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2532                              "assumption\n");
2533                 }
2534         }
2535
2536         if (abs(hg) >= 2) {
2537                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2538                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2539                         return C_MASK;
2540         }
2541
2542         if (hg > 0) { /* become sync source. */
2543                 rv = C_WF_BITMAP_S;
2544         } else if (hg < 0) { /* become sync target */
2545                 rv = C_WF_BITMAP_T;
2546         } else {
2547                 rv = C_CONNECTED;
2548                 if (drbd_bm_total_weight(mdev)) {
2549                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2550                              drbd_bm_total_weight(mdev));
2551                 }
2552         }
2553
2554         return rv;
2555 }
2556
2557 /* returns 1 if invalid */
2558 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2559 {
2560         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2561         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2562             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2563                 return 0;
2564
2565         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2566         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2567             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2568                 return 1;
2569
2570         /* everything else is valid if they are equal on both sides. */
2571         if (peer == self)
2572                 return 0;
2573
2574         /* everything es is invalid. */
2575         return 1;
2576 }
2577
2578 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2579 {
2580         struct p_protocol *p = (struct p_protocol *)h;
2581         int header_size, data_size;
2582         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2583         int p_want_lose, p_two_primaries;
2584         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2585
2586         header_size = sizeof(*p) - sizeof(*h);
2587         data_size   = h->length  - header_size;
2588
2589         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2590                 return FALSE;
2591
2592         p_proto         = be32_to_cpu(p->protocol);
2593         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2594         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2595         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2596         p_want_lose     = be32_to_cpu(p->want_lose);
2597         p_two_primaries = be32_to_cpu(p->two_primaries);
2598
2599         if (p_proto != mdev->net_conf->wire_protocol) {
2600                 dev_err(DEV, "incompatible communication protocols\n");
2601                 goto disconnect;
2602         }
2603
2604         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2605                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2606                 goto disconnect;
2607         }
2608
2609         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2610                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2611                 goto disconnect;
2612         }
2613
2614         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2615                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2616                 goto disconnect;
2617         }
2618
2619         if (p_want_lose && mdev->net_conf->want_lose) {
2620                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2621                 goto disconnect;
2622         }
2623
2624         if (p_two_primaries != mdev->net_conf->two_primaries) {
2625                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2626                 goto disconnect;
2627         }
2628
2629         if (mdev->agreed_pro_version >= 87) {
2630                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2631
2632                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2633                         return FALSE;
2634
2635                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2636                 if (strcmp(p_integrity_alg, my_alg)) {
2637                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2638                         goto disconnect;
2639                 }
2640                 dev_info(DEV, "data-integrity-alg: %s\n",
2641                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2642         }
2643
2644         return TRUE;
2645
2646 disconnect:
2647         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2648         return FALSE;
2649 }
2650
2651 /* helper function
2652  * input: alg name, feature name
2653  * return: NULL (alg name was "")
2654  *         ERR_PTR(error) if something goes wrong
2655  *         or the crypto hash ptr, if it worked out ok. */
2656 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2657                 const char *alg, const char *name)
2658 {
2659         struct crypto_hash *tfm;
2660
2661         if (!alg[0])
2662                 return NULL;
2663
2664         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2665         if (IS_ERR(tfm)) {
2666                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2667                         alg, name, PTR_ERR(tfm));
2668                 return tfm;
2669         }
2670         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2671                 crypto_free_hash(tfm);
2672                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2673                 return ERR_PTR(-EINVAL);
2674         }
2675         return tfm;
2676 }
2677
2678 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2679 {
2680         int ok = TRUE;
2681         struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2682         unsigned int header_size, data_size, exp_max_sz;
2683         struct crypto_hash *verify_tfm = NULL;
2684         struct crypto_hash *csums_tfm = NULL;
2685         const int apv = mdev->agreed_pro_version;
2686
2687         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2688                     : apv == 88 ? sizeof(struct p_rs_param)
2689                                         + SHARED_SECRET_MAX
2690                     : /* 89 */    sizeof(struct p_rs_param_89);
2691
2692         if (h->length > exp_max_sz) {
2693                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2694                     h->length, exp_max_sz);
2695                 return FALSE;
2696         }
2697
2698         if (apv <= 88) {
2699                 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2700                 data_size   = h->length  - header_size;
2701         } else /* apv >= 89 */ {
2702                 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2703                 data_size   = h->length  - header_size;
2704                 D_ASSERT(data_size == 0);
2705         }
2706
2707         /* initialize verify_alg and csums_alg */
2708         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2709
2710         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2711                 return FALSE;
2712
2713         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2714
2715         if (apv >= 88) {
2716                 if (apv == 88) {
2717                         if (data_size > SHARED_SECRET_MAX) {
2718                                 dev_err(DEV, "verify-alg too long, "
2719                                     "peer wants %u, accepting only %u byte\n",
2720                                                 data_size, SHARED_SECRET_MAX);
2721                                 return FALSE;
2722                         }
2723
2724                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2725                                 return FALSE;
2726
2727                         /* we expect NUL terminated string */
2728                         /* but just in case someone tries to be evil */
2729                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2730                         p->verify_alg[data_size-1] = 0;
2731
2732                 } else /* apv >= 89 */ {
2733                         /* we still expect NUL terminated strings */
2734                         /* but just in case someone tries to be evil */
2735                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2736                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2737                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2738                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2739                 }
2740
2741                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2742                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2743                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2744                                     mdev->sync_conf.verify_alg, p->verify_alg);
2745                                 goto disconnect;
2746                         }
2747                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2748                                         p->verify_alg, "verify-alg");
2749                         if (IS_ERR(verify_tfm)) {
2750                                 verify_tfm = NULL;
2751                                 goto disconnect;
2752                         }
2753                 }
2754
2755                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2756                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2757                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2758                                     mdev->sync_conf.csums_alg, p->csums_alg);
2759                                 goto disconnect;
2760                         }
2761                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2762                                         p->csums_alg, "csums-alg");
2763                         if (IS_ERR(csums_tfm)) {
2764                                 csums_tfm = NULL;
2765                                 goto disconnect;
2766                         }
2767                 }
2768
2769
2770                 spin_lock(&mdev->peer_seq_lock);
2771                 /* lock against drbd_nl_syncer_conf() */
2772                 if (verify_tfm) {
2773                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2774                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2775                         crypto_free_hash(mdev->verify_tfm);
2776                         mdev->verify_tfm = verify_tfm;
2777                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2778                 }
2779                 if (csums_tfm) {
2780                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2781                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2782                         crypto_free_hash(mdev->csums_tfm);
2783                         mdev->csums_tfm = csums_tfm;
2784                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2785                 }
2786                 spin_unlock(&mdev->peer_seq_lock);
2787         }
2788
2789         return ok;
2790 disconnect:
2791         /* just for completeness: actually not needed,
2792          * as this is not reached if csums_tfm was ok. */
2793         crypto_free_hash(csums_tfm);
2794         /* but free the verify_tfm again, if csums_tfm did not work out */
2795         crypto_free_hash(verify_tfm);
2796         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2797         return FALSE;
2798 }
2799
2800 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2801 {
2802         /* sorry, we currently have no working implementation
2803          * of distributed TCQ */
2804 }
2805
2806 /* warn if the arguments differ by more than 12.5% */
2807 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2808         const char *s, sector_t a, sector_t b)
2809 {
2810         sector_t d;
2811         if (a == 0 || b == 0)
2812                 return;
2813         d = (a > b) ? (a - b) : (b - a);
2814         if (d > (a>>3) || d > (b>>3))
2815                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2816                      (unsigned long long)a, (unsigned long long)b);
2817 }
2818
2819 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2820 {
2821         struct p_sizes *p = (struct p_sizes *)h;
2822         enum determine_dev_size dd = unchanged;
2823         unsigned int max_seg_s;
2824         sector_t p_size, p_usize, my_usize;
2825         int ldsc = 0; /* local disk size changed */
2826         enum drbd_conns nconn;
2827
2828         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2829         if (drbd_recv(mdev, h->payload, h->length) != h->length)
2830                 return FALSE;
2831
2832         p_size = be64_to_cpu(p->d_size);
2833         p_usize = be64_to_cpu(p->u_size);
2834
2835         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2836                 dev_err(DEV, "some backing storage is needed\n");
2837                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2838                 return FALSE;
2839         }
2840
2841         /* just store the peer's disk size for now.
2842          * we still need to figure out whether we accept that. */
2843         mdev->p_size = p_size;
2844
2845 #define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
2846         if (get_ldev(mdev)) {
2847                 warn_if_differ_considerably(mdev, "lower level device sizes",
2848                            p_size, drbd_get_max_capacity(mdev->ldev));
2849                 warn_if_differ_considerably(mdev, "user requested size",
2850                                             p_usize, mdev->ldev->dc.disk_size);
2851
2852                 /* if this is the first connect, or an otherwise expected
2853                  * param exchange, choose the minimum */
2854                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2855                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2856                                              p_usize);
2857
2858                 my_usize = mdev->ldev->dc.disk_size;
2859
2860                 if (mdev->ldev->dc.disk_size != p_usize) {
2861                         mdev->ldev->dc.disk_size = p_usize;
2862                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2863                              (unsigned long)mdev->ldev->dc.disk_size);
2864                 }
2865
2866                 /* Never shrink a device with usable data during connect.
2867                    But allow online shrinking if we are connected. */
2868                 if (drbd_new_dev_size(mdev, mdev->ldev) <
2869                    drbd_get_capacity(mdev->this_bdev) &&
2870                    mdev->state.disk >= D_OUTDATED &&
2871                    mdev->state.conn < C_CONNECTED) {
2872                         dev_err(DEV, "The peer's disk size is too small!\n");
2873                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2874                         mdev->ldev->dc.disk_size = my_usize;
2875                         put_ldev(mdev);
2876                         return FALSE;
2877                 }
2878                 put_ldev(mdev);
2879         }
2880 #undef min_not_zero
2881
2882         if (get_ldev(mdev)) {
2883                 dd = drbd_determin_dev_size(mdev);
2884                 put_ldev(mdev);
2885                 if (dd == dev_size_error)
2886                         return FALSE;
2887                 drbd_md_sync(mdev);
2888         } else {
2889                 /* I am diskless, need to accept the peer's size. */
2890                 drbd_set_my_capacity(mdev, p_size);
2891         }
2892
2893         if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
2894                 nconn = drbd_sync_handshake(mdev,
2895                                 mdev->state.peer, mdev->state.pdsk);
2896                 put_ldev(mdev);
2897
2898                 if (nconn == C_MASK) {
2899                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2900                         return FALSE;
2901                 }
2902
2903                 if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) {
2904                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2905                         return FALSE;
2906                 }
2907         }
2908
2909         if (get_ldev(mdev)) {
2910                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2911                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2912                         ldsc = 1;
2913                 }
2914
2915                 max_seg_s = be32_to_cpu(p->max_segment_size);
2916                 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2917                         drbd_setup_queue_param(mdev, max_seg_s);
2918
2919                 drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type));
2920                 put_ldev(mdev);
2921         }
2922
2923         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2924                 if (be64_to_cpu(p->c_size) !=
2925                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
2926                         /* we have different sizes, probably peer
2927                          * needs to know my new size... */
2928                         drbd_send_sizes(mdev, 0);
2929                 }
2930                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2931                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
2932                         if (mdev->state.pdsk >= D_INCONSISTENT &&
2933                             mdev->state.disk >= D_INCONSISTENT)
2934                                 resync_after_online_grow(mdev);
2935                         else
2936                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2937                 }
2938         }
2939
2940         return TRUE;
2941 }
2942
2943 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2944 {
2945         struct p_uuids *p = (struct p_uuids *)h;
2946         u64 *p_uuid;
2947         int i;
2948
2949         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2950         if (drbd_recv(mdev, h->payload, h->length) != h->length)
2951                 return FALSE;
2952
2953         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
2954
2955         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2956                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
2957
2958         kfree(mdev->p_uuid);
2959         mdev->p_uuid = p_uuid;
2960
2961         if (mdev->state.conn < C_CONNECTED &&
2962             mdev->state.disk < D_INCONSISTENT &&
2963             mdev->state.role == R_PRIMARY &&
2964             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
2965                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
2966                     (unsigned long long)mdev->ed_uuid);
2967                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2968                 return FALSE;
2969         }
2970
2971         if (get_ldev(mdev)) {
2972                 int skip_initial_sync =
2973                         mdev->state.conn == C_CONNECTED &&
2974                         mdev->agreed_pro_version >= 90 &&
2975                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
2976                         (p_uuid[UI_FLAGS] & 8);
2977                 if (skip_initial_sync) {
2978                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
2979                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
2980                                         "clear_n_write from receive_uuids");
2981                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
2982                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
2983                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
2984                                         CS_VERBOSE, NULL);
2985                         drbd_md_sync(mdev);
2986                 }
2987                 put_ldev(mdev);
2988         }
2989
2990         /* Before we test for the disk state, we should wait until an eventually
2991            ongoing cluster wide state change is finished. That is important if
2992            we are primary and are detaching from our disk. We need to see the
2993            new disk state... */
2994         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
2995         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
2996                 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
2997
2998         return TRUE;
2999 }
3000
3001 /**
3002  * convert_state() - Converts the peer's view of the cluster state to our point of view
3003  * @ps:         The state as seen by the peer.
3004  */
3005 static union drbd_state convert_state(union drbd_state ps)
3006 {
3007         union drbd_state ms;
3008
3009         static enum drbd_conns c_tab[] = {
3010                 [C_CONNECTED] = C_CONNECTED,
3011
3012                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3013                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3014                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3015                 [C_VERIFY_S]       = C_VERIFY_T,
3016                 [C_MASK]   = C_MASK,
3017         };
3018
3019         ms.i = ps.i;
3020
3021         ms.conn = c_tab[ps.conn];
3022         ms.peer = ps.role;
3023         ms.role = ps.peer;
3024         ms.pdsk = ps.disk;
3025         ms.disk = ps.pdsk;
3026         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3027
3028         return ms;
3029 }
3030
3031 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3032 {
3033         struct p_req_state *p = (struct p_req_state *)h;
3034         union drbd_state mask, val;
3035         int rv;
3036
3037         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3038         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3039                 return FALSE;
3040
3041         mask.i = be32_to_cpu(p->mask);
3042         val.i = be32_to_cpu(p->val);
3043
3044         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3045             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3046                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3047                 return TRUE;
3048         }
3049
3050         mask = convert_state(mask);
3051         val = convert_state(val);
3052
3053         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3054
3055         drbd_send_sr_reply(mdev, rv);
3056         drbd_md_sync(mdev);
3057
3058         return TRUE;
3059 }
3060
3061 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3062 {
3063         struct p_state *p = (struct p_state *)h;
3064         enum drbd_conns nconn, oconn;
3065         union drbd_state ns, peer_state;
3066         enum drbd_disk_state real_peer_disk;
3067         int rv;
3068
3069         ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3070                 return FALSE;
3071
3072         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3073                 return FALSE;
3074
3075         peer_state.i = be32_to_cpu(p->state);
3076
3077         real_peer_disk = peer_state.disk;
3078         if (peer_state.disk == D_NEGOTIATING) {
3079                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3080                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3081         }
3082
3083         spin_lock_irq(&mdev->req_lock);
3084  retry:
3085         oconn = nconn = mdev->state.conn;
3086         spin_unlock_irq(&mdev->req_lock);
3087
3088         if (nconn == C_WF_REPORT_PARAMS)
3089                 nconn = C_CONNECTED;
3090
3091         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3092             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3093                 int cr; /* consider resync */
3094
3095                 /* if we established a new connection */
3096                 cr  = (oconn < C_CONNECTED);
3097                 /* if we had an established connection
3098                  * and one of the nodes newly attaches a disk */
3099                 cr |= (oconn == C_CONNECTED &&
3100                        (peer_state.disk == D_NEGOTIATING ||
3101                         mdev->state.disk == D_NEGOTIATING));
3102                 /* if we have both been inconsistent, and the peer has been
3103                  * forced to be UpToDate with --overwrite-data */
3104                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3105                 /* if we had been plain connected, and the admin requested to
3106                  * start a sync by "invalidate" or "invalidate-remote" */
3107                 cr |= (oconn == C_CONNECTED &&
3108                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3109                                  peer_state.conn <= C_WF_BITMAP_T));
3110
3111                 if (cr)
3112                         nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3113
3114                 put_ldev(mdev);
3115                 if (nconn == C_MASK) {
3116                         if (mdev->state.disk == D_NEGOTIATING) {
3117                                 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3118                                 nconn = C_CONNECTED;
3119                         } else if (peer_state.disk == D_NEGOTIATING) {
3120                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3121                                 peer_state.disk = D_DISKLESS;
3122                         } else {
3123                                 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3124                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3125                                 return FALSE;
3126                         }
3127                 }
3128         }
3129
3130         spin_lock_irq(&mdev->req_lock);
3131         if (mdev->state.conn != oconn)
3132                 goto retry;
3133         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3134         ns.i = mdev->state.i;
3135         ns.conn = nconn;
3136         ns.peer = peer_state.role;
3137         ns.pdsk = real_peer_disk;
3138         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3139         if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3140                 ns.disk = mdev->new_state_tmp.disk;
3141
3142         rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3143         ns = mdev->state;
3144         spin_unlock_irq(&mdev->req_lock);
3145
3146         if (rv < SS_SUCCESS) {
3147                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3148                 return FALSE;
3149         }
3150
3151         if (oconn > C_WF_REPORT_PARAMS) {
3152                 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3153                     peer_state.disk != D_NEGOTIATING ) {
3154                         /* we want resync, peer has not yet decided to sync... */
3155                         /* Nowadays only used when forcing a node into primary role and
3156                            setting its disk to UpToDate with that */
3157                         drbd_send_uuids(mdev);
3158                         drbd_send_state(mdev);
3159                 }
3160         }
3161
3162         mdev->net_conf->want_lose = 0;
3163
3164         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3165
3166         return TRUE;
3167 }
3168
3169 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3170 {
3171         struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3172
3173         wait_event(mdev->misc_wait,
3174                    mdev->state.conn == C_WF_SYNC_UUID ||
3175                    mdev->state.conn < C_CONNECTED ||
3176                    mdev->state.disk < D_NEGOTIATING);
3177
3178         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3179
3180         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3181         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3182                 return FALSE;
3183
3184         /* Here the _drbd_uuid_ functions are right, current should
3185            _not_ be rotated into the history */
3186         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3187                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3188                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3189
3190                 drbd_start_resync(mdev, C_SYNC_TARGET);
3191
3192                 put_ldev(mdev);
3193         } else
3194                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3195
3196         return TRUE;
3197 }
3198
3199 enum receive_bitmap_ret { OK, DONE, FAILED };
3200
3201 static enum receive_bitmap_ret
3202 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3203         unsigned long *buffer, struct bm_xfer_ctx *c)
3204 {
3205         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3206         unsigned want = num_words * sizeof(long);
3207
3208         if (want != h->length) {
3209                 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3210                 return FAILED;
3211         }
3212         if (want == 0)
3213                 return DONE;
3214         if (drbd_recv(mdev, buffer, want) != want)
3215                 return FAILED;
3216
3217         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3218
3219         c->word_offset += num_words;
3220         c->bit_offset = c->word_offset * BITS_PER_LONG;
3221         if (c->bit_offset > c->bm_bits)
3222                 c->bit_offset = c->bm_bits;
3223
3224         return OK;
3225 }
3226
3227 static enum receive_bitmap_ret
3228 recv_bm_rle_bits(struct drbd_conf *mdev,
3229                 struct p_compressed_bm *p,
3230                 struct bm_xfer_ctx *c)
3231 {
3232         struct bitstream bs;
3233         u64 look_ahead;
3234         u64 rl;
3235         u64 tmp;
3236         unsigned long s = c->bit_offset;
3237         unsigned long e;
3238         int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3239         int toggle = DCBP_get_start(p);
3240         int have;
3241         int bits;
3242
3243         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3244
3245         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3246         if (bits < 0)
3247                 return FAILED;
3248
3249         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3250                 bits = vli_decode_bits(&rl, look_ahead);
3251                 if (bits <= 0)
3252                         return FAILED;
3253
3254                 if (toggle) {
3255                         e = s + rl -1;
3256                         if (e >= c->bm_bits) {
3257                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3258                                 return FAILED;
3259                         }
3260                         _drbd_bm_set_bits(mdev, s, e);
3261                 }
3262
3263                 if (have < bits) {
3264                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3265                                 have, bits, look_ahead,
3266                                 (unsigned int)(bs.cur.b - p->code),
3267                                 (unsigned int)bs.buf_len);
3268                         return FAILED;
3269                 }
3270                 look_ahead >>= bits;
3271                 have -= bits;
3272
3273                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3274                 if (bits < 0)
3275                         return FAILED;
3276                 look_ahead |= tmp << have;
3277                 have += bits;
3278         }
3279
3280         c->bit_offset = s;
3281         bm_xfer_ctx_bit_to_word_offset(c);
3282
3283         return (s == c->bm_bits) ? DONE : OK;
3284 }
3285
3286 static enum receive_bitmap_ret
3287 decode_bitmap_c(struct drbd_conf *mdev,
3288                 struct p_compressed_bm *p,
3289                 struct bm_xfer_ctx *c)
3290 {
3291         if (DCBP_get_code(p) == RLE_VLI_Bits)
3292                 return recv_bm_rle_bits(mdev, p, c);
3293
3294         /* other variants had been implemented for evaluation,
3295          * but have been dropped as this one turned out to be "best"
3296          * during all our tests. */
3297
3298         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3299         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3300         return FAILED;
3301 }
3302
3303 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3304                 const char *direction, struct bm_xfer_ctx *c)
3305 {
3306         /* what would it take to transfer it "plaintext" */
3307         unsigned plain = sizeof(struct p_header) *
3308                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3309                 + c->bm_words * sizeof(long);
3310         unsigned total = c->bytes[0] + c->bytes[1];
3311         unsigned r;
3312
3313         /* total can not be zero. but just in case: */
3314         if (total == 0)
3315                 return;
3316
3317         /* don't report if not compressed */
3318         if (total >= plain)
3319                 return;
3320
3321         /* total < plain. check for overflow, still */
3322         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3323                                     : (1000 * total / plain);
3324
3325         if (r > 1000)
3326                 r = 1000;
3327
3328         r = 1000 - r;
3329         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3330              "total %u; compression: %u.%u%%\n",
3331                         direction,
3332                         c->bytes[1], c->packets[1],
3333                         c->bytes[0], c->packets[0],
3334                         total, r/10, r % 10);
3335 }
3336
3337 /* Since we are processing the bitfield from lower addresses to higher,
3338    it does not matter if the process it in 32 bit chunks or 64 bit
3339    chunks as long as it is little endian. (Understand it as byte stream,
3340    beginning with the lowest byte...) If we would use big endian
3341    we would need to process it from the highest address to the lowest,
3342    in order to be agnostic to the 32 vs 64 bits issue.
3343
3344    returns 0 on failure, 1 if we successfully received it. */
3345 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3346 {
3347         struct bm_xfer_ctx c;
3348         void *buffer;
3349         enum receive_bitmap_ret ret;
3350         int ok = FALSE;
3351
3352         wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3353
3354         drbd_bm_lock(mdev, "receive bitmap");
3355
3356         /* maybe we should use some per thread scratch page,
3357          * and allocate that during initial device creation? */
3358         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3359         if (!buffer) {
3360                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3361                 goto out;
3362         }
3363
3364         c = (struct bm_xfer_ctx) {
3365                 .bm_bits = drbd_bm_bits(mdev),
3366                 .bm_words = drbd_bm_words(mdev),
3367         };
3368
3369         do {
3370                 if (h->command == P_BITMAP) {
3371                         ret = receive_bitmap_plain(mdev, h, buffer, &c);
3372                 } else if (h->command == P_COMPRESSED_BITMAP) {
3373                         /* MAYBE: sanity check that we speak proto >= 90,
3374                          * and the feature is enabled! */
3375                         struct p_compressed_bm *p;
3376
3377                         if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3378                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3379                                 goto out;
3380                         }
3381                         /* use the page buff */
3382                         p = buffer;
3383                         memcpy(p, h, sizeof(*h));
3384                         if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3385                                 goto out;
3386                         if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3387                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3388                                 return FAILED;
3389                         }
3390                         ret = decode_bitmap_c(mdev, p, &c);
3391                 } else {
3392                         dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3393                         goto out;
3394                 }
3395
3396                 c.packets[h->command == P_BITMAP]++;
3397                 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3398
3399                 if (ret != OK)
3400                         break;
3401
3402                 if (!drbd_recv_header(mdev, h))
3403                         goto out;
3404         } while (ret == OK);
3405         if (ret == FAILED)
3406                 goto out;
3407
3408         INFO_bm_xfer_stats(mdev, "receive", &c);
3409
3410         if (mdev->state.conn == C_WF_BITMAP_T) {
3411                 ok = !drbd_send_bitmap(mdev);
3412                 if (!ok)
3413                         goto out;
3414                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3415                 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3416                 D_ASSERT(ok == SS_SUCCESS);
3417         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3418                 /* admin may have requested C_DISCONNECTING,
3419                  * other threads may have noticed network errors */
3420                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3421                     drbd_conn_str(mdev->state.conn));
3422         }
3423
3424         ok = TRUE;
3425  out:
3426         drbd_bm_unlock(mdev);
3427         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3428                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3429         free_page((unsigned long) buffer);
3430         return ok;
3431 }
3432
3433 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3434 {
3435         /* TODO zero copy sink :) */
3436         static char sink[128];
3437         int size, want, r;
3438
3439         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3440              h->command, h->length);
3441
3442         size = h->length;
3443         while (size > 0) {
3444                 want = min_t(int, size, sizeof(sink));
3445                 r = drbd_recv(mdev, sink, want);
3446                 ERR_IF(r <= 0) break;
3447                 size -= r;
3448         }
3449         return size == 0;
3450 }
3451
3452 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3453 {
3454         if (mdev->state.disk >= D_INCONSISTENT)
3455                 drbd_kick_lo(mdev);
3456
3457         /* Make sure we've acked all the TCP data associated
3458          * with the data requests being unplugged */
3459         drbd_tcp_quickack(mdev->data.socket);
3460
3461         return TRUE;
3462 }
3463
3464 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3465
3466 static drbd_cmd_handler_f drbd_default_handler[] = {
3467         [P_DATA]            = receive_Data,
3468         [P_DATA_REPLY]      = receive_DataReply,
3469         [P_RS_DATA_REPLY]   = receive_RSDataReply,
3470         [P_BARRIER]         = receive_Barrier,
3471         [P_BITMAP]          = receive_bitmap,
3472         [P_COMPRESSED_BITMAP]    = receive_bitmap,
3473         [P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3474         [P_DATA_REQUEST]    = receive_DataRequest,
3475         [P_RS_DATA_REQUEST] = receive_DataRequest,
3476         [P_SYNC_PARAM]      = receive_SyncParam,
3477         [P_SYNC_PARAM89]           = receive_SyncParam,
3478         [P_PROTOCOL]        = receive_protocol,
3479         [P_UUIDS]           = receive_uuids,
3480         [P_SIZES]           = receive_sizes,
3481         [P_STATE]           = receive_state,
3482         [P_STATE_CHG_REQ]   = receive_req_state,
3483         [P_SYNC_UUID]       = receive_sync_uuid,
3484         [P_OV_REQUEST]      = receive_DataRequest,
3485         [P_OV_REPLY]        = receive_DataRequest,
3486         [P_CSUM_RS_REQUEST]    = receive_DataRequest,
3487         /* anything missing from this table is in
3488          * the asender_tbl, see get_asender_cmd */
3489         [P_MAX_CMD]         = NULL,
3490 };
3491
3492 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3493 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3494
3495 static void drbdd(struct drbd_conf *mdev)
3496 {
3497         drbd_cmd_handler_f handler;
3498         struct p_header *header = &mdev->data.rbuf.header;
3499
3500         while (get_t_state(&mdev->receiver) == Running) {
3501                 drbd_thread_current_set_cpu(mdev);
3502                 if (!drbd_recv_header(mdev, header)) {
3503                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3504                         break;
3505                 }
3506
3507                 if (header->command < P_MAX_CMD)
3508                         handler = drbd_cmd_handler[header->command];
3509                 else if (P_MAY_IGNORE < header->command
3510                      && header->command < P_MAX_OPT_CMD)
3511                         handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3512                 else if (header->command > P_MAX_OPT_CMD)
3513                         handler = receive_skip;
3514                 else
3515                         handler = NULL;
3516
3517                 if (unlikely(!handler)) {
3518                         dev_err(DEV, "unknown packet type %d, l: %d!\n",
3519                             header->command, header->length);
3520                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3521                         break;
3522                 }
3523                 if (unlikely(!handler(mdev, header))) {
3524                         dev_err(DEV, "error receiving %s, l: %d!\n",
3525                             cmdname(header->command), header->length);
3526                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3527                         break;
3528                 }
3529         }
3530 }
3531
3532 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3533 {
3534         struct hlist_head *slot;
3535         struct hlist_node *pos;
3536         struct hlist_node *tmp;
3537         struct drbd_request *req;
3538         int i;
3539
3540         /*
3541          * Application READ requests
3542          */
3543         spin_lock_irq(&mdev->req_lock);
3544         for (i = 0; i < APP_R_HSIZE; i++) {
3545                 slot = mdev->app_reads_hash+i;
3546                 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3547                         /* it may (but should not any longer!)
3548                          * be on the work queue; if that assert triggers,
3549                          * we need to also grab the
3550                          * spin_lock_irq(&mdev->data.work.q_lock);
3551                          * and list_del_init here. */
3552                         D_ASSERT(list_empty(&req->w.list));
3553                         /* It would be nice to complete outside of spinlock.
3554                          * But this is easier for now. */
3555                         _req_mod(req, connection_lost_while_pending);
3556                 }
3557         }
3558         for (i = 0; i < APP_R_HSIZE; i++)
3559                 if (!hlist_empty(mdev->app_reads_hash+i))
3560                         dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3561                                 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3562
3563         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3564         spin_unlock_irq(&mdev->req_lock);
3565 }
3566
3567 void drbd_flush_workqueue(struct drbd_conf *mdev)
3568 {
3569         struct drbd_wq_barrier barr;
3570
3571         barr.w.cb = w_prev_work_done;
3572         init_completion(&barr.done);
3573         drbd_queue_work(&mdev->data.work, &barr.w);
3574         wait_for_completion(&barr.done);
3575 }
3576
3577 static void drbd_disconnect(struct drbd_conf *mdev)
3578 {
3579         enum drbd_fencing_p fp;
3580         union drbd_state os, ns;
3581         int rv = SS_UNKNOWN_ERROR;
3582         unsigned int i;
3583
3584         if (mdev->state.conn == C_STANDALONE)
3585                 return;
3586         if (mdev->state.conn >= C_WF_CONNECTION)
3587                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3588                                 drbd_conn_str(mdev->state.conn));
3589
3590         /* asender does not clean up anything. it must not interfere, either */
3591         drbd_thread_stop(&mdev->asender);
3592
3593         mutex_lock(&mdev->data.mutex);
3594         drbd_free_sock(mdev);
3595         mutex_unlock(&mdev->data.mutex);
3596
3597         spin_lock_irq(&mdev->req_lock);
3598         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3599         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3600         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3601         spin_unlock_irq(&mdev->req_lock);
3602
3603         /* We do not have data structures that would allow us to
3604          * get the rs_pending_cnt down to 0 again.
3605          *  * On C_SYNC_TARGET we do not have any data structures describing
3606          *    the pending RSDataRequest's we have sent.
3607          *  * On C_SYNC_SOURCE there is no data structure that tracks
3608          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3609          *  And no, it is not the sum of the reference counts in the
3610          *  resync_LRU. The resync_LRU tracks the whole operation including
3611          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3612          *  on the fly. */
3613         drbd_rs_cancel_all(mdev);
3614         mdev->rs_total = 0;
3615         mdev->rs_failed = 0;
3616         atomic_set(&mdev->rs_pending_cnt, 0);
3617         wake_up(&mdev->misc_wait);
3618
3619         /* make sure syncer is stopped and w_resume_next_sg queued */
3620         del_timer_sync(&mdev->resync_timer);
3621         set_bit(STOP_SYNC_TIMER, &mdev->flags);
3622         resync_timer_fn((unsigned long)mdev);
3623
3624         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3625          * w_make_resync_request etc. which may still be on the worker queue
3626          * to be "canceled" */
3627         drbd_flush_workqueue(mdev);
3628
3629         /* This also does reclaim_net_ee().  If we do this too early, we might
3630          * miss some resync ee and pages.*/
3631         drbd_process_done_ee(mdev);
3632
3633         kfree(mdev->p_uuid);
3634         mdev->p_uuid = NULL;
3635
3636         if (!mdev->state.susp)
3637                 tl_clear(mdev);
3638
3639         drbd_fail_pending_reads(mdev);
3640
3641         dev_info(DEV, "Connection closed\n");
3642
3643         drbd_md_sync(mdev);
3644
3645         fp = FP_DONT_CARE;
3646         if (get_ldev(mdev)) {
3647                 fp = mdev->ldev->dc.fencing;
3648                 put_ldev(mdev);
3649         }
3650
3651         if (mdev->state.role == R_PRIMARY) {
3652                 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3653                         enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3654                         drbd_request_state(mdev, NS(pdsk, nps));
3655                 }
3656         }
3657
3658         spin_lock_irq(&mdev->req_lock);
3659         os = mdev->state;
3660         if (os.conn >= C_UNCONNECTED) {
3661                 /* Do not restart in case we are C_DISCONNECTING */
3662                 ns = os;
3663                 ns.conn = C_UNCONNECTED;
3664                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3665         }
3666         spin_unlock_irq(&mdev->req_lock);
3667
3668         if (os.conn == C_DISCONNECTING) {
3669                 struct hlist_head *h;
3670                 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3671
3672                 /* we must not free the tl_hash
3673                  * while application io is still on the fly */
3674                 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3675
3676                 spin_lock_irq(&mdev->req_lock);
3677                 /* paranoia code */
3678                 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3679                         if (h->first)
3680                                 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3681                                                 (int)(h - mdev->ee_hash), h->first);
3682                 kfree(mdev->ee_hash);
3683                 mdev->ee_hash = NULL;
3684                 mdev->ee_hash_s = 0;
3685
3686                 /* paranoia code */
3687                 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3688                         if (h->first)
3689                                 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3690                                                 (int)(h - mdev->tl_hash), h->first);
3691                 kfree(mdev->tl_hash);
3692                 mdev->tl_hash = NULL;
3693                 mdev->tl_hash_s = 0;
3694                 spin_unlock_irq(&mdev->req_lock);
3695
3696                 crypto_free_hash(mdev->cram_hmac_tfm);
3697                 mdev->cram_hmac_tfm = NULL;
3698
3699                 kfree(mdev->net_conf);
3700                 mdev->net_conf = NULL;
3701                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3702         }
3703
3704         /* tcp_close and release of sendpage pages can be deferred.  I don't
3705          * want to use SO_LINGER, because apparently it can be deferred for
3706          * more than 20 seconds (longest time I checked).
3707          *
3708          * Actually we don't care for exactly when the network stack does its
3709          * put_page(), but release our reference on these pages right here.
3710          */
3711         i = drbd_release_ee(mdev, &mdev->net_ee);
3712         if (i)
3713                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3714         i = atomic_read(&mdev->pp_in_use);
3715         if (i)
3716                 dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
3717
3718         D_ASSERT(list_empty(&mdev->read_ee));
3719         D_ASSERT(list_empty(&mdev->active_ee));
3720         D_ASSERT(list_empty(&mdev->sync_ee));
3721         D_ASSERT(list_empty(&mdev->done_ee));
3722
3723         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3724         atomic_set(&mdev->current_epoch->epoch_size, 0);
3725         D_ASSERT(list_empty(&mdev->current_epoch->list));
3726 }
3727
3728 /*
3729  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3730  * we can agree on is stored in agreed_pro_version.
3731  *
3732  * feature flags and the reserved array should be enough room for future
3733  * enhancements of the handshake protocol, and possible plugins...
3734  *
3735  * for now, they are expected to be zero, but ignored.
3736  */
3737 static int drbd_send_handshake(struct drbd_conf *mdev)
3738 {
3739         /* ASSERT current == mdev->receiver ... */
3740         struct p_handshake *p = &mdev->data.sbuf.handshake;
3741         int ok;
3742
3743         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3744                 dev_err(DEV, "interrupted during initial handshake\n");
3745                 return 0; /* interrupted. not ok. */
3746         }
3747
3748         if (mdev->data.socket == NULL) {
3749                 mutex_unlock(&mdev->data.mutex);
3750                 return 0;
3751         }
3752
3753         memset(p, 0, sizeof(*p));
3754         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3755         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3756         ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3757                              (struct p_header *)p, sizeof(*p), 0 );
3758         mutex_unlock(&mdev->data.mutex);
3759         return ok;
3760 }
3761
3762 /*
3763  * return values:
3764  *   1 yes, we have a valid connection
3765  *   0 oops, did not work out, please try again
3766  *  -1 peer talks different language,
3767  *     no point in trying again, please go standalone.
3768  */
3769 static int drbd_do_handshake(struct drbd_conf *mdev)
3770 {
3771         /* ASSERT current == mdev->receiver ... */
3772         struct p_handshake *p = &mdev->data.rbuf.handshake;
3773         const int expect = sizeof(struct p_handshake)
3774                           -sizeof(struct p_header);
3775         int rv;
3776
3777         rv = drbd_send_handshake(mdev);
3778         if (!rv)
3779                 return 0;
3780
3781         rv = drbd_recv_header(mdev, &p->head);
3782         if (!rv)
3783                 return 0;
3784
3785         if (p->head.command != P_HAND_SHAKE) {
3786                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3787                      cmdname(p->head.command), p->head.command);
3788                 return -1;
3789         }
3790
3791         if (p->head.length != expect) {
3792                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3793                      expect, p->head.length);
3794                 return -1;
3795         }
3796
3797         rv = drbd_recv(mdev, &p->head.payload, expect);
3798
3799         if (rv != expect) {
3800                 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3801                 return 0;
3802         }
3803
3804         p->protocol_min = be32_to_cpu(p->protocol_min);
3805         p->protocol_max = be32_to_cpu(p->protocol_max);
3806         if (p->protocol_max == 0)
3807                 p->protocol_max = p->protocol_min;
3808
3809         if (PRO_VERSION_MAX < p->protocol_min ||
3810             PRO_VERSION_MIN > p->protocol_max)
3811                 goto incompat;
3812
3813         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3814
3815         dev_info(DEV, "Handshake successful: "
3816              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3817
3818         return 1;
3819
3820  incompat:
3821         dev_err(DEV, "incompatible DRBD dialects: "
3822             "I support %d-%d, peer supports %d-%d\n",
3823             PRO_VERSION_MIN, PRO_VERSION_MAX,
3824             p->protocol_min, p->protocol_max);
3825         return -1;
3826 }
3827
3828 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3829 static int drbd_do_auth(struct drbd_conf *mdev)
3830 {
3831         dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
3832         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3833         return 0;
3834 }
3835 #else
3836 #define CHALLENGE_LEN 64
3837 static int drbd_do_auth(struct drbd_conf *mdev)
3838 {
3839         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3840         struct scatterlist sg;
3841         char *response = NULL;
3842         char *right_response = NULL;
3843         char *peers_ch = NULL;
3844         struct p_header p;
3845         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3846         unsigned int resp_size;
3847         struct hash_desc desc;
3848         int rv;
3849
3850         desc.tfm = mdev->cram_hmac_tfm;
3851         desc.flags = 0;
3852
3853         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3854                                 (u8 *)mdev->net_conf->shared_secret, key_len);
3855         if (rv) {
3856                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3857                 rv = 0;
3858                 goto fail;
3859         }
3860
3861         get_random_bytes(my_challenge, CHALLENGE_LEN);
3862
3863         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3864         if (!rv)
3865                 goto fail;
3866
3867         rv = drbd_recv_header(mdev, &p);
3868         if (!rv)
3869                 goto fail;
3870
3871         if (p.command != P_AUTH_CHALLENGE) {
3872                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3873                     cmdname(p.command), p.command);
3874                 rv = 0;
3875                 goto fail;
3876         }
3877
3878         if (p.length > CHALLENGE_LEN*2) {
3879                 dev_err(DEV, "expected AuthChallenge payload too big.\n");
3880                 rv = 0;
3881                 goto fail;
3882         }
3883
3884         peers_ch = kmalloc(p.length, GFP_NOIO);
3885         if (peers_ch == NULL) {
3886                 dev_err(DEV, "kmalloc of peers_ch failed\n");
3887                 rv = 0;
3888                 goto fail;
3889         }
3890
3891         rv = drbd_recv(mdev, peers_ch, p.length);
3892
3893         if (rv != p.length) {
3894                 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
3895                 rv = 0;
3896                 goto fail;
3897         }
3898
3899         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
3900         response = kmalloc(resp_size, GFP_NOIO);
3901         if (response == NULL) {
3902                 dev_err(DEV, "kmalloc of response failed\n");
3903                 rv = 0;
3904                 goto fail;
3905         }
3906
3907         sg_init_table(&sg, 1);
3908         sg_set_buf(&sg, peers_ch, p.length);
3909
3910         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
3911         if (rv) {
3912                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3913                 rv = 0;
3914                 goto fail;
3915         }
3916
3917         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
3918         if (!rv)
3919                 goto fail;
3920
3921         rv = drbd_recv_header(mdev, &p);
3922         if (!rv)
3923                 goto fail;
3924
3925         if (p.command != P_AUTH_RESPONSE) {
3926                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
3927                     cmdname(p.command), p.command);
3928                 rv = 0;
3929                 goto fail;
3930         }
3931
3932         if (p.length != resp_size) {
3933                 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
3934                 rv = 0;
3935                 goto fail;
3936         }
3937
3938         rv = drbd_recv(mdev, response , resp_size);
3939
3940         if (rv != resp_size) {
3941                 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
3942                 rv = 0;
3943                 goto fail;
3944         }
3945
3946         right_response = kmalloc(resp_size, GFP_NOIO);
3947         if (response == NULL) {
3948                 dev_err(DEV, "kmalloc of right_response failed\n");
3949                 rv = 0;
3950                 goto fail;
3951         }
3952
3953         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
3954
3955         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
3956         if (rv) {
3957                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3958                 rv = 0;
3959                 goto fail;
3960         }
3961
3962         rv = !memcmp(response, right_response, resp_size);
3963
3964         if (rv)
3965                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
3966                      resp_size, mdev->net_conf->cram_hmac_alg);
3967
3968  fail:
3969         kfree(peers_ch);
3970         kfree(response);
3971         kfree(right_response);
3972
3973         return rv;
3974 }
3975 #endif
3976
3977 int drbdd_init(struct drbd_thread *thi)
3978 {
3979         struct drbd_conf *mdev = thi->mdev;
3980         unsigned int minor = mdev_to_minor(mdev);
3981         int h;
3982
3983         sprintf(current->comm, "drbd%d_receiver", minor);
3984
3985         dev_info(DEV, "receiver (re)started\n");
3986
3987         do {
3988                 h = drbd_connect(mdev);
3989                 if (h == 0) {
3990                         drbd_disconnect(mdev);
3991                         __set_current_state(TASK_INTERRUPTIBLE);
3992                         schedule_timeout(HZ);
3993                 }
3994                 if (h == -1) {
3995                         dev_warn(DEV, "Discarding network configuration.\n");
3996                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3997                 }
3998         } while (h == 0);
3999
4000         if (h > 0) {
4001                 if (get_net_conf(mdev)) {
4002                         drbdd(mdev);
4003                         put_net_conf(mdev);
4004                 }
4005         }
4006
4007         drbd_disconnect(mdev);
4008
4009         dev_info(DEV, "receiver terminated\n");
4010         return 0;
4011 }
4012
4013 /* ********* acknowledge sender ******** */
4014
4015 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4016 {
4017         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4018
4019         int retcode = be32_to_cpu(p->retcode);
4020
4021         if (retcode >= SS_SUCCESS) {
4022                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4023         } else {
4024                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4025                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4026                     drbd_set_st_err_str(retcode), retcode);
4027         }
4028         wake_up(&mdev->state_wait);
4029
4030         return TRUE;
4031 }
4032
4033 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4034 {
4035         return drbd_send_ping_ack(mdev);
4036
4037 }
4038
4039 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4040 {
4041         /* restore idle timeout */
4042         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4043
4044         return TRUE;
4045 }
4046
4047 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4048 {
4049         struct p_block_ack *p = (struct p_block_ack *)h;
4050         sector_t sector = be64_to_cpu(p->sector);
4051         int blksize = be32_to_cpu(p->blksize);
4052
4053         D_ASSERT(mdev->agreed_pro_version >= 89);
4054
4055         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4056
4057         drbd_rs_complete_io(mdev, sector);
4058         drbd_set_in_sync(mdev, sector, blksize);
4059         /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4060         mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4061         dec_rs_pending(mdev);
4062
4063         return TRUE;
4064 }
4065
4066 /* when we receive the ACK for a write request,
4067  * verify that we actually know about it */
4068 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4069         u64 id, sector_t sector)
4070 {
4071         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4072         struct hlist_node *n;
4073         struct drbd_request *req;
4074
4075         hlist_for_each_entry(req, n, slot, colision) {
4076                 if ((unsigned long)req == (unsigned long)id) {
4077                         if (req->sector != sector) {
4078                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4079                                     "wrong sector (%llus versus %llus)\n", req,
4080                                     (unsigned long long)req->sector,
4081                                     (unsigned long long)sector);
4082                                 break;
4083                         }
4084                         return req;
4085                 }
4086         }
4087         dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4088                 (void *)(unsigned long)id, (unsigned long long)sector);
4089         return NULL;
4090 }
4091
4092 typedef struct drbd_request *(req_validator_fn)
4093         (struct drbd_conf *mdev, u64 id, sector_t sector);
4094
4095 static int validate_req_change_req_state(struct drbd_conf *mdev,
4096         u64 id, sector_t sector, req_validator_fn validator,
4097         const char *func, enum drbd_req_event what)
4098 {
4099         struct drbd_request *req;
4100         struct bio_and_error m;
4101
4102         spin_lock_irq(&mdev->req_lock);
4103         req = validator(mdev, id, sector);
4104         if (unlikely(!req)) {
4105                 spin_unlock_irq(&mdev->req_lock);
4106                 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4107                 return FALSE;
4108         }
4109         __req_mod(req, what, &m);
4110         spin_unlock_irq(&mdev->req_lock);
4111
4112         if (m.bio)
4113                 complete_master_bio(mdev, &m);
4114         return TRUE;
4115 }
4116
4117 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4118 {
4119         struct p_block_ack *p = (struct p_block_ack *)h;
4120         sector_t sector = be64_to_cpu(p->sector);
4121         int blksize = be32_to_cpu(p->blksize);
4122         enum drbd_req_event what;
4123
4124         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4125
4126         if (is_syncer_block_id(p->block_id)) {
4127                 drbd_set_in_sync(mdev, sector, blksize);
4128                 dec_rs_pending(mdev);
4129                 return TRUE;
4130         }
4131         switch (be16_to_cpu(h->command)) {
4132         case P_RS_WRITE_ACK:
4133                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4134                 what = write_acked_by_peer_and_sis;
4135                 break;
4136         case P_WRITE_ACK:
4137                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4138                 what = write_acked_by_peer;
4139                 break;
4140         case P_RECV_ACK:
4141                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4142                 what = recv_acked_by_peer;
4143                 break;
4144         case P_DISCARD_ACK:
4145                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4146                 what = conflict_discarded_by_peer;
4147                 break;
4148         default:
4149                 D_ASSERT(0);
4150                 return FALSE;
4151         }
4152
4153         return validate_req_change_req_state(mdev, p->block_id, sector,
4154                 _ack_id_to_req, __func__ , what);
4155 }
4156
4157 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4158 {
4159         struct p_block_ack *p = (struct p_block_ack *)h;
4160         sector_t sector = be64_to_cpu(p->sector);
4161
4162         if (__ratelimit(&drbd_ratelimit_state))
4163                 dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4164
4165         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4166
4167         if (is_syncer_block_id(p->block_id)) {
4168                 int size = be32_to_cpu(p->blksize);
4169                 dec_rs_pending(mdev);
4170                 drbd_rs_failed_io(mdev, sector, size);
4171                 return TRUE;
4172         }
4173         return validate_req_change_req_state(mdev, p->block_id, sector,
4174                 _ack_id_to_req, __func__ , neg_acked);
4175 }
4176
4177 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4178 {
4179         struct p_block_ack *p = (struct p_block_ack *)h;
4180         sector_t sector = be64_to_cpu(p->sector);
4181
4182         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4183         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4184             (unsigned long long)sector, be32_to_cpu(p->blksize));
4185
4186         return validate_req_change_req_state(mdev, p->block_id, sector,
4187                 _ar_id_to_req, __func__ , neg_acked);
4188 }
4189
4190 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4191 {
4192         sector_t sector;
4193         int size;
4194         struct p_block_ack *p = (struct p_block_ack *)h;
4195
4196         sector = be64_to_cpu(p->sector);
4197         size = be32_to_cpu(p->blksize);
4198         D_ASSERT(p->block_id == ID_SYNCER);
4199
4200         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4201
4202         dec_rs_pending(mdev);
4203
4204         if (get_ldev_if_state(mdev, D_FAILED)) {
4205                 drbd_rs_complete_io(mdev, sector);
4206                 drbd_rs_failed_io(mdev, sector, size);
4207                 put_ldev(mdev);
4208         }
4209
4210         return TRUE;
4211 }
4212
4213 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4214 {
4215         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4216
4217         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4218
4219         return TRUE;
4220 }
4221
4222 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4223 {
4224         struct p_block_ack *p = (struct p_block_ack *)h;
4225         struct drbd_work *w;
4226         sector_t sector;
4227         int size;
4228
4229         sector = be64_to_cpu(p->sector);
4230         size = be32_to_cpu(p->blksize);
4231
4232         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4233
4234         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4235                 drbd_ov_oos_found(mdev, sector, size);
4236         else
4237                 ov_oos_print(mdev);
4238
4239         drbd_rs_complete_io(mdev, sector);
4240         dec_rs_pending(mdev);
4241
4242         if (--mdev->ov_left == 0) {
4243                 w = kmalloc(sizeof(*w), GFP_NOIO);
4244                 if (w) {
4245                         w->cb = w_ov_finished;
4246                         drbd_queue_work_front(&mdev->data.work, w);
4247                 } else {
4248                         dev_err(DEV, "kmalloc(w) failed.");
4249                         ov_oos_print(mdev);
4250                         drbd_resync_finished(mdev);
4251                 }
4252         }
4253         return TRUE;
4254 }
4255
4256 struct asender_cmd {
4257         size_t pkt_size;
4258         int (*process)(struct drbd_conf *mdev, struct p_header *h);
4259 };
4260
4261 static struct asender_cmd *get_asender_cmd(int cmd)
4262 {
4263         static struct asender_cmd asender_tbl[] = {
4264                 /* anything missing from this table is in
4265                  * the drbd_cmd_handler (drbd_default_handler) table,
4266                  * see the beginning of drbdd() */
4267         [P_PING]            = { sizeof(struct p_header), got_Ping },
4268         [P_PING_ACK]        = { sizeof(struct p_header), got_PingAck },
4269         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4270         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4271         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4272         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4273         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4274         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4275         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4276         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4277         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4278         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4279         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4280         [P_MAX_CMD]         = { 0, NULL },
4281         };
4282         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4283                 return NULL;
4284         return &asender_tbl[cmd];
4285 }
4286
4287 int drbd_asender(struct drbd_thread *thi)
4288 {
4289         struct drbd_conf *mdev = thi->mdev;
4290         struct p_header *h = &mdev->meta.rbuf.header;
4291         struct asender_cmd *cmd = NULL;
4292
4293         int rv, len;
4294         void *buf    = h;
4295         int received = 0;
4296         int expect   = sizeof(struct p_header);
4297         int empty;
4298
4299         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4300
4301         current->policy = SCHED_RR;  /* Make this a realtime task! */
4302         current->rt_priority = 2;    /* more important than all other tasks */
4303
4304         while (get_t_state(thi) == Running) {
4305                 drbd_thread_current_set_cpu(mdev);
4306                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4307                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4308                         mdev->meta.socket->sk->sk_rcvtimeo =
4309                                 mdev->net_conf->ping_timeo*HZ/10;
4310                 }
4311
4312                 /* conditionally cork;
4313                  * it may hurt latency if we cork without much to send */
4314                 if (!mdev->net_conf->no_cork &&
4315                         3 < atomic_read(&mdev->unacked_cnt))
4316                         drbd_tcp_cork(mdev->meta.socket);
4317                 while (1) {
4318                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4319                         flush_signals(current);
4320                         if (!drbd_process_done_ee(mdev)) {
4321                                 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4322                                 goto reconnect;
4323                         }
4324                         /* to avoid race with newly queued ACKs */
4325                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4326                         spin_lock_irq(&mdev->req_lock);
4327                         empty = list_empty(&mdev->done_ee);
4328                         spin_unlock_irq(&mdev->req_lock);
4329                         /* new ack may have been queued right here,
4330                          * but then there is also a signal pending,
4331                          * and we start over... */
4332                         if (empty)
4333                                 break;
4334                 }
4335                 /* but unconditionally uncork unless disabled */
4336                 if (!mdev->net_conf->no_cork)
4337                         drbd_tcp_uncork(mdev->meta.socket);
4338
4339                 /* short circuit, recv_msg would return EINTR anyways. */
4340                 if (signal_pending(current))
4341                         continue;
4342
4343                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4344                                      buf, expect-received, 0);
4345                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4346
4347                 flush_signals(current);
4348
4349                 /* Note:
4350                  * -EINTR        (on meta) we got a signal
4351                  * -EAGAIN       (on meta) rcvtimeo expired
4352                  * -ECONNRESET   other side closed the connection
4353                  * -ERESTARTSYS  (on data) we got a signal
4354                  * rv <  0       other than above: unexpected error!
4355                  * rv == expected: full header or command
4356                  * rv <  expected: "woken" by signal during receive
4357                  * rv == 0       : "connection shut down by peer"
4358                  */
4359                 if (likely(rv > 0)) {
4360                         received += rv;
4361                         buf      += rv;
4362                 } else if (rv == 0) {
4363                         dev_err(DEV, "meta connection shut down by peer.\n");
4364                         goto reconnect;
4365                 } else if (rv == -EAGAIN) {
4366                         if (mdev->meta.socket->sk->sk_rcvtimeo ==
4367                             mdev->net_conf->ping_timeo*HZ/10) {
4368                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4369                                 goto reconnect;
4370                         }
4371                         set_bit(SEND_PING, &mdev->flags);
4372                         continue;
4373                 } else if (rv == -EINTR) {
4374                         continue;
4375                 } else {
4376                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4377                         goto reconnect;
4378                 }
4379
4380                 if (received == expect && cmd == NULL) {
4381                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4382                                 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4383                                     (long)be32_to_cpu(h->magic),
4384                                     h->command, h->length);
4385                                 goto reconnect;
4386                         }
4387                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4388                         len = be16_to_cpu(h->length);
4389                         if (unlikely(cmd == NULL)) {
4390                                 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4391                                     (long)be32_to_cpu(h->magic),
4392                                     h->command, h->length);
4393                                 goto disconnect;
4394                         }
4395                         expect = cmd->pkt_size;
4396                         ERR_IF(len != expect-sizeof(struct p_header))
4397                                 goto reconnect;
4398                 }
4399                 if (received == expect) {
4400                         D_ASSERT(cmd != NULL);
4401                         if (!cmd->process(mdev, h))
4402                                 goto reconnect;
4403
4404                         buf      = h;
4405                         received = 0;
4406                         expect   = sizeof(struct p_header);
4407                         cmd      = NULL;
4408                 }
4409         }
4410
4411         if (0) {
4412 reconnect:
4413                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4414         }
4415         if (0) {
4416 disconnect:
4417                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4418         }
4419         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4420
4421         D_ASSERT(mdev->state.conn < C_CONNECTED);
4422         dev_info(DEV, "asender terminated\n");
4423
4424         return 0;
4425 }