1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 enum finish_epoch {
52         FE_STILL_LIVE,
53         FE_DESTROYED,
54         FE_RECYCLED,
55 };
56
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62
63
64 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65
66 /*
67  * some helper functions to deal with singly linked page lists,
68  * page->private being our "next" pointer.
69  */
70
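/*
 * Illustration: a chain of three pages is linked through page->private and
 * terminated by 0, i.e.
 *
 *   *head -> pageA -> pageB -> pageC -> 0
 *
 * where page_private(pageA) == (unsigned long)pageB, and the tail page
 * carries 0 in ->private as the end-of-list marker.
 */
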
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77         struct page *page;
78         struct page *tmp;
79
80         BUG_ON(!n);
81         BUG_ON(!head);
82
83         page = *head;
84
85         if (!page)
86                 return NULL;
87
88         while (page) {
89                 tmp = page_chain_next(page);
90                 if (--n == 0)
91                         break; /* found sufficient pages */
92                 if (tmp == NULL)
93                         /* insufficient pages, don't use any of them. */
94                         return NULL;
95                 page = tmp;
96         }
97
98         /* add end of list marker for the returned list */
99         set_page_private(page, 0);
100         /* actual return value, and adjustment of head */
101         page = *head;
102         *head = tmp;
103         return page;
104 }
105
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111         struct page *tmp;
112         int i = 1;
113         while ((tmp = page_chain_next(page)))
114                 ++i, page = tmp;
115         if (len)
116                 *len = i;
117         return page;
118 }
119
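/* Drop the reference on every page of the chain (normally freeing the pages)
 * and return the number of pages that were on it. */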
120 static int page_chain_free(struct page *page)
121 {
122         struct page *tmp;
123         int i = 0;
124         page_chain_for_each_safe(page, tmp) {
125                 put_page(page);
126                 ++i;
127         }
128         return i;
129 }
130
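/* Prepend the chain chain_first..chain_last to *head.  The "#if 1" block
 * below double-checks, at the cost of a list walk, that chain_last really is
 * the tail of the chain starting at chain_first. */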
131 static void page_chain_add(struct page **head,
132                 struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135         struct page *tmp;
136         tmp = page_chain_tail(chain_first, NULL);
137         BUG_ON(tmp != chain_last);
138 #endif
139
140         /* add chain to head */
141         set_page_private(chain_last, (unsigned long)*head);
142         *head = chain_first;
143 }
144
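/* Grab @number pages from the global drbd_pp_pool if it looks like the pool
 * can satisfy the request, otherwise allocate them one by one with GFP_TRY.
 * If not all pages could be allocated, give back what we got and return NULL. */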
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147         struct page *page = NULL;
148         struct page *tmp = NULL;
149         int i = 0;
150
151         /* Yes, testing drbd_pp_vacant outside the lock is racy.
152          * So what. It saves a spin_lock. */
153         if (drbd_pp_vacant >= number) {
154                 spin_lock(&drbd_pp_lock);
155                 page = page_chain_del(&drbd_pp_pool, number);
156                 if (page)
157                         drbd_pp_vacant -= number;
158                 spin_unlock(&drbd_pp_lock);
159                 if (page)
160                         return page;
161         }
162
163         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164          * "criss-cross" setup, that might cause write-out on some other DRBD,
165          * which in turn might block on the other node at this very place.  */
166         for (i = 0; i < number; i++) {
167                 tmp = alloc_page(GFP_TRY);
168                 if (!tmp)
169                         break;
170                 set_page_private(tmp, (unsigned long)page);
171                 page = tmp;
172         }
173
174         if (i == number)
175                 return page;
176
177         /* Not enough pages immediately available this time.
178          * No need to jump around here, drbd_pp_alloc will retry this
179          * function "soon". */
180         if (page) {
181                 tmp = page_chain_tail(page, NULL);
182                 spin_lock(&drbd_pp_lock);
183                 page_chain_add(&drbd_pp_pool, page, tmp);
184                 drbd_pp_vacant += i;
185                 spin_unlock(&drbd_pp_lock);
186         }
187         return NULL;
188 }
189
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192         struct drbd_epoch_entry *e;
193         struct list_head *le, *tle;
194
195         /* The EEs are always appended to the end of the list. Since
196            they are sent in order over the wire, they have to finish
197            in order. As soon as we see the first unfinished entry we can
198            stop examining the list... */
199
200         list_for_each_safe(le, tle, &mdev->net_ee) {
201                 e = list_entry(le, struct drbd_epoch_entry, w.list);
202                 if (drbd_ee_has_active_page(e))
203                         break;
204                 list_move(le, to_be_freed);
205         }
206 }
207
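/* Collect the already finished entries from net_ee under the req_lock,
 * then free them outside of the lock. */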
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210         LIST_HEAD(reclaimed);
211         struct drbd_epoch_entry *e, *t;
212
213         spin_lock_irq(&mdev->req_lock);
214         reclaim_net_ee(mdev, &reclaimed);
215         spin_unlock_irq(&mdev->req_lock);
216
217         list_for_each_entry_safe(e, t, &reclaimed, w.list)
218                 drbd_free_net_ee(mdev, e);
219 }
220
221 /**
222  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223  * @mdev:       DRBD device.
224  * @number:     number of pages requested
225  * @retry:      whether to retry, if not enough pages are available right now
226  *
227  * Tries to allocate @number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235         struct page *page = NULL;
236         DEFINE_WAIT(wait);
237
238         /* Yes, we may run up to @number over max_buffers. If we
239          * follow it strictly, the admin will get it wrong anyway. */
240         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242
243         while (page == NULL) {
244                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246                 drbd_kick_lo_and_reclaim_net(mdev);
247
248                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250                         if (page)
251                                 break;
252                 }
253
254                 if (!retry)
255                         break;
256
257                 if (signal_pending(current)) {
258                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259                         break;
260                 }
261
262                 schedule();
263         }
264         finish_wait(&drbd_pp_wait, &wait);
265
266         if (page)
267                 atomic_add(number, &mdev->pp_in_use);
268         return page;
269 }
270
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272  * It is also used from inside another spin_lock_irq(&mdev->req_lock) section;
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278         int i;
279
280         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
281                 i = page_chain_free(page);
282         else {
283                 struct page *tmp;
284                 tmp = page_chain_tail(page, &i);
285                 spin_lock(&drbd_pp_lock);
286                 page_chain_add(&drbd_pp_pool, page, tmp);
287                 drbd_pp_vacant += i;
288                 spin_unlock(&drbd_pp_lock);
289         }
290         i = atomic_sub_return(i, a);
291         if (i < 0)
292                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
294         wake_up(&drbd_pp_wait);
295 }
296
297 /*
298 You need to hold the req_lock:
299  _drbd_wait_ee_list_empty()
300
301 You must not have the req_lock:
302  drbd_free_ee()
303  drbd_alloc_ee()
304  drbd_init_ee()
305  drbd_release_ee()
306  drbd_ee_fix_bhs()
307  drbd_process_done_ee()
308  drbd_clear_done_ee()
309  drbd_wait_ee_list_empty()
310 */
311
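/* Allocate an epoch entry from drbd_ee_mempool plus enough pool pages to hold
 * @data_size bytes.  Sleeps for pages only if @gfp_mask contains __GFP_WAIT.
 * Returns NULL on failure (or on an injected fault). */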
312 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
313                                      u64 id,
314                                      sector_t sector,
315                                      unsigned int data_size,
316                                      gfp_t gfp_mask) __must_hold(local)
317 {
318         struct drbd_epoch_entry *e;
319         struct page *page;
320         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
321
322         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
323                 return NULL;
324
325         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
326         if (!e) {
327                 if (!(gfp_mask & __GFP_NOWARN))
328                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
329                 return NULL;
330         }
331
332         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
333         if (!page)
334                 goto fail;
335
336         drbd_clear_interval(&e->i);
337         e->epoch = NULL;
338         e->mdev = mdev;
339         e->pages = page;
340         atomic_set(&e->pending_bios, 0);
341         e->i.size = data_size;
342         e->flags = 0;
343         e->i.sector = sector;
344         /*
345          * The block_id is opaque to the receiver.  It is not endianness
346          * converted, and sent back to the sender unchanged.
347          */
348         e->block_id = id;
349
350         return e;
351
352  fail:
353         mempool_free(e, drbd_ee_mempool);
354         return NULL;
355 }
356
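/* Free a single epoch entry: its digest (if any), its page chain (accounted
 * against pp_in_use or pp_in_use_by_net, depending on @is_net), and finally
 * the entry itself. */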
357 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
358 {
359         if (e->flags & EE_HAS_DIGEST)
360                 kfree(e->digest);
361         drbd_pp_free(mdev, e->pages, is_net);
362         D_ASSERT(atomic_read(&e->pending_bios) == 0);
363         D_ASSERT(drbd_interval_empty(&e->i));
364         mempool_free(e, drbd_ee_mempool);
365 }
366
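/* Splice @list off under the req_lock and free every entry on it.
 * Returns the number of entries freed. */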
367 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
368 {
369         LIST_HEAD(work_list);
370         struct drbd_epoch_entry *e, *t;
371         int count = 0;
372         int is_net = list == &mdev->net_ee;
373
374         spin_lock_irq(&mdev->req_lock);
375         list_splice_init(list, &work_list);
376         spin_unlock_irq(&mdev->req_lock);
377
378         list_for_each_entry_safe(e, t, &work_list, w.list) {
379                 drbd_free_some_ee(mdev, e, is_net);
380                 count++;
381         }
382         return count;
383 }
384
385
386 /*
387  * This function is called from _asender only_
388  * but see also comments in _req_mod(,BARRIER_ACKED)
389  * and receive_Barrier.
390  *
391  * Move entries from net_ee to done_ee, if ready.
392  * Grab done_ee, call all callbacks, free the entries.
393  * The callbacks typically send out ACKs.
394  */
395 static int drbd_process_done_ee(struct drbd_conf *mdev)
396 {
397         LIST_HEAD(work_list);
398         LIST_HEAD(reclaimed);
399         struct drbd_epoch_entry *e, *t;
400         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
401
402         spin_lock_irq(&mdev->req_lock);
403         reclaim_net_ee(mdev, &reclaimed);
404         list_splice_init(&mdev->done_ee, &work_list);
405         spin_unlock_irq(&mdev->req_lock);
406
407         list_for_each_entry_safe(e, t, &reclaimed, w.list)
408                 drbd_free_net_ee(mdev, e);
409
410         /* possible callbacks here:
411          * e_end_block, e_end_resync_block, and e_send_discard_ack;
412          * all ignore the last argument.
413          */
414         list_for_each_entry_safe(e, t, &work_list, w.list) {
415                 /* list_del not necessary, next/prev members not touched */
416                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
417                 drbd_free_ee(mdev, e);
418         }
419         wake_up(&mdev->ee_wait);
420
421         return ok;
422 }
423
424 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
425 {
426         DEFINE_WAIT(wait);
427
428         /* avoids spin_lock/unlock
429          * and calling prepare_to_wait in the fast path */
430         while (!list_empty(head)) {
431                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
432                 spin_unlock_irq(&mdev->req_lock);
433                 io_schedule();
434                 finish_wait(&mdev->ee_wait, &wait);
435                 spin_lock_irq(&mdev->req_lock);
436         }
437 }
438
439 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
440 {
441         spin_lock_irq(&mdev->req_lock);
442         _drbd_wait_ee_list_empty(mdev, head);
443         spin_unlock_irq(&mdev->req_lock);
444 }
445
446 /* see also kernel_accept, which is only present since 2.6.18.
447  * Also, we want to log exactly which part of it failed. */
448 static int drbd_accept(struct drbd_conf *mdev, const char **what,
449                 struct socket *sock, struct socket **newsock)
450 {
451         struct sock *sk = sock->sk;
452         int err = 0;
453
454         *what = "listen";
455         err = sock->ops->listen(sock, 5);
456         if (err < 0)
457                 goto out;
458
459         *what = "sock_create_lite";
460         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
461                                newsock);
462         if (err < 0)
463                 goto out;
464
465         *what = "accept";
466         err = sock->ops->accept(sock, *newsock, 0);
467         if (err < 0) {
468                 sock_release(*newsock);
469                 *newsock = NULL;
470                 goto out;
471         }
472         (*newsock)->ops  = sock->ops;
473
474 out:
475         return err;
476 }
477
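/* A single sock_recvmsg() on an arbitrary socket, with KERNEL_DS set so the
 * kvec may point into kernel memory.  If @flags is 0, MSG_WAITALL |
 * MSG_NOSIGNAL is used.  Returns whatever sock_recvmsg() returned. */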
478 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
479                     void *buf, size_t size, int flags)
480 {
481         mm_segment_t oldfs;
482         struct kvec iov = {
483                 .iov_base = buf,
484                 .iov_len = size,
485         };
486         struct msghdr msg = {
487                 .msg_iovlen = 1,
488                 .msg_iov = (struct iovec *)&iov,
489                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
490         };
491         int rv;
492
493         oldfs = get_fs();
494         set_fs(KERNEL_DS);
495         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
496         set_fs(oldfs);
497
498         return rv;
499 }
500
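/* Receive exactly @size bytes from the data socket (MSG_WAITALL).  Anything
 * other than a full read (error, signal, peer went away) forces the
 * connection into C_BROKEN_PIPE. */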
501 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
502 {
503         mm_segment_t oldfs;
504         struct kvec iov = {
505                 .iov_base = buf,
506                 .iov_len = size,
507         };
508         struct msghdr msg = {
509                 .msg_iovlen = 1,
510                 .msg_iov = (struct iovec *)&iov,
511                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
512         };
513         int rv;
514
515         oldfs = get_fs();
516         set_fs(KERNEL_DS);
517
518         for (;;) {
519                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
520                 if (rv == size)
521                         break;
522
523                 /* Note:
524                  * ECONNRESET   other side closed the connection
525                  * ERESTARTSYS  (on  sock) we got a signal
526                  */
527
528                 if (rv < 0) {
529                         if (rv == -ECONNRESET)
530                                 dev_info(DEV, "sock was reset by peer\n");
531                         else if (rv != -ERESTARTSYS)
532                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
533                         break;
534                 } else if (rv == 0) {
535                         dev_info(DEV, "sock was shut down by peer\n");
536                         break;
537                 } else  {
538                         /* signal came in, or peer/link went down,
539                          * after we read a partial message
540                          */
541                         /* D_ASSERT(signal_pending(current)); */
542                         break;
543                 }
544         };
545
546         set_fs(oldfs);
547
548         if (rv != size)
549                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
550
551         return rv;
552 }
553
554 /* quoting tcp(7):
555  *   On individual connections, the socket buffer size must be set prior to the
556  *   listen(2) or connect(2) calls in order to have it take effect.
557  * This is our wrapper to do so.
558  */
559 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
560                 unsigned int rcv)
561 {
562         /* open coded SO_SNDBUF, SO_RCVBUF */
563         if (snd) {
564                 sock->sk->sk_sndbuf = snd;
565                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
566         }
567         if (rcv) {
568                 sock->sk->sk_rcvbuf = rcv;
569                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
570         }
571 }
572
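/* Active side of connection setup: create a TCP socket, bind it to the
 * configured local address (with port 0) and connect to the peer.  Returns
 * the socket, or NULL on failure.  A peer that is not (yet) reachable is
 * expected here and does not force C_DISCONNECTING; other errors do. */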
573 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
574 {
575         const char *what;
576         struct socket *sock;
577         struct sockaddr_in6 src_in6;
578         int err;
579         int disconnect_on_error = 1;
580
581         if (!get_net_conf(mdev))
582                 return NULL;
583
584         what = "sock_create_kern";
585         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
586                 SOCK_STREAM, IPPROTO_TCP, &sock);
587         if (err < 0) {
588                 sock = NULL;
589                 goto out;
590         }
591
592         sock->sk->sk_rcvtimeo =
593         sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
594         drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
595                         mdev->net_conf->rcvbuf_size);
596
597        /* explicitly bind to the configured IP as source IP
598         *  for the outgoing connections.
599         *  This is needed for multihomed hosts and to be
600         *  able to use lo: interfaces for drbd.
601         * Make sure to use 0 as the port number, so Linux selects
602         *  a free one dynamically.
603         */
604         memcpy(&src_in6, mdev->net_conf->my_addr,
605                min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
606         if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
607                 src_in6.sin6_port = 0;
608         else
609                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
610
611         what = "bind before connect";
612         err = sock->ops->bind(sock,
613                               (struct sockaddr *) &src_in6,
614                               mdev->net_conf->my_addr_len);
615         if (err < 0)
616                 goto out;
617
618         /* connect may fail, peer not yet available.
619          * stay C_WF_CONNECTION, don't go Disconnecting! */
620         disconnect_on_error = 0;
621         what = "connect";
622         err = sock->ops->connect(sock,
623                                  (struct sockaddr *)mdev->net_conf->peer_addr,
624                                  mdev->net_conf->peer_addr_len, 0);
625
626 out:
627         if (err < 0) {
628                 if (sock) {
629                         sock_release(sock);
630                         sock = NULL;
631                 }
632                 switch (-err) {
633                         /* timeout, busy, signal pending */
634                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
635                 case EINTR: case ERESTARTSYS:
636                         /* peer not (yet) available, network problem */
637                 case ECONNREFUSED: case ENETUNREACH:
638                 case EHOSTDOWN:    case EHOSTUNREACH:
639                         disconnect_on_error = 0;
640                         break;
641                 default:
642                         dev_err(DEV, "%s failed, err = %d\n", what, err);
643                 }
644                 if (disconnect_on_error)
645                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
646         }
647         put_net_conf(mdev);
648         return sock;
649 }
650
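/* Passive side of connection setup: bind a listening socket to the configured
 * local address and accept one incoming connection, with a timeout randomly
 * jittered around try_connect_int.  Returns the established socket, or NULL. */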
651 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
652 {
653         int timeo, err;
654         struct socket *s_estab = NULL, *s_listen;
655         const char *what;
656
657         if (!get_net_conf(mdev))
658                 return NULL;
659
660         what = "sock_create_kern";
661         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
662                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
663         if (err) {
664                 s_listen = NULL;
665                 goto out;
666         }
667
668         timeo = mdev->net_conf->try_connect_int * HZ;
669         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
670
671         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
672         s_listen->sk->sk_rcvtimeo = timeo;
673         s_listen->sk->sk_sndtimeo = timeo;
674         drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
675                         mdev->net_conf->rcvbuf_size);
676
677         what = "bind before listen";
678         err = s_listen->ops->bind(s_listen,
679                               (struct sockaddr *) mdev->net_conf->my_addr,
680                               mdev->net_conf->my_addr_len);
681         if (err < 0)
682                 goto out;
683
684         err = drbd_accept(mdev, &what, s_listen, &s_estab);
685
686 out:
687         if (s_listen)
688                 sock_release(s_listen);
689         if (err < 0) {
690                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
691                         dev_err(DEV, "%s failed, err = %d\n", what, err);
692                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
693                 }
694         }
695         put_net_conf(mdev);
696
697         return s_estab;
698 }
699
700 static int drbd_send_fp(struct drbd_conf *mdev,
701         struct socket *sock, enum drbd_packets cmd)
702 {
703         struct p_header80 *h = &mdev->data.sbuf.header.h80;
704
705         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
706 }
707
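/* Receive the header of the peer's first packet and return its command,
 * or 0xffff if the read was short or the magic did not match. */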
708 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
709 {
710         struct p_header80 *h = &mdev->data.rbuf.header.h80;
711         int rr;
712
713         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
714
715         if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
716                 return be16_to_cpu(h->command);
717
718         return 0xffff;
719 }
720
721 /**
722  * drbd_socket_okay() - Free the socket if its connection is not okay
723  * @mdev:       DRBD device.
724  * @sock:       pointer to the pointer to the socket.
725  */
726 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
727 {
728         int rr;
729         char tb[4];
730
731         if (!*sock)
732                 return false;
733
734         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
735
736         if (rr > 0 || rr == -EAGAIN) {
737                 return true;
738         } else {
739                 sock_release(*sock);
740                 *sock = NULL;
741                 return false;
742         }
743 }
744
745 /*
746  * return values:
747  *   1 yes, we have a valid connection
748  *   0 oops, did not work out, please try again
749  *  -1 peer talks different language,
750  *     no point in trying again, please go standalone.
751  *  -2 We do not have a network config...
752  */
753 static int drbd_connect(struct drbd_conf *mdev)
754 {
755         struct socket *s, *sock, *msock;
756         int try, h, ok;
757
758         D_ASSERT(!mdev->data.socket);
759
760         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
761                 return -2;
762
763         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
764
765         sock  = NULL;
766         msock = NULL;
767
768         do {
769                 for (try = 0;;) {
770                         /* 3 tries, this should take less than a second! */
771                         s = drbd_try_connect(mdev);
772                         if (s || ++try >= 3)
773                                 break;
774                         /* give the other side time to call bind() & listen() */
775                         schedule_timeout_interruptible(HZ / 10);
776                 }
777
778                 if (s) {
779                         if (!sock) {
780                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
781                                 sock = s;
782                                 s = NULL;
783                         } else if (!msock) {
784                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
785                                 msock = s;
786                                 s = NULL;
787                         } else {
788                                 dev_err(DEV, "Logic error in drbd_connect()\n");
789                                 goto out_release_sockets;
790                         }
791                 }
792
793                 if (sock && msock) {
794                         schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
795                         ok = drbd_socket_okay(mdev, &sock);
796                         ok = drbd_socket_okay(mdev, &msock) && ok;
797                         if (ok)
798                                 break;
799                 }
800
801 retry:
802                 s = drbd_wait_for_connect(mdev);
803                 if (s) {
804                         try = drbd_recv_fp(mdev, s);
805                         drbd_socket_okay(mdev, &sock);
806                         drbd_socket_okay(mdev, &msock);
807                         switch (try) {
808                         case P_HAND_SHAKE_S:
809                                 if (sock) {
810                                         dev_warn(DEV, "initial packet S crossed\n");
811                                         sock_release(sock);
812                                 }
813                                 sock = s;
814                                 break;
815                         case P_HAND_SHAKE_M:
816                                 if (msock) {
817                                         dev_warn(DEV, "initial packet M crossed\n");
818                                         sock_release(msock);
819                                 }
820                                 msock = s;
821                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
822                                 break;
823                         default:
824                                 dev_warn(DEV, "Error receiving initial packet\n");
825                                 sock_release(s);
826                                 if (random32() & 1)
827                                         goto retry;
828                         }
829                 }
830
831                 if (mdev->state.conn <= C_DISCONNECTING)
832                         goto out_release_sockets;
833                 if (signal_pending(current)) {
834                         flush_signals(current);
835                         smp_rmb();
836                         if (get_t_state(&mdev->receiver) == Exiting)
837                                 goto out_release_sockets;
838                 }
839
840                 if (sock && msock) {
841                         ok = drbd_socket_okay(mdev, &sock);
842                         ok = drbd_socket_okay(mdev, &msock) && ok;
843                         if (ok)
844                                 break;
845                 }
846         } while (1);
847
848         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
849         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
850
851         sock->sk->sk_allocation = GFP_NOIO;
852         msock->sk->sk_allocation = GFP_NOIO;
853
854         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
855         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
856
857         /* NOT YET ...
858          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
859          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
860          * first set it to the P_HAND_SHAKE timeout,
861          * which we set to 4x the configured ping_timeout. */
862         sock->sk->sk_sndtimeo =
863         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
864
865         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
866         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
867
868         /* we don't want delays.
869          * we use TCP_CORK where appropriate, though */
870         drbd_tcp_nodelay(sock);
871         drbd_tcp_nodelay(msock);
872
873         mdev->data.socket = sock;
874         mdev->meta.socket = msock;
875         mdev->last_received = jiffies;
876
877         D_ASSERT(mdev->asender.task == NULL);
878
879         h = drbd_do_handshake(mdev);
880         if (h <= 0)
881                 return h;
882
883         if (mdev->cram_hmac_tfm) {
884                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
885                 switch (drbd_do_auth(mdev)) {
886                 case -1:
887                         dev_err(DEV, "Authentication of peer failed\n");
888                         return -1;
889                 case 0:
890                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
891                         return 0;
892                 }
893         }
894
895         if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
896                 return 0;
897
898         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
899         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
900
901         atomic_set(&mdev->packet_seq, 0);
902         mdev->peer_seq = 0;
903
904         drbd_thread_start(&mdev->asender);
905
906         if (drbd_send_protocol(mdev) == -1)
907                 return -1;
908         drbd_send_sync_param(mdev, &mdev->sync_conf);
909         drbd_send_sizes(mdev, 0, 0);
910         drbd_send_uuids(mdev);
911         drbd_send_state(mdev);
912         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
913         clear_bit(RESIZE_PENDING, &mdev->flags);
914         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
915
916         return 1;
917
918 out_release_sockets:
919         if (sock)
920                 sock_release(sock);
921         if (msock)
922                 sock_release(msock);
923         return -1;
924 }
925
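/* Receive one packet header from the data socket and decode command and
 * payload length.  Both the p_header80 and the p_header95 layout are
 * accepted.  Returns true on success, false on short read or bad magic. */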
926 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
927 {
928         union p_header *h = &mdev->data.rbuf.header;
929         int r;
930
931         r = drbd_recv(mdev, h, sizeof(*h));
932         if (unlikely(r != sizeof(*h))) {
933                 if (!signal_pending(current))
934                         dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
935                 return false;
936         }
937
938         if (likely(h->h80.magic == cpu_to_be32(DRBD_MAGIC))) {
939                 *cmd = be16_to_cpu(h->h80.command);
940                 *packet_size = be16_to_cpu(h->h80.length);
941         } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
942                 *cmd = be16_to_cpu(h->h95.command);
943                 *packet_size = be32_to_cpu(h->h95.length);
944         } else {
945                 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
946                     be32_to_cpu(h->h80.magic),
947                     be16_to_cpu(h->h80.command),
948                     be16_to_cpu(h->h80.length));
949                 return false;
950         }
951         mdev->last_received = jiffies;
952
953         return true;
954 }
955
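/* Flush the backing device if the current write ordering method requires it.
 * If the flush fails, fall back to WO_drain_io for all future epochs. */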
956 static void drbd_flush(struct drbd_conf *mdev)
957 {
958         int rv;
959
960         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
961                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
962                                         NULL);
963                 if (rv) {
964                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
965                         /* would rather check on EOPNOTSUPP, but that is not reliable.
966                          * don't try again for ANY return value != 0
967                          * if (rv == -EOPNOTSUPP) */
968                         drbd_bump_write_ordering(mdev, WO_drain_io);
969                 }
970                 put_ldev(mdev);
971         }
972 }
973
974 /**
975  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
976  * @mdev:       DRBD device.
977  * @epoch:      Epoch object.
978  * @ev:         Epoch event.
979  */
980 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
981                                                struct drbd_epoch *epoch,
982                                                enum epoch_event ev)
983 {
984         int epoch_size;
985         struct drbd_epoch *next_epoch;
986         enum finish_epoch rv = FE_STILL_LIVE;
987
988         spin_lock(&mdev->epoch_lock);
989         do {
990                 next_epoch = NULL;
991
992                 epoch_size = atomic_read(&epoch->epoch_size);
993
994                 switch (ev & ~EV_CLEANUP) {
995                 case EV_PUT:
996                         atomic_dec(&epoch->active);
997                         break;
998                 case EV_GOT_BARRIER_NR:
999                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1000                         break;
1001                 case EV_BECAME_LAST:
1002                         /* nothing to do*/
1003                         break;
1004                 }
1005
1006                 if (epoch_size != 0 &&
1007                     atomic_read(&epoch->active) == 0 &&
1008                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1009                         if (!(ev & EV_CLEANUP)) {
1010                                 spin_unlock(&mdev->epoch_lock);
1011                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1012                                 spin_lock(&mdev->epoch_lock);
1013                         }
1014                         dec_unacked(mdev);
1015
1016                         if (mdev->current_epoch != epoch) {
1017                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1018                                 list_del(&epoch->list);
1019                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1020                                 mdev->epochs--;
1021                                 kfree(epoch);
1022
1023                                 if (rv == FE_STILL_LIVE)
1024                                         rv = FE_DESTROYED;
1025                         } else {
1026                                 epoch->flags = 0;
1027                                 atomic_set(&epoch->epoch_size, 0);
1028                                 /* atomic_set(&epoch->active, 0); is already zero */
1029                                 if (rv == FE_STILL_LIVE)
1030                                         rv = FE_RECYCLED;
1031                                 wake_up(&mdev->ee_wait);
1032                         }
1033                 }
1034
1035                 if (!next_epoch)
1036                         break;
1037
1038                 epoch = next_epoch;
1039         } while (1);
1040
1041         spin_unlock(&mdev->epoch_lock);
1042
1043         return rv;
1044 }
1045
1046 /**
1047  * drbd_bump_write_ordering() - Fall back to another write ordering method
1048  * @mdev:       DRBD device.
1049  * @wo:         Write ordering method to try.
1050  */
1051 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1052 {
1053         enum write_ordering_e pwo;
1054         static char *write_ordering_str[] = {
1055                 [WO_none] = "none",
1056                 [WO_drain_io] = "drain",
1057                 [WO_bdev_flush] = "flush",
1058         };
1059
1060         pwo = mdev->write_ordering;
1061         wo = min(pwo, wo);
1062         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1063                 wo = WO_drain_io;
1064         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1065                 wo = WO_none;
1066         mdev->write_ordering = wo;
1067         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1068                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1069 }
1070
1071 /**
1072  * drbd_submit_ee() - submit an epoch entry's page chain in one or more bios
1073  * @mdev:       DRBD device.
1074  * @e:          epoch entry
1075  * @rw:         flag field, see bio->bi_rw
1076  *
1077  * May spread the pages to multiple bios,
1078  * depending on bio_add_page restrictions.
1079  *
1080  * Returns 0 if all bios have been submitted,
1081  * -ENOMEM if we could not allocate enough bios,
1082  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1083  *  single page to an empty bio (which should never happen and likely indicates
1084  *  that the lower level IO stack is in some way broken). This has been observed
1085  *  on certain Xen deployments.
1086  */
1087 /* TODO allocate from our own bio_set. */
1088 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1089                 const unsigned rw, const int fault_type)
1090 {
1091         struct bio *bios = NULL;
1092         struct bio *bio;
1093         struct page *page = e->pages;
1094         sector_t sector = e->i.sector;
1095         unsigned ds = e->i.size;
1096         unsigned n_bios = 0;
1097         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1098         int err = -ENOMEM;
1099
1100         /* In most cases, we will only need one bio.  But in case the lower
1101          * level restrictions happen to be different at this offset on this
1102          * side than those of the sending peer, we may need to submit the
1103          * request in more than one bio. */
1104 next_bio:
1105         bio = bio_alloc(GFP_NOIO, nr_pages);
1106         if (!bio) {
1107                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1108                 goto fail;
1109         }
1110         /* > e->i.sector, unless this is the first bio */
1111         bio->bi_sector = sector;
1112         bio->bi_bdev = mdev->ldev->backing_bdev;
1113         bio->bi_rw = rw;
1114         bio->bi_private = e;
1115         bio->bi_end_io = drbd_endio_sec;
1116
1117         bio->bi_next = bios;
1118         bios = bio;
1119         ++n_bios;
1120
1121         page_chain_for_each(page) {
1122                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1123                 if (!bio_add_page(bio, page, len, 0)) {
1124                         /* A single page must always be possible!
1125                          * But in case it fails anyway,
1126                          * we deal with it, and complain (below). */
1127                         if (bio->bi_vcnt == 0) {
1128                                 dev_err(DEV,
1129                                         "bio_add_page failed for len=%u, "
1130                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1131                                         len, (unsigned long long)bio->bi_sector);
1132                                 err = -ENOSPC;
1133                                 goto fail;
1134                         }
1135                         goto next_bio;
1136                 }
1137                 ds -= len;
1138                 sector += len >> 9;
1139                 --nr_pages;
1140         }
1141         D_ASSERT(page == NULL);
1142         D_ASSERT(ds == 0);
1143
1144         atomic_set(&e->pending_bios, n_bios);
1145         do {
1146                 bio = bios;
1147                 bios = bios->bi_next;
1148                 bio->bi_next = NULL;
1149
1150                 drbd_generic_make_request(mdev, fault_type, bio);
1151         } while (bios);
1152         return 0;
1153
1154 fail:
1155         while (bios) {
1156                 bio = bios;
1157                 bios = bios->bi_next;
1158                 bio_put(bio);
1159         }
1160         return err;
1161 }
1162
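/* Handle an incoming P_BARRIER packet: record the barrier number in the
 * current epoch and, depending on the write ordering method, either start a
 * new epoch right away or first drain/flush the current one. */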
1163 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1164 {
1165         int rv;
1166         struct p_barrier *p = &mdev->data.rbuf.barrier;
1167         struct drbd_epoch *epoch;
1168
1169         inc_unacked(mdev);
1170
1171         mdev->current_epoch->barrier_nr = p->barrier;
1172         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1173
1174         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1175          * the activity log, which means it would not be resynced in case the
1176          * R_PRIMARY crashes now.
1177          * Therefore we must send the barrier_ack after the barrier request was
1178          * completed. */
1179         switch (mdev->write_ordering) {
1180         case WO_none:
1181                 if (rv == FE_RECYCLED)
1182                         return true;
1183
1184                 /* receiver context, in the writeout path of the other node.
1185                  * avoid potential distributed deadlock */
1186                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1187                 if (epoch)
1188                         break;
1189                 else
1190                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1191                         /* Fall through */
1192
1193         case WO_bdev_flush:
1194         case WO_drain_io:
1195                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1196                 drbd_flush(mdev);
1197
1198                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1199                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1200                         if (epoch)
1201                                 break;
1202                 }
1203
1204                 epoch = mdev->current_epoch;
1205                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1206
1207                 D_ASSERT(atomic_read(&epoch->active) == 0);
1208                 D_ASSERT(epoch->flags == 0);
1209
1210                 return true;
1211         default:
1212                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1213                 return false;
1214         }
1215
1216         epoch->flags = 0;
1217         atomic_set(&epoch->epoch_size, 0);
1218         atomic_set(&epoch->active, 0);
1219
1220         spin_lock(&mdev->epoch_lock);
1221         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1222                 list_add(&epoch->list, &mdev->current_epoch->list);
1223                 mdev->current_epoch = epoch;
1224                 mdev->epochs++;
1225         } else {
1226                 /* The current_epoch got recycled while we allocated this one... */
1227                 kfree(epoch);
1228         }
1229         spin_unlock(&mdev->epoch_lock);
1230
1231         return true;
1232 }
1233
1234 /* used from receive_RSDataReply (recv_resync_read)
1235  * and from receive_Data */
1236 static struct drbd_epoch_entry *
1237 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1238 {
1239         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1240         struct drbd_epoch_entry *e;
1241         struct page *page;
1242         int dgs, ds, rr;
1243         void *dig_in = mdev->int_dig_in;
1244         void *dig_vv = mdev->int_dig_vv;
1245         unsigned long *data;
1246
1247         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1248                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1249
1250         if (dgs) {
1251                 rr = drbd_recv(mdev, dig_in, dgs);
1252                 if (rr != dgs) {
1253                         if (!signal_pending(current))
1254                                 dev_warn(DEV,
1255                                         "short read receiving data digest: read %d expected %d\n",
1256                                         rr, dgs);
1257                         return NULL;
1258                 }
1259         }
1260
1261         data_size -= dgs;
1262
1263         ERR_IF(data_size == 0) return NULL;
1264         ERR_IF(data_size &  0x1ff) return NULL;
1265         ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1266
1267         /* even though we trust our peer,
1268          * we sometimes have to double check. */
1269         if (sector + (data_size>>9) > capacity) {
1270                 dev_err(DEV, "request from peer beyond end of local disk: "
1271                         "capacity: %llus < sector: %llus + size: %u\n",
1272                         (unsigned long long)capacity,
1273                         (unsigned long long)sector, data_size);
1274                 return NULL;
1275         }
1276
1277         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1278          * "criss-cross" setup, that might cause write-out on some other DRBD,
1279          * which in turn might block on the other node at this very place.  */
1280         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1281         if (!e)
1282                 return NULL;
1283
1284         ds = data_size;
1285         page = e->pages;
1286         page_chain_for_each(page) {
1287                 unsigned len = min_t(int, ds, PAGE_SIZE);
1288                 data = kmap(page);
1289                 rr = drbd_recv(mdev, data, len);
1290                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1291                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1292                         data[0] = data[0] ^ (unsigned long)-1;
1293                 }
1294                 kunmap(page);
1295                 if (rr != len) {
1296                         drbd_free_ee(mdev, e);
1297                         if (!signal_pending(current))
1298                                 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1299                                 rr, len);
1300                         return NULL;
1301                 }
1302                 ds -= rr;
1303         }
1304
1305         if (dgs) {
1306                 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1307                 if (memcmp(dig_in, dig_vv, dgs)) {
1308                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1309                                 (unsigned long long)sector, data_size);
1310                         drbd_bcast_ee(mdev, "digest failed",
1311                                         dgs, dig_in, dig_vv, e);
1312                         drbd_free_ee(mdev, e);
1313                         return NULL;
1314                 }
1315         }
1316         mdev->recv_cnt += data_size>>9;
1317         return e;
1318 }
1319
1320 /* drbd_drain_block() just takes a data block
1321  * out of the socket input buffer, and discards it.
1322  */
1323 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1324 {
1325         struct page *page;
1326         int rr, rv = 1;
1327         void *data;
1328
1329         if (!data_size)
1330                 return true;
1331
1332         page = drbd_pp_alloc(mdev, 1, 1);
1333
1334         data = kmap(page);
1335         while (data_size) {
1336                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1337                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1338                         rv = 0;
1339                         if (!signal_pending(current))
1340                                 dev_warn(DEV,
1341                                         "short read receiving data: read %d expected %d\n",
1342                                         rr, min_t(int, data_size, PAGE_SIZE));
1343                         break;
1344                 }
1345                 data_size -= rr;
1346         }
1347         kunmap(page);
1348         drbd_pp_free(mdev, page, 0);
1349         return rv;
1350 }
1351
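/* "Diskless" read reply: copy the incoming payload directly into the pages of
 * the original request's bio and verify the data digest, if one is in use. */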
1352 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1353                            sector_t sector, int data_size)
1354 {
1355         struct bio_vec *bvec;
1356         struct bio *bio;
1357         int dgs, rr, i, expect;
1358         void *dig_in = mdev->int_dig_in;
1359         void *dig_vv = mdev->int_dig_vv;
1360
1361         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1362                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1363
1364         if (dgs) {
1365                 rr = drbd_recv(mdev, dig_in, dgs);
1366                 if (rr != dgs) {
1367                         if (!signal_pending(current))
1368                                 dev_warn(DEV,
1369                                         "short read receiving data reply digest: read %d expected %d\n",
1370                                         rr, dgs);
1371                         return 0;
1372                 }
1373         }
1374
1375         data_size -= dgs;
1376
1377         /* optimistically update recv_cnt.  if receiving fails below,
1378          * we disconnect anyway, and counters will be reset. */
1379         mdev->recv_cnt += data_size>>9;
1380
1381         bio = req->master_bio;
1382         D_ASSERT(sector == bio->bi_sector);
1383
1384         bio_for_each_segment(bvec, bio, i) {
1385                 expect = min_t(int, data_size, bvec->bv_len);
1386                 rr = drbd_recv(mdev,
1387                              kmap(bvec->bv_page)+bvec->bv_offset,
1388                              expect);
1389                 kunmap(bvec->bv_page);
1390                 if (rr != expect) {
1391                         if (!signal_pending(current))
1392                                 dev_warn(DEV, "short read receiving data reply: "
1393                                         "read %d expected %d\n",
1394                                         rr, expect);
1395                         return 0;
1396                 }
1397                 data_size -= rr;
1398         }
1399
1400         if (dgs) {
1401                 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1402                 if (memcmp(dig_in, dig_vv, dgs)) {
1403                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1404                         return 0;
1405                 }
1406         }
1407
1408         D_ASSERT(data_size == 0);
1409         return 1;
1410 }
1411
1412 /* e_end_resync_block() is called via
1413  * drbd_process_done_ee() by asender only */
1414 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1415 {
1416         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1417         sector_t sector = e->i.sector;
1418         int ok;
1419
1420         D_ASSERT(drbd_interval_empty(&e->i));
1421
1422         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1423                 drbd_set_in_sync(mdev, sector, e->i.size);
1424                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1425         } else {
1426                 /* Record failure to sync */
1427                 drbd_rs_failed_io(mdev, sector, e->i.size);
1428
1429                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1430         }
1431         dec_unacked(mdev);
1432
1433         return ok;
1434 }
1435
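/* Read an incoming resync data block into a fresh epoch entry, queue it on
 * sync_ee and submit it to the local disk.  The ACK is sent later from
 * e_end_resync_block(), once the write has completed. */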
1436 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1437 {
1438         struct drbd_epoch_entry *e;
1439
1440         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1441         if (!e)
1442                 goto fail;
1443
1444         dec_rs_pending(mdev);
1445
1446         inc_unacked(mdev);
1447         /* corresponding dec_unacked() in e_end_resync_block()
1448          * or _drbd_clear_done_ee, respectively */
1449
1450         e->w.cb = e_end_resync_block;
1451
1452         spin_lock_irq(&mdev->req_lock);
1453         list_add(&e->w.list, &mdev->sync_ee);
1454         spin_unlock_irq(&mdev->req_lock);
1455
1456         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1457         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1458                 return true;
1459
1460         /* we don't care about the reason here */
1461         dev_err(DEV, "submit failed, triggering re-connect\n");
1462         spin_lock_irq(&mdev->req_lock);
1463         list_del(&e->w.list);
1464         spin_unlock_irq(&mdev->req_lock);
1465
1466         drbd_free_ee(mdev, e);
1467 fail:
1468         put_ldev(mdev);
1469         return false;
1470 }
1471
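/* Look up the request the peer refers to: @id is the request pointer we
 * originally sent as block_id, validated by checking that its interval is
 * really registered in @root at @sector. */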
1472 static struct drbd_request *
1473 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1474              sector_t sector, bool missing_ok, const char *func)
1475 {
1476         struct drbd_request *req;
1477
1478         /* Request object according to our peer */
1479         req = (struct drbd_request *)(unsigned long)id;
1480         if (drbd_contains_interval(root, sector, &req->i))
1481                 return req;
1482         if (!missing_ok) {
1483                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1484                         (unsigned long)id, (unsigned long long)sector);
1485         }
1486         return NULL;
1487 }
1488
1489 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1490 {
1491         struct drbd_request *req;
1492         sector_t sector;
1493         int ok;
1494         struct p_data *p = &mdev->data.rbuf.data;
1495
1496         sector = be64_to_cpu(p->sector);
1497
1498         spin_lock_irq(&mdev->req_lock);
1499         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1500         spin_unlock_irq(&mdev->req_lock);
1501         if (unlikely(!req))
1502                 return false;
1503
1504         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1505          * special casing it there for the various failure cases.
1506          * still no race with drbd_fail_pending_reads */
1507         ok = recv_dless_read(mdev, req, sector, data_size);
1508
1509         if (ok)
1510                 req_mod(req, DATA_RECEIVED);
1511         /* else: nothing. handled from drbd_disconnect...
1512          * I don't think we may complete this just yet
1513          * in case we are "on-disconnect: freeze" */
1514
1515         return ok;
1516 }
1517
1518 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1519 {
1520         sector_t sector;
1521         int ok;
1522         struct p_data *p = &mdev->data.rbuf.data;
1523
1524         sector = be64_to_cpu(p->sector);
1525         D_ASSERT(p->block_id == ID_SYNCER);
1526
1527         if (get_ldev(mdev)) {
1528                 /* data is submitted to disk within recv_resync_read.
1529                  * corresponding put_ldev done below on error,
1530                  * or in drbd_endio_sec. */
1531                 ok = recv_resync_read(mdev, sector, data_size);
1532         } else {
1533                 if (__ratelimit(&drbd_ratelimit_state))
1534                         dev_err(DEV, "Can not write resync data to local disk.\n");
1535
1536                 ok = drbd_drain_block(mdev, data_size);
1537
1538                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1539         }
1540
1541         atomic_add(data_size >> 9, &mdev->rs_sect_in);
1542
1543         return ok;
1544 }
1545
1546 /* e_end_block() is called via drbd_process_done_ee().
1547  * this means this function only runs in the asender thread
1548  */
1549 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1550 {
1551         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1552         sector_t sector = e->i.sector;
1553         int ok = 1, pcmd;
1554
1555         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1556                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1557                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1558                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1559                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1560                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1561                         ok &= drbd_send_ack(mdev, pcmd, e);
1562                         if (pcmd == P_RS_WRITE_ACK)
1563                                 drbd_set_in_sync(mdev, sector, e->i.size);
1564                 } else {
1565                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1566                         /* we expect it to be marked out of sync anyways...
1567                          * maybe assert this?  */
1568                 }
1569                 dec_unacked(mdev);
1570         }
1571         /* we delete from the conflict detection hash _after_ we sent out the
1572          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1573         if (mdev->net_conf->two_primaries) {
1574                 spin_lock_irq(&mdev->req_lock);
1575                 D_ASSERT(!drbd_interval_empty(&e->i));
1576                 drbd_remove_interval(&mdev->epoch_entries, &e->i);
1577                 drbd_clear_interval(&e->i);
1578                 spin_unlock_irq(&mdev->req_lock);
1579         } else
1580                 D_ASSERT(drbd_interval_empty(&e->i));
1581
1582         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1583
1584         return ok;
1585 }
1586
1587 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1588 {
1589         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1590         int ok = 1;
1591
1592         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1593         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1594
1595         spin_lock_irq(&mdev->req_lock);
1596         D_ASSERT(!drbd_interval_empty(&e->i));
1597         drbd_remove_interval(&mdev->epoch_entries, &e->i);
1598         drbd_clear_interval(&e->i);
1599         spin_unlock_irq(&mdev->req_lock);
1600
1601         dec_unacked(mdev);
1602
1603         return ok;
1604 }
1605
1606 /* Called from receive_Data.
1607  * Synchronize packets on sock with packets on msock.
1608  *
1609  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1610  * packet traveling on msock, they are still processed in the order they have
1611  * been sent.
1612  *
1613  * Note: we don't care for Ack packets overtaking P_DATA packets.
1614  *
1615  * In case packet_seq is larger than mdev->peer_seq, there are
1616  * outstanding packets on the msock. We wait for them to arrive.
1617  * In case this is the logically next packet, we update mdev->peer_seq
1618  * ourselves. Correctly handles 32bit wrap around.
1619  *
1620  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1621  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1622  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1623  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1624  *
1625  * returns 0 if we may process the packet,
1626  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1627 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1628 {
1629         DEFINE_WAIT(wait);
1630         unsigned int p_seq;
1631         long timeout;
1632         int ret = 0;
1633         spin_lock(&mdev->peer_seq_lock);
1634         for (;;) {
1635                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1636                 if (seq_le(packet_seq, mdev->peer_seq+1))
1637                         break;
1638                 if (signal_pending(current)) {
1639                         ret = -ERESTARTSYS;
1640                         break;
1641                 }
1642                 p_seq = mdev->peer_seq;
1643                 spin_unlock(&mdev->peer_seq_lock);
1644                 timeout = schedule_timeout(30*HZ);
1645                 spin_lock(&mdev->peer_seq_lock);
1646                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1647                         ret = -ETIMEDOUT;
1648                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1649                         break;
1650                 }
1651         }
1652         finish_wait(&mdev->seq_wait, &wait);
1653         if (mdev->peer_seq+1 == packet_seq)
1654                 mdev->peer_seq++;
1655         spin_unlock(&mdev->peer_seq_lock);
1656         return ret;
1657 }
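
/* The seq_le() test above must cope with the sequence counter wrapping
 * around.  A minimal illustrative sketch of such a wrap-safe comparison
 * (hypothetical helper; the actual seq_le() may be defined differently)
 * interprets the difference of the two counters as a signed value: */
static inline int seq_le_sketch(u32 a, u32 b)
{
        /* true if "a" is at or before "b", modulo 2^32 */
        return (s32)(a - b) <= 0;
}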
1658
1659 /* see also bio_flags_to_wire():
1660  * we need to semantically map DRBD_REQ_* flags to data packet flags and
1661  * back, because the peer we replicate to may run a different kernel version. */
1662 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1663 {
1664         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1665                 (dpf & DP_FUA ? REQ_FUA : 0) |
1666                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1667                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1668 }
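
/* For the opposite direction, the sending side presumably maps bio flags to
 * wire flags before transmitting (see bio_flags_to_wire() in drbd_main.c).
 * A sketch of that mapping, assuming the flag names pair up one to one as
 * they do above: */
static u32 bio_flags_to_wire_sketch(unsigned long bi_rw)
{
        return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
                (bi_rw & REQ_FUA ? DP_FUA : 0) |
                (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
                (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
}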
1669
1670 /* mirrored write */
1671 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1672 {
1673         sector_t sector;
1674         struct drbd_epoch_entry *e;
1675         struct p_data *p = &mdev->data.rbuf.data;
1676         int rw = WRITE;
1677         u32 dp_flags;
1678
1679         if (!get_ldev(mdev)) {
1680                 spin_lock(&mdev->peer_seq_lock);
1681                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1682                         mdev->peer_seq++;
1683                 spin_unlock(&mdev->peer_seq_lock);
1684
1685                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1686                 atomic_inc(&mdev->current_epoch->epoch_size);
1687                 return drbd_drain_block(mdev, data_size);
1688         }
1689
1690         /* get_ldev(mdev) successful.
1691          * Corresponding put_ldev done either below (on various errors),
1692          * or in drbd_endio_sec, if we successfully submit the data at
1693          * the end of this function. */
1694
1695         sector = be64_to_cpu(p->sector);
1696         e = read_in_block(mdev, p->block_id, sector, data_size);
1697         if (!e) {
1698                 put_ldev(mdev);
1699                 return false;
1700         }
1701
1702         e->w.cb = e_end_block;
1703
1704         dp_flags = be32_to_cpu(p->dp_flags);
1705         rw |= wire_flags_to_bio(mdev, dp_flags);
1706
1707         if (dp_flags & DP_MAY_SET_IN_SYNC)
1708                 e->flags |= EE_MAY_SET_IN_SYNC;
1709
1710         spin_lock(&mdev->epoch_lock);
1711         e->epoch = mdev->current_epoch;
1712         atomic_inc(&e->epoch->epoch_size);
1713         atomic_inc(&e->epoch->active);
1714         spin_unlock(&mdev->epoch_lock);
1715
1716         /* I'm the receiver, I do hold a net_cnt reference. */
1717         if (!mdev->net_conf->two_primaries) {
1718                 spin_lock_irq(&mdev->req_lock);
1719         } else {
1720                 /* don't get the req_lock yet,
1721                  * we may sleep in drbd_wait_peer_seq */
1722                 const int size = e->i.size;
1723                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1724                 DEFINE_WAIT(wait);
1725                 int first;
1726
1727                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1728
1729                 /* conflict detection and handling:
1730                  * 1. wait on the sequence number,
1731                  *    in case this data packet overtook ACK packets.
1732                  * 2. check our interval trees for conflicting requests:
1733                  *    we only need to check the write_requests tree; the
1734                  *    epoch_entries tree cannot contain any overlaps because
1735                  *    they were already eliminated on the submitting node.
1736                  *
1737                  * Note: for two_primaries, we are protocol C,
1738                  * so there cannot be any request that is DONE
1739                  * but still on the transfer log.
1740                  *
1741                  * unconditionally add to the epoch_entries tree.
1742                  *
1743                  * if no conflicting request is found:
1744                  *    submit.
1745                  *
1746                  * if any conflicting request is found
1747                  * that has not yet been acked,
1748                  * AND I have the "discard concurrent writes" flag:
1749                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1750                  *
1751                  * if any conflicting request is found:
1752                  *       block the receiver, waiting on misc_wait
1753                  *       until no more conflicting requests are there,
1754                  *       or we get interrupted (disconnect).
1755                  *
1756                  *       we do not just write after local io completion of those
1757                  *       requests, but only after req is done completely, i.e.
1758                  *       we wait for the P_DISCARD_ACK to arrive!
1759                  *
1760                  *       then proceed normally, i.e. submit.
1761                  */
1762                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1763                         goto out_interrupted;
1764
1765                 spin_lock_irq(&mdev->req_lock);
1766
1767                 drbd_insert_interval(&mdev->epoch_entries, &e->i);
1768
1769                 first = 1;
1770                 for (;;) {
1771                         struct drbd_interval *i;
1772                         int have_unacked = 0;
1773                         int have_conflict = 0;
1774                         prepare_to_wait(&mdev->misc_wait, &wait,
1775                                 TASK_INTERRUPTIBLE);
1776
1777                         i = drbd_find_overlap(&mdev->write_requests, sector, size);
1778                         if (i) {
1779                                 struct drbd_request *req2 =
1780                                         container_of(i, struct drbd_request, i);
1781
1782                                 /* only ALERT on first iteration,
1783                                  * we may be woken up early... */
1784                                 if (first)
1785                                         dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1786                                               " new: %llus +%u; pending: %llus +%u\n",
1787                                               current->comm, current->pid,
1788                                               (unsigned long long)sector, size,
1789                                               (unsigned long long)req2->i.sector, req2->i.size);
1790                                 if (req2->rq_state & RQ_NET_PENDING)
1791                                         ++have_unacked;
1792                                 ++have_conflict;
1793                         }
1794                         if (!have_conflict)
1795                                 break;
1796
1797                         /* Discard Ack only for the _first_ iteration */
1798                         if (first && discard && have_unacked) {
1799                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1800                                      (unsigned long long)sector);
1801                                 inc_unacked(mdev);
1802                                 e->w.cb = e_send_discard_ack;
1803                                 list_add_tail(&e->w.list, &mdev->done_ee);
1804
1805                                 spin_unlock_irq(&mdev->req_lock);
1806
1807                                 /* we could probably send that P_DISCARD_ACK ourselves,
1808                                  * but I don't like the receiver using the msock */
1809
1810                                 put_ldev(mdev);
1811                                 wake_asender(mdev);
1812                                 finish_wait(&mdev->misc_wait, &wait);
1813                                 return true;
1814                         }
1815
1816                         if (signal_pending(current)) {
1817                                 drbd_remove_interval(&mdev->epoch_entries, &e->i);
1818                                 drbd_clear_interval(&e->i);
1819
1820                                 spin_unlock_irq(&mdev->req_lock);
1821
1822                                 finish_wait(&mdev->misc_wait, &wait);
1823                                 goto out_interrupted;
1824                         }
1825
1826                         spin_unlock_irq(&mdev->req_lock);
1827                         if (first) {
1828                                 first = 0;
1829                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1830                                      "sec=%llus\n", (unsigned long long)sector);
1831                         } else if (discard) {
1832                                 /* we had none on the first iteration.
1833                                  * there must be none now. */
1834                                 D_ASSERT(have_unacked == 0);
1835                         }
1836                         schedule();
1837                         spin_lock_irq(&mdev->req_lock);
1838                 }
1839                 finish_wait(&mdev->misc_wait, &wait);
1840         }
1841
1842         list_add(&e->w.list, &mdev->active_ee);
1843         spin_unlock_irq(&mdev->req_lock);
1844
1845         switch (mdev->net_conf->wire_protocol) {
1846         case DRBD_PROT_C:
1847                 inc_unacked(mdev);
1848                 /* corresponding dec_unacked() in e_end_block(),
1849                  * or in _drbd_clear_done_ee() */
1850                 break;
1851         case DRBD_PROT_B:
1852                 /* I really don't like it that the receiver thread
1853                  * sends on the msock, but anyways */
1854                 drbd_send_ack(mdev, P_RECV_ACK, e);
1855                 break;
1856         case DRBD_PROT_A:
1857                 /* nothing to do */
1858                 break;
1859         }
1860
1861         if (mdev->state.pdsk < D_INCONSISTENT) {
1862                 /* We have the only disk of the cluster: mark the block out of sync and cover this write by the activity log. */
1863                 drbd_set_out_of_sync(mdev, e->i.sector, e->i.size);
1864                 e->flags |= EE_CALL_AL_COMPLETE_IO;
1865                 e->flags &= ~EE_MAY_SET_IN_SYNC;
1866                 drbd_al_begin_io(mdev, e->i.sector);
1867         }
1868
1869         if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1870                 return true;
1871
1872         /* don't care for the reason here */
1873         dev_err(DEV, "submit failed, triggering re-connect\n");
1874         spin_lock_irq(&mdev->req_lock);
1875         list_del(&e->w.list);
1876         drbd_remove_interval(&mdev->epoch_entries, &e->i);
1877         drbd_clear_interval(&e->i);
1878         spin_unlock_irq(&mdev->req_lock);
1879         if (e->flags & EE_CALL_AL_COMPLETE_IO)
1880                 drbd_al_complete_io(mdev, e->i.sector);
1881
1882 out_interrupted:
1883         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1884         put_ldev(mdev);
1885         drbd_free_ee(mdev, e);
1886         return false;
1887 }
1888
1889 /* We may throttle resync, if the lower device seems to be busy,
1890  * and current sync rate is above c_min_rate.
1891  *
1892  * To decide whether or not the lower device is busy, we use a scheme similar
1893  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
1894  * amount (more than 64 sectors) of activity that we cannot account for with
1895  * our own resync activity, the device obviously is "busy".
1896  *
1897  * The sync rate used here is computed from the most recent two step marks only,
1898  * to get a short-time average so we can react faster.
1899  */
1900 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1901 {
1902         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1903         unsigned long db, dt, dbdt;
1904         struct lc_element *tmp;
1905         int curr_events;
1906         int throttle = 0;
1907
1908         /* feature disabled? */
1909         if (mdev->sync_conf.c_min_rate == 0)
1910                 return 0;
1911
1912         spin_lock_irq(&mdev->al_lock);
1913         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1914         if (tmp) {
1915                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1916                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1917                         spin_unlock_irq(&mdev->al_lock);
1918                         return 0;
1919                 }
1920                 /* Do not slow down if app IO is already waiting for this extent */
1921         }
1922         spin_unlock_irq(&mdev->al_lock);
1923
1924         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1925                       (int)part_stat_read(&disk->part0, sectors[1]) -
1926                         atomic_read(&mdev->rs_sect_ev);
1927
1928         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1929                 unsigned long rs_left;
1930                 int i;
1931
1932                 mdev->rs_last_events = curr_events;
1933
1934                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1935                  * approx. */
1936                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1937
1938                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1939                         rs_left = mdev->ov_left;
1940                 else
1941                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1942
1943                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1944                 if (!dt)
1945                         dt++;
1946                 db = mdev->rs_mark_left[i] - rs_left;
1947                 dbdt = Bit2KB(db/dt);
1948
1949                 if (dbdt > mdev->sync_conf.c_min_rate)
1950                         throttle = 1;
1951         }
1952         return throttle;
1953 }
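
/* Worked example with illustrative numbers: assume c_min_rate is 4000 (KB/s)
 * and, between the two most recent sync marks, dt = 6 seconds passed while
 * db = 12000 bitmap bits were cleared.  With the usual 4 KiB of data covered
 * per bitmap bit, that gives
 *
 *     dbdt = Bit2KB(12000 / 6) = 2000 * 4 = 8000 KB/s,
 *
 * which is above c_min_rate, so the function reports "throttle": the resync
 * is fast enough to be slowed down while the backing device looks busy. */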
1954
1955
1956 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1957 {
1958         sector_t sector;
1959         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1960         struct drbd_epoch_entry *e;
1961         struct digest_info *di = NULL;
1962         int size, verb;
1963         unsigned int fault_type;
1964         struct p_block_req *p = &mdev->data.rbuf.block_req;
1965
1966         sector = be64_to_cpu(p->sector);
1967         size   = be32_to_cpu(p->blksize);
1968
1969         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1970                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1971                                 (unsigned long long)sector, size);
1972                 return false;
1973         }
1974         if (sector + (size>>9) > capacity) {
1975                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1976                                 (unsigned long long)sector, size);
1977                 return false;
1978         }
1979
1980         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1981                 verb = 1;
1982                 switch (cmd) {
1983                 case P_DATA_REQUEST:
1984                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
1985                         break;
1986                 case P_RS_DATA_REQUEST:
1987                 case P_CSUM_RS_REQUEST:
1988                 case P_OV_REQUEST:
1989                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
1990                         break;
1991                 case P_OV_REPLY:
1992                         verb = 0;
1993                         dec_rs_pending(mdev);
1994                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
1995                         break;
1996                 default:
1997                         dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
1998                                 cmdname(cmd));
1999                 }
2000                 if (verb && __ratelimit(&drbd_ratelimit_state))
2001                         dev_err(DEV, "Can not satisfy peer's read request, "
2002                             "no local data.\n");
2003
2004                 /* drain the payload, if any */
2005                 return drbd_drain_block(mdev, digest_size);
2006         }
2007
2008         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2009          * "criss-cross" setup, that might cause write-out on some other DRBD,
2010          * which in turn might block on the other node at this very place.  */
2011         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2012         if (!e) {
2013                 put_ldev(mdev);
2014                 return false;
2015         }
2016
2017         switch (cmd) {
2018         case P_DATA_REQUEST:
2019                 e->w.cb = w_e_end_data_req;
2020                 fault_type = DRBD_FAULT_DT_RD;
2021                 /* application IO, don't drbd_rs_begin_io */
2022                 goto submit;
2023
2024         case P_RS_DATA_REQUEST:
2025                 e->w.cb = w_e_end_rsdata_req;
2026                 fault_type = DRBD_FAULT_RS_RD;
2027                 /* used in the sector offset progress display */
2028                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2029                 break;
2030
2031         case P_OV_REPLY:
2032         case P_CSUM_RS_REQUEST:
2033                 fault_type = DRBD_FAULT_RS_RD;
2034                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2035                 if (!di)
2036                         goto out_free_e;
2037
2038                 di->digest_size = digest_size;
2039                 di->digest = (((char *)di)+sizeof(struct digest_info));
2040
2041                 e->digest = di;
2042                 e->flags |= EE_HAS_DIGEST;
2043
2044                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2045                         goto out_free_e;
2046
2047                 if (cmd == P_CSUM_RS_REQUEST) {
2048                         D_ASSERT(mdev->agreed_pro_version >= 89);
2049                         e->w.cb = w_e_end_csum_rs_req;
2050                         /* used in the sector offset progress display */
2051                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2052                 } else if (cmd == P_OV_REPLY) {
2053                         /* track progress, we may need to throttle */
2054                         atomic_add(size >> 9, &mdev->rs_sect_in);
2055                         e->w.cb = w_e_end_ov_reply;
2056                         dec_rs_pending(mdev);
2057                         /* drbd_rs_begin_io done when we sent this request,
2058                          * but accounting still needs to be done. */
2059                         goto submit_for_resync;
2060                 }
2061                 break;
2062
2063         case P_OV_REQUEST:
2064                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2065                     mdev->agreed_pro_version >= 90) {
2066                         unsigned long now = jiffies;
2067                         int i;
2068                         mdev->ov_start_sector = sector;
2069                         mdev->ov_position = sector;
2070                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2071                         mdev->rs_total = mdev->ov_left;
2072                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2073                                 mdev->rs_mark_left[i] = mdev->ov_left;
2074                                 mdev->rs_mark_time[i] = now;
2075                         }
2076                         dev_info(DEV, "Online Verify start sector: %llu\n",
2077                                         (unsigned long long)sector);
2078                 }
2079                 e->w.cb = w_e_end_ov_req;
2080                 fault_type = DRBD_FAULT_RS_RD;
2081                 break;
2082
2083         default:
2084                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2085                     cmdname(cmd));
2086                 fault_type = DRBD_FAULT_MAX;
2087                 goto out_free_e;
2088         }
2089
2090         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2091          * wrt the receiver, but it is not as straightforward as it may seem.
2092          * Various places in the resync start and stop logic assume resync
2093          * requests are processed in order, requeuing this on the worker thread
2094          * introduces a bunch of new code for synchronization between threads.
2095          *
2096          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2097          * "forever", throttling after drbd_rs_begin_io will lock that extent
2098          * for application writes for the same time.  For now, just throttle
2099          * here, where the rest of the code expects the receiver to sleep for
2100          * a while, anyways.
2101          */
2102
2103         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2104          * this defers syncer requests for some time, before letting at least
2105          * one request through.  The resync controller on the receiving side
2106          * will adapt to the incoming rate accordingly.
2107          *
2108          * We cannot throttle here if remote is Primary/SyncTarget:
2109          * we would also throttle its application reads.
2110          * In that case, throttling is done on the SyncTarget only.
2111          */
2112         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2113                 schedule_timeout_uninterruptible(HZ/10);
2114         if (drbd_rs_begin_io(mdev, sector))
2115                 goto out_free_e;
2116
2117 submit_for_resync:
2118         atomic_add(size >> 9, &mdev->rs_sect_ev);
2119
2120 submit:
2121         inc_unacked(mdev);
2122         spin_lock_irq(&mdev->req_lock);
2123         list_add_tail(&e->w.list, &mdev->read_ee);
2124         spin_unlock_irq(&mdev->req_lock);
2125
2126         if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2127                 return true;
2128
2129         /* don't care for the reason here */
2130         dev_err(DEV, "submit failed, triggering re-connect\n");
2131         spin_lock_irq(&mdev->req_lock);
2132         list_del(&e->w.list);
2133         spin_unlock_irq(&mdev->req_lock);
2134         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2135
2136 out_free_e:
2137         put_ldev(mdev);
2138         drbd_free_ee(mdev, e);
2139         return false;
2140 }
2141
2142 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2143 {
2144         int self, peer, rv = -100;
2145         unsigned long ch_self, ch_peer;
2146
2147         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2148         peer = mdev->p_uuid[UI_BITMAP] & 1;
2149
2150         ch_peer = mdev->p_uuid[UI_SIZE];
2151         ch_self = mdev->comm_bm_set;
2152
2153         switch (mdev->net_conf->after_sb_0p) {
2154         case ASB_CONSENSUS:
2155         case ASB_DISCARD_SECONDARY:
2156         case ASB_CALL_HELPER:
2157                 dev_err(DEV, "Configuration error.\n");
2158                 break;
2159         case ASB_DISCONNECT:
2160                 break;
2161         case ASB_DISCARD_YOUNGER_PRI:
2162                 if (self == 0 && peer == 1) {
2163                         rv = -1;
2164                         break;
2165                 }
2166                 if (self == 1 && peer == 0) {
2167                         rv =  1;
2168                         break;
2169                 }
2170                 /* Else fall through to one of the other strategies... */
2171         case ASB_DISCARD_OLDER_PRI:
2172                 if (self == 0 && peer == 1) {
2173                         rv = 1;
2174                         break;
2175                 }
2176                 if (self == 1 && peer == 0) {
2177                         rv = -1;
2178                         break;
2179                 }
2180                 /* Else fall through to one of the other strategies... */
2181                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2182                      "Using discard-least-changes instead\n");
2183         case ASB_DISCARD_ZERO_CHG:
2184                 if (ch_peer == 0 && ch_self == 0) {
2185                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2186                                 ? -1 : 1;
2187                         break;
2188                 } else {
2189                         if (ch_peer == 0) { rv =  1; break; }
2190                         if (ch_self == 0) { rv = -1; break; }
2191                 }
2192                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2193                         break;
2194         case ASB_DISCARD_LEAST_CHG:
2195                 if      (ch_self < ch_peer)
2196                         rv = -1;
2197                 else if (ch_self > ch_peer)
2198                         rv =  1;
2199                 else /* ( ch_self == ch_peer ) */
2200                      /* Well, then use something else. */
2201                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2202                                 ? -1 : 1;
2203                 break;
2204         case ASB_DISCARD_LOCAL:
2205                 rv = -1;
2206                 break;
2207         case ASB_DISCARD_REMOTE:
2208                 rv =  1;
2209         }
2210
2211         return rv;
2212 }
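
/* Example, with illustrative settings: if after-sb-0pri is set to
 * discard-least-changes and this node set fewer bits in its bitmap than the
 * peer (ch_self < ch_peer), the function returns -1, i.e. this node discards
 * its own changes and becomes the sync target. */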
2213
2214 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2215 {
2216         int hg, rv = -100;
2217
2218         switch (mdev->net_conf->after_sb_1p) {
2219         case ASB_DISCARD_YOUNGER_PRI:
2220         case ASB_DISCARD_OLDER_PRI:
2221         case ASB_DISCARD_LEAST_CHG:
2222         case ASB_DISCARD_LOCAL:
2223         case ASB_DISCARD_REMOTE:
2224                 dev_err(DEV, "Configuration error.\n");
2225                 break;
2226         case ASB_DISCONNECT:
2227                 break;
2228         case ASB_CONSENSUS:
2229                 hg = drbd_asb_recover_0p(mdev);
2230                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2231                         rv = hg;
2232                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2233                         rv = hg;
2234                 break;
2235         case ASB_VIOLENTLY:
2236                 rv = drbd_asb_recover_0p(mdev);
2237                 break;
2238         case ASB_DISCARD_SECONDARY:
2239                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2240         case ASB_CALL_HELPER:
2241                 hg = drbd_asb_recover_0p(mdev);
2242                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2243                         enum drbd_state_rv rv2;
2244
2245                         drbd_set_role(mdev, R_SECONDARY, 0);
2246                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2247                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2248                           * we do not need to wait for the after state change work either. */
2249                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2250                         if (rv2 != SS_SUCCESS) {
2251                                 drbd_khelper(mdev, "pri-lost-after-sb");
2252                         } else {
2253                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2254                                 rv = hg;
2255                         }
2256                 } else
2257                         rv = hg;
2258         }
2259
2260         return rv;
2261 }
2262
2263 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2264 {
2265         int hg, rv = -100;
2266
2267         switch (mdev->net_conf->after_sb_2p) {
2268         case ASB_DISCARD_YOUNGER_PRI:
2269         case ASB_DISCARD_OLDER_PRI:
2270         case ASB_DISCARD_LEAST_CHG:
2271         case ASB_DISCARD_LOCAL:
2272         case ASB_DISCARD_REMOTE:
2273         case ASB_CONSENSUS:
2274         case ASB_DISCARD_SECONDARY:
2275                 dev_err(DEV, "Configuration error.\n");
2276                 break;
2277         case ASB_VIOLENTLY:
2278                 rv = drbd_asb_recover_0p(mdev);
2279                 break;
2280         case ASB_DISCONNECT:
2281                 break;
2282         case ASB_CALL_HELPER:
2283                 hg = drbd_asb_recover_0p(mdev);
2284                 if (hg == -1) {
2285                         enum drbd_state_rv rv2;
2286
2287                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2288                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2289                           * we do not need to wait for the after state change work either. */
2290                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2291                         if (rv2 != SS_SUCCESS) {
2292                                 drbd_khelper(mdev, "pri-lost-after-sb");
2293                         } else {
2294                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2295                                 rv = hg;
2296                         }
2297                 } else
2298                         rv = hg;
2299         }
2300
2301         return rv;
2302 }
2303
2304 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2305                            u64 bits, u64 flags)
2306 {
2307         if (!uuid) {
2308                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2309                 return;
2310         }
2311         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2312              text,
2313              (unsigned long long)uuid[UI_CURRENT],
2314              (unsigned long long)uuid[UI_BITMAP],
2315              (unsigned long long)uuid[UI_HISTORY_START],
2316              (unsigned long long)uuid[UI_HISTORY_END],
2317              (unsigned long long)bits,
2318              (unsigned long long)flags);
2319 }
2320
2321 /*
2322   100   after split brain try auto recover
2323     2   C_SYNC_SOURCE set BitMap
2324     1   C_SYNC_SOURCE use BitMap
2325     0   no Sync
2326    -1   C_SYNC_TARGET use BitMap
2327    -2   C_SYNC_TARGET set BitMap
2328  -100   after split brain, disconnect
2329 -1000   unrelated data
2330 -1091   requires proto 91
2331 -1096   requires proto 96
2332  */
2333 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2334 {
2335         u64 self, peer;
2336         int i, j;
2337
2338         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2339         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2340
2341         *rule_nr = 10;
2342         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2343                 return 0;
2344
2345         *rule_nr = 20;
2346         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2347              peer != UUID_JUST_CREATED)
2348                 return -2;
2349
2350         *rule_nr = 30;
2351         if (self != UUID_JUST_CREATED &&
2352             (peer == UUID_JUST_CREATED || peer == (u64)0))
2353                 return 2;
2354
2355         if (self == peer) {
2356                 int rct, dc; /* roles at crash time */
2357
2358                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2359
2360                         if (mdev->agreed_pro_version < 91)
2361                                 return -1091;
2362
2363                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2364                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2365                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2366                                 drbd_uuid_set_bm(mdev, 0UL);
2367
2368                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2369                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2370                                 *rule_nr = 34;
2371                         } else {
2372                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2373                                 *rule_nr = 36;
2374                         }
2375
2376                         return 1;
2377                 }
2378
2379                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2380
2381                         if (mdev->agreed_pro_version < 91)
2382                                 return -1091;
2383
2384                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2385                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2386                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2387
2388                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2389                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2390                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2391
2392                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2393                                 *rule_nr = 35;
2394                         } else {
2395                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2396                                 *rule_nr = 37;
2397                         }
2398
2399                         return -1;
2400                 }
2401
2402                 /* Common power [off|failure] */
2403                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2404                         (mdev->p_uuid[UI_FLAGS] & 2);
2405                 /* lowest bit is set when we were primary,
2406                  * next bit (weight 2) is set when peer was primary */
2407                 *rule_nr = 40;
2408
2409                 switch (rct) {
2410                 case 0: /* !self_pri && !peer_pri */ return 0;
2411                 case 1: /*  self_pri && !peer_pri */ return 1;
2412                 case 2: /* !self_pri &&  peer_pri */ return -1;
2413                 case 3: /*  self_pri &&  peer_pri */
2414                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2415                         return dc ? -1 : 1;
2416                 }
2417         }
2418
2419         *rule_nr = 50;
2420         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2421         if (self == peer)
2422                 return -1;
2423
2424         *rule_nr = 51;
2425         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2426         if (self == peer) {
2427                 if (mdev->agreed_pro_version < 96 ?
2428                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2429                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2430                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2431                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2432                            modifications from its last start of a resync as sync source. */
2433
2434                         if (mdev->agreed_pro_version < 91)
2435                                 return -1091;
2436
2437                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2438                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2439
2440                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2441                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2442
2443                         return -1;
2444                 }
2445         }
2446
2447         *rule_nr = 60;
2448         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2449         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2450                 peer = mdev->p_uuid[i] & ~((u64)1);
2451                 if (self == peer)
2452                         return -2;
2453         }
2454
2455         *rule_nr = 70;
2456         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2457         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2458         if (self == peer)
2459                 return 1;
2460
2461         *rule_nr = 71;
2462         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2463         if (self == peer) {
2464                 if (mdev->agreed_pro_version < 96 ?
2465                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2466                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2467                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2468                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2469                            modifications from the last start of a resync as sync source. */
2470
2471                         if (mdev->agreed_pro_version < 91)
2472                                 return -1091;
2473
2474                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2475                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2476
2477                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2478                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2479                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2480
2481                         return 1;
2482                 }
2483         }
2484
2485
2486         *rule_nr = 80;
2487         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2488         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2489                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2490                 if (self == peer)
2491                         return 2;
2492         }
2493
2494         *rule_nr = 90;
2495         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2496         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2497         if (self == peer && self != ((u64)0))
2498                 return 100;
2499
2500         *rule_nr = 100;
2501         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2502                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2503                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2504                         peer = mdev->p_uuid[j] & ~((u64)1);
2505                         if (self == peer)
2506                                 return -100;
2507                 }
2508         }
2509
2510         return -1000;
2511 }
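
/* Illustrative walk-through of rule 50 above: our UI_CURRENT equals the
 * peer's UI_BITMAP.  The peer rotated our (still current) UUID into its
 * bitmap slot when it started writing without us, so its bitmap describes
 * exactly the blocks changed since we last were in sync.  The function
 * returns -1: we become C_SYNC_TARGET and a bitmap based resync suffices. */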
2512
2513 /* drbd_sync_handshake() returns the new conn state on success, or
2514    CONN_MASK (-1) on failure.
2515  */
2516 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2517                                            enum drbd_disk_state peer_disk) __must_hold(local)
2518 {
2519         int hg, rule_nr;
2520         enum drbd_conns rv = C_MASK;
2521         enum drbd_disk_state mydisk;
2522
2523         mydisk = mdev->state.disk;
2524         if (mydisk == D_NEGOTIATING)
2525                 mydisk = mdev->new_state_tmp.disk;
2526
2527         dev_info(DEV, "drbd_sync_handshake:\n");
2528         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2529         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2530                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2531
2532         hg = drbd_uuid_compare(mdev, &rule_nr);
2533
2534         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2535
2536         if (hg == -1000) {
2537                 dev_alert(DEV, "Unrelated data, aborting!\n");
2538                 return C_MASK;
2539         }
2540         if (hg < -1000) {
2541                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2542                 return C_MASK;
2543         }
2544
2545         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2546             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2547                 int f = (hg == -100) || abs(hg) == 2;
2548                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2549                 if (f)
2550                         hg = hg*2;
2551                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2552                      hg > 0 ? "source" : "target");
2553         }
2554
2555         if (abs(hg) == 100)
2556                 drbd_khelper(mdev, "initial-split-brain");
2557
2558         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2559                 int pcount = (mdev->state.role == R_PRIMARY)
2560                            + (peer_role == R_PRIMARY);
2561                 int forced = (hg == -100);
2562
2563                 switch (pcount) {
2564                 case 0:
2565                         hg = drbd_asb_recover_0p(mdev);
2566                         break;
2567                 case 1:
2568                         hg = drbd_asb_recover_1p(mdev);
2569                         break;
2570                 case 2:
2571                         hg = drbd_asb_recover_2p(mdev);
2572                         break;
2573                 }
2574                 if (abs(hg) < 100) {
2575                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2576                              "automatically solved. Sync from %s node\n",
2577                              pcount, (hg < 0) ? "peer" : "this");
2578                         if (forced) {
2579                                 dev_warn(DEV, "Doing a full sync, since"
2580                                      " UUIDs were ambiguous.\n");
2581                                 hg = hg*2;
2582                         }
2583                 }
2584         }
2585
2586         if (hg == -100) {
2587                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2588                         hg = -1;
2589                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2590                         hg = 1;
2591
2592                 if (abs(hg) < 100)
2593                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2594                              "Sync from %s node\n",
2595                              (hg < 0) ? "peer" : "this");
2596         }
2597
2598         if (hg == -100) {
2599                 /* FIXME this log message is not correct if we end up here
2600                  * after an attempted attach on a diskless node.
2601                  * We just refuse to attach -- well, we drop the "connection"
2602                  * to that disk, in a way... */
2603                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2604                 drbd_khelper(mdev, "split-brain");
2605                 return C_MASK;
2606         }
2607
2608         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2609                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2610                 return C_MASK;
2611         }
2612
2613         if (hg < 0 && /* by intention we do not use mydisk here. */
2614             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2615                 switch (mdev->net_conf->rr_conflict) {
2616                 case ASB_CALL_HELPER:
2617                         drbd_khelper(mdev, "pri-lost");
2618                         /* fall through */
2619                 case ASB_DISCONNECT:
2620                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2621                         return C_MASK;
2622                 case ASB_VIOLENTLY:
2623                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2624                              " assumption\n");
2625                 }
2626         }
2627
2628         if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2629                 if (hg == 0)
2630                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2631                 else
2632                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2633                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2634                                  abs(hg) >= 2 ? "full" : "bit-map based");
2635                 return C_MASK;
2636         }
2637
2638         if (abs(hg) >= 2) {
2639                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2640                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2641                                         BM_LOCKED_SET_ALLOWED))
2642                         return C_MASK;
2643         }
2644
2645         if (hg > 0) { /* become sync source. */
2646                 rv = C_WF_BITMAP_S;
2647         } else if (hg < 0) { /* become sync target */
2648                 rv = C_WF_BITMAP_T;
2649         } else {
2650                 rv = C_CONNECTED;
2651                 if (drbd_bm_total_weight(mdev)) {
2652                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2653                              drbd_bm_total_weight(mdev));
2654                 }
2655         }
2656
2657         return rv;
2658 }
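
/* Example walk-through with assumed inputs: a freshly created node that
 * connects to a peer holding data hits rule 20 in drbd_uuid_compare() and
 * gets hg == -2.  abs(hg) >= 2 then forces "set_n_write from sync_handshake"
 * (a full sync), and hg < 0 makes us return C_WF_BITMAP_T, the sync target
 * side of the bitmap exchange. */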
2659
2660 /* returns 1 if invalid */
2661 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2662 {
2663         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2664         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2665             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2666                 return 0;
2667
2668         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2669         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2670             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2671                 return 1;
2672
2673         /* everything else is valid if they are equal on both sides. */
2674         if (peer == self)
2675                 return 0;
2676
2677         /* everything else is invalid. */
2678         return 1;
2679 }
2680
2681 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2682 {
2683         struct p_protocol *p = &mdev->data.rbuf.protocol;
2684         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2685         int p_want_lose, p_two_primaries, cf;
2686         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2687
2688         p_proto         = be32_to_cpu(p->protocol);
2689         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2690         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2691         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2692         p_two_primaries = be32_to_cpu(p->two_primaries);
2693         cf              = be32_to_cpu(p->conn_flags);
2694         p_want_lose = cf & CF_WANT_LOSE;
2695
2696         clear_bit(CONN_DRY_RUN, &mdev->flags);
2697
2698         if (cf & CF_DRY_RUN)
2699                 set_bit(CONN_DRY_RUN, &mdev->flags);
2700
2701         if (p_proto != mdev->net_conf->wire_protocol) {
2702                 dev_err(DEV, "incompatible communication protocols\n");
2703                 goto disconnect;
2704         }
2705
2706         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2707                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2708                 goto disconnect;
2709         }
2710
2711         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2712                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2713                 goto disconnect;
2714         }
2715
2716         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2717                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2718                 goto disconnect;
2719         }
2720
2721         if (p_want_lose && mdev->net_conf->want_lose) {
2722                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2723                 goto disconnect;
2724         }
2725
2726         if (p_two_primaries != mdev->net_conf->two_primaries) {
2727                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2728                 goto disconnect;
2729         }
2730
2731         if (mdev->agreed_pro_version >= 87) {
2732                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2733
2734                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2735                         return false;
2736
2737                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2738                 if (strcmp(p_integrity_alg, my_alg)) {
2739                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2740                         goto disconnect;
2741                 }
2742                 dev_info(DEV, "data-integrity-alg: %s\n",
2743                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2744         }
2745
2746         return true;
2747
2748 disconnect:
2749         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2750         return false;
2751 }
2752
2753 /* helper function
2754  * input: alg name, feature name
2755  * return: NULL (alg name was "")
2756  *         ERR_PTR(error) if something goes wrong
2757  *         or the crypto hash ptr, if it worked out ok. */
2758 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2759                 const char *alg, const char *name)
2760 {
2761         struct crypto_hash *tfm;
2762
2763         if (!alg[0])
2764                 return NULL;
2765
2766         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2767         if (IS_ERR(tfm)) {
2768                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2769                         alg, name, PTR_ERR(tfm));
2770                 return tfm;
2771         }
2772         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2773                 crypto_free_hash(tfm);
2774                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2775                 return ERR_PTR(-EINVAL);
2776         }
2777         return tfm;
2778 }
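/*
 * Usage sketch (illustrative only; "alg" is a placeholder): the caller has
 * to distinguish all three return values of the helper above -- NULL,
 * ERR_PTR() and a valid tfm -- much like receive_SyncParam() below does.
 */
#if 0
	struct crypto_hash *tfm;

	tfm = drbd_crypto_alloc_digest_safe(mdev, alg, "verify-alg");
	if (tfm == NULL)
		;                       /* alg was "", feature stays disabled */
	else if (IS_ERR(tfm))
		goto disconnect;        /* allocation failed or not a digest */
	else
		mdev->verify_tfm = tfm; /* ready to use */
#endif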
2779
2780 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2781 {
2782         int ok = true;
2783         struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2784         unsigned int header_size, data_size, exp_max_sz;
2785         struct crypto_hash *verify_tfm = NULL;
2786         struct crypto_hash *csums_tfm = NULL;
2787         const int apv = mdev->agreed_pro_version;
2788         int *rs_plan_s = NULL;
2789         int fifo_size = 0;
2790
2791         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2792                     : apv == 88 ? sizeof(struct p_rs_param)
2793                                         + SHARED_SECRET_MAX
2794                     : apv <= 94 ? sizeof(struct p_rs_param_89)
2795                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2796
2797         if (packet_size > exp_max_sz) {
2798                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2799                     packet_size, exp_max_sz);
2800                 return false;
2801         }
2802
2803         if (apv <= 88) {
2804                 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2805                 data_size   = packet_size  - header_size;
2806         } else if (apv <= 94) {
2807                 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2808                 data_size   = packet_size  - header_size;
2809                 D_ASSERT(data_size == 0);
2810         } else {
2811                 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2812                 data_size   = packet_size  - header_size;
2813                 D_ASSERT(data_size == 0);
2814         }
2815
2816         /* initialize verify_alg and csums_alg */
2817         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2818
2819         if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2820                 return false;
2821
2822         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2823
2824         if (apv >= 88) {
2825                 if (apv == 88) {
2826                         if (data_size > SHARED_SECRET_MAX) {
2827                                 dev_err(DEV, "verify-alg too long, "
2828                                     "peer wants %u, accepting only %u bytes\n",
2829                                                 data_size, SHARED_SECRET_MAX);
2830                                 return false;
2831                         }
2832
2833                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2834                                 return false;
2835
2836                         /* we expect NUL terminated string */
2837                         /* but just in case someone tries to be evil */
2838                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2839                         p->verify_alg[data_size-1] = 0;
2840
2841                 } else /* apv >= 89 */ {
2842                         /* we still expect NUL terminated strings */
2843                         /* but just in case someone tries to be evil */
2844                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2845                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2846                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2847                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2848                 }
2849
2850                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2851                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2852                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2853                                     mdev->sync_conf.verify_alg, p->verify_alg);
2854                                 goto disconnect;
2855                         }
2856                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2857                                         p->verify_alg, "verify-alg");
2858                         if (IS_ERR(verify_tfm)) {
2859                                 verify_tfm = NULL;
2860                                 goto disconnect;
2861                         }
2862                 }
2863
2864                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2865                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2866                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2867                                     mdev->sync_conf.csums_alg, p->csums_alg);
2868                                 goto disconnect;
2869                         }
2870                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2871                                         p->csums_alg, "csums-alg");
2872                         if (IS_ERR(csums_tfm)) {
2873                                 csums_tfm = NULL;
2874                                 goto disconnect;
2875                         }
2876                 }
2877
2878                 if (apv > 94) {
2879                         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2880                         mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2881                         mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2882                         mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2883                         mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2884
2885                         fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2886                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2887                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2888                                 if (!rs_plan_s) {
2889                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
2890                                         goto disconnect;
2891                                 }
2892                         }
2893                 }
2894
2895                 spin_lock(&mdev->peer_seq_lock);
2896                 /* lock against drbd_nl_syncer_conf() */
2897                 if (verify_tfm) {
2898                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2899                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2900                         crypto_free_hash(mdev->verify_tfm);
2901                         mdev->verify_tfm = verify_tfm;
2902                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2903                 }
2904                 if (csums_tfm) {
2905                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2906                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2907                         crypto_free_hash(mdev->csums_tfm);
2908                         mdev->csums_tfm = csums_tfm;
2909                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2910                 }
2911                 if (fifo_size != mdev->rs_plan_s.size) {
2912                         kfree(mdev->rs_plan_s.values);
2913                         mdev->rs_plan_s.values = rs_plan_s;
2914                         mdev->rs_plan_s.size   = fifo_size;
2915                         mdev->rs_planed = 0;
2916                 }
2917                 spin_unlock(&mdev->peer_seq_lock);
2918         }
2919
2920         return ok;
2921 disconnect:
2922         /* just for completeness: actually not needed,
2923          * as this is not reached if csums_tfm was ok. */
2924         crypto_free_hash(csums_tfm);
2925         /* but free the verify_tfm again, if csums_tfm did not work out */
2926         crypto_free_hash(verify_tfm);
2927         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2928         return false;
2929 }
2930
2931 /* warn if the arguments differ by more than 12.5% */
2932 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2933         const char *s, sector_t a, sector_t b)
2934 {
2935         sector_t d;
2936         if (a == 0 || b == 0)
2937                 return;
2938         d = (a > b) ? (a - b) : (b - a);
2939         if (d > (a>>3) || d > (b>>3))
2940                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2941                      (unsigned long long)a, (unsigned long long)b);
2942 }
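/*
 * Worked example (illustrative): the ">>3" above implements the 12.5%
 * threshold.  With a = 1000 and b = 1200 sectors, d = 200, and since
 * 200 > (1000 >> 3) == 125 the warning is printed; with b = 1100,
 * d = 100 is below both 125 and 137, so nothing is logged.
 */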
2943
2944 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2945 {
2946         struct p_sizes *p = &mdev->data.rbuf.sizes;
2947         enum determine_dev_size dd = unchanged;
2948         sector_t p_size, p_usize, my_usize;
2949         int ldsc = 0; /* local disk size changed */
2950         enum dds_flags ddsf;
2951
2952         p_size = be64_to_cpu(p->d_size);
2953         p_usize = be64_to_cpu(p->u_size);
2954
2955         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2956                 dev_err(DEV, "some backing storage is needed\n");
2957                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2958                 return false;
2959         }
2960
2961         /* just store the peer's disk size for now.
2962          * we still need to figure out whether we accept that. */
2963         mdev->p_size = p_size;
2964
2965         if (get_ldev(mdev)) {
2966                 warn_if_differ_considerably(mdev, "lower level device sizes",
2967                            p_size, drbd_get_max_capacity(mdev->ldev));
2968                 warn_if_differ_considerably(mdev, "user requested size",
2969                                             p_usize, mdev->ldev->dc.disk_size);
2970
2971                 /* if this is the first connect, or an otherwise expected
2972                  * param exchange, choose the minimum */
2973                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2974                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2975                                              p_usize);
2976
2977                 my_usize = mdev->ldev->dc.disk_size;
2978
2979                 if (mdev->ldev->dc.disk_size != p_usize) {
2980                         mdev->ldev->dc.disk_size = p_usize;
2981                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2982                              (unsigned long)mdev->ldev->dc.disk_size);
2983                 }
2984
2985                 /* Never shrink a device with usable data during connect.
2986                    But allow online shrinking if we are connected. */
2987                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2988                    drbd_get_capacity(mdev->this_bdev) &&
2989                    mdev->state.disk >= D_OUTDATED &&
2990                    mdev->state.conn < C_CONNECTED) {
2991                         dev_err(DEV, "The peer's disk size is too small!\n");
2992                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2993                         mdev->ldev->dc.disk_size = my_usize;
2994                         put_ldev(mdev);
2995                         return false;
2996                 }
2997                 put_ldev(mdev);
2998         }
2999
3000         ddsf = be16_to_cpu(p->dds_flags);
3001         if (get_ldev(mdev)) {
3002                 dd = drbd_determine_dev_size(mdev, ddsf);
3003                 put_ldev(mdev);
3004                 if (dd == dev_size_error)
3005                         return false;
3006                 drbd_md_sync(mdev);
3007         } else {
3008                 /* I am diskless, need to accept the peer's size. */
3009                 drbd_set_my_capacity(mdev, p_size);
3010         }
3011
3012         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3013         drbd_reconsider_max_bio_size(mdev);
3014
3015         if (get_ldev(mdev)) {
3016                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3017                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3018                         ldsc = 1;
3019                 }
3020
3021                 put_ldev(mdev);
3022         }
3023
3024         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3025                 if (be64_to_cpu(p->c_size) !=
3026                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3027                         /* we have different sizes, probably peer
3028                          * needs to know my new size... */
3029                         drbd_send_sizes(mdev, 0, ddsf);
3030                 }
3031                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3032                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3033                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3034                             mdev->state.disk >= D_INCONSISTENT) {
3035                                 if (ddsf & DDSF_NO_RESYNC)
3036                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3037                                 else
3038                                         resync_after_online_grow(mdev);
3039                         } else
3040                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3041                 }
3042         }
3043
3044         return true;
3045 }
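/*
 * Worked example (illustrative numbers): during the first parameter
 * exchange the code above takes min_not_zero() of the two configured user
 * sizes.  If the local disk-size option is 0 ("use all of it") and the
 * peer requests 1000 sectors, 1000 wins; if both sides configured a
 * limit, the smaller of the two is used.
 */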
3046
3047 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3048 {
3049         struct p_uuids *p = &mdev->data.rbuf.uuids;
3050         u64 *p_uuid;
3051         int i, updated_uuids = 0;
3052
3053         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return false;
             }
3054
3055         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3056                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3057
3058         kfree(mdev->p_uuid);
3059         mdev->p_uuid = p_uuid;
3060
3061         if (mdev->state.conn < C_CONNECTED &&
3062             mdev->state.disk < D_INCONSISTENT &&
3063             mdev->state.role == R_PRIMARY &&
3064             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3065                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3066                     (unsigned long long)mdev->ed_uuid);
3067                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3068                 return false;
3069         }
3070
3071         if (get_ldev(mdev)) {
3072                 int skip_initial_sync =
3073                         mdev->state.conn == C_CONNECTED &&
3074                         mdev->agreed_pro_version >= 90 &&
3075                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3076                         (p_uuid[UI_FLAGS] & 8);
3077                 if (skip_initial_sync) {
3078                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3079                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3080                                         "clear_n_write from receive_uuids",
3081                                         BM_LOCKED_TEST_ALLOWED);
3082                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3083                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3084                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3085                                         CS_VERBOSE, NULL);
3086                         drbd_md_sync(mdev);
3087                         updated_uuids = 1;
3088                 }
3089                 put_ldev(mdev);
3090         } else if (mdev->state.disk < D_INCONSISTENT &&
3091                    mdev->state.role == R_PRIMARY) {
3092                 /* I am a diskless primary, the peer just created a new current UUID
3093                    for me. */
3094                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3095         }
3096
3097         /* Before we test for the disk state, we should wait until any
3098            ongoing cluster wide state change is finished. That is important if
3099            we are primary and are detaching from our disk. We need to see the
3100            new disk state... */
3101         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3102         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3103                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3104
3105         if (updated_uuids)
3106                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3107
3108         return true;
3109 }
3110
3111 /**
3112  * convert_state() - Converts the peer's view of the cluster state to our point of view
3113  * @ps:         The state as seen by the peer.
3114  */
3115 static union drbd_state convert_state(union drbd_state ps)
3116 {
3117         union drbd_state ms;
3118
3119         static enum drbd_conns c_tab[] = {
3120                 [C_CONNECTED] = C_CONNECTED,
3121
3122                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3123                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3124                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3125                 [C_VERIFY_S]       = C_VERIFY_T,
3126                 [C_MASK]   = C_MASK,
3127         };
3128
3129         ms.i = ps.i;
3130
3131         ms.conn = c_tab[ps.conn];
3132         ms.peer = ps.role;
3133         ms.role = ps.peer;
3134         ms.pdsk = ps.disk;
3135         ms.disk = ps.pdsk;
3136         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3137
3138         return ms;
3139 }
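/*
 * Illustrative example (not from the original source): if the peer reports
 * role=Primary, peer=Secondary, disk=UpToDate, pdsk=Inconsistent, the
 * conversion above yields role=Secondary, peer=Primary, disk=Inconsistent,
 * pdsk=UpToDate as seen from this node; asymmetric connection states such
 * as C_STARTING_SYNC_S / C_STARTING_SYNC_T are mirrored via c_tab.
 */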
3140
3141 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3142 {
3143         struct p_req_state *p = &mdev->data.rbuf.req_state;
3144         union drbd_state mask, val;
3145         enum drbd_state_rv rv;
3146
3147         mask.i = be32_to_cpu(p->mask);
3148         val.i = be32_to_cpu(p->val);
3149
3150         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3151             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3152                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3153                 return true;
3154         }
3155
3156         mask = convert_state(mask);
3157         val = convert_state(val);
3158
3159         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3160
3161         drbd_send_sr_reply(mdev, rv);
3162         drbd_md_sync(mdev);
3163
3164         return true;
3165 }
3166
3167 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3168 {
3169         struct p_state *p = &mdev->data.rbuf.state;
3170         union drbd_state os, ns, peer_state;
3171         enum drbd_disk_state real_peer_disk;
3172         enum chg_state_flags cs_flags;
3173         int rv;
3174
3175         peer_state.i = be32_to_cpu(p->state);
3176
3177         real_peer_disk = peer_state.disk;
3178         if (peer_state.disk == D_NEGOTIATING) {
3179                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3180                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3181         }
3182
3183         spin_lock_irq(&mdev->req_lock);
3184  retry:
3185         os = ns = mdev->state;
3186         spin_unlock_irq(&mdev->req_lock);
3187
3188         /* peer says his disk is uptodate, while we think it is inconsistent,
3189          * and this happens while we think we have a sync going on. */
3190         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3191             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3192                 /* If we are (becoming) SyncSource, but peer is still in sync
3193                  * preparation, ignore its uptodate-ness to avoid flapping, it
3194                  * will change to inconsistent once the peer reaches active
3195                  * syncing states.
3196                  * It may have changed syncer-paused flags, however, so we
3197                  * cannot ignore this completely. */
3198                 if (peer_state.conn > C_CONNECTED &&
3199                     peer_state.conn < C_SYNC_SOURCE)
3200                         real_peer_disk = D_INCONSISTENT;
3201
3202                 /* if peer_state changes to connected at the same time,
3203                  * it explicitly notifies us that it finished resync.
3204                  * Maybe we should finish it up, too? */
3205                 else if (os.conn >= C_SYNC_SOURCE &&
3206                          peer_state.conn == C_CONNECTED) {
3207                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3208                                 drbd_resync_finished(mdev);
3209                         return true;
3210                 }
3211         }
3212
3213         /* peer says his disk is inconsistent, while we think it is uptodate,
3214          * and this happens while the peer still thinks we have a sync going on,
3215          * but we think we are already done with the sync.
3216          * We ignore this to avoid flapping pdsk.
3217          * This should not happen, if the peer is a recent version of drbd. */
3218         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3219             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3220                 real_peer_disk = D_UP_TO_DATE;
3221
3222         if (ns.conn == C_WF_REPORT_PARAMS)
3223                 ns.conn = C_CONNECTED;
3224
3225         if (peer_state.conn == C_AHEAD)
3226                 ns.conn = C_BEHIND;
3227
3228         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3229             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3230                 int cr; /* consider resync */
3231
3232                 /* if we established a new connection */
3233                 cr  = (os.conn < C_CONNECTED);
3234                 /* if we had an established connection
3235                  * and one of the nodes newly attaches a disk */
3236                 cr |= (os.conn == C_CONNECTED &&
3237                        (peer_state.disk == D_NEGOTIATING ||
3238                         os.disk == D_NEGOTIATING));
3239                 /* if we have both been inconsistent, and the peer has been
3240                  * forced to be UpToDate with --overwrite-data */
3241                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3242                 /* if we had been plain connected, and the admin requested to
3243                  * start a sync by "invalidate" or "invalidate-remote" */
3244                 cr |= (os.conn == C_CONNECTED &&
3245                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3246                                  peer_state.conn <= C_WF_BITMAP_T));
3247
3248                 if (cr)
3249                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3250
3251                 put_ldev(mdev);
3252                 if (ns.conn == C_MASK) {
3253                         ns.conn = C_CONNECTED;
3254                         if (mdev->state.disk == D_NEGOTIATING) {
3255                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3256                         } else if (peer_state.disk == D_NEGOTIATING) {
3257                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3258                                 peer_state.disk = D_DISKLESS;
3259                                 real_peer_disk = D_DISKLESS;
3260                         } else {
3261                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3262                                         return false;
3263                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3264                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3265                                 return false;
3266                         }
3267                 }
3268         }
3269
3270         spin_lock_irq(&mdev->req_lock);
3271         if (mdev->state.i != os.i)
3272                 goto retry;
3273         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3274         ns.peer = peer_state.role;
3275         ns.pdsk = real_peer_disk;
3276         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3277         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3278                 ns.disk = mdev->new_state_tmp.disk;
3279         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3280         if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3281             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3282                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3283                    for temporary network outages! */
3284                 spin_unlock_irq(&mdev->req_lock);
3285                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3286                 tl_clear(mdev);
3287                 drbd_uuid_new_current(mdev);
3288                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3289                 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3290                 return false;
3291         }
3292         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3293         ns = mdev->state;
3294         spin_unlock_irq(&mdev->req_lock);
3295
3296         if (rv < SS_SUCCESS) {
3297                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3298                 return false;
3299         }
3300
3301         if (os.conn > C_WF_REPORT_PARAMS) {
3302                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3303                     peer_state.disk != D_NEGOTIATING) {
3304                         /* we want resync, peer has not yet decided to sync... */
3305                         /* Nowadays only used when forcing a node into primary role and
3306                            setting its disk to UpToDate with that */
3307                         drbd_send_uuids(mdev);
3308                         drbd_send_state(mdev);
3309                 }
3310         }
3311
3312         mdev->net_conf->want_lose = 0;
3313
3314         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3315
3316         return true;
3317 }
3318
3319 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3320 {
3321         struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3322
3323         wait_event(mdev->misc_wait,
3324                    mdev->state.conn == C_WF_SYNC_UUID ||
3325                    mdev->state.conn == C_BEHIND ||
3326                    mdev->state.conn < C_CONNECTED ||
3327                    mdev->state.disk < D_NEGOTIATING);
3328
3329         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3330
3331         /* Here the _drbd_uuid_ functions are right, current should
3332            _not_ be rotated into the history */
3333         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3334                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3335                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3336
3337                 drbd_print_uuids(mdev, "updated sync uuid");
3338                 drbd_start_resync(mdev, C_SYNC_TARGET);
3339
3340                 put_ldev(mdev);
3341         } else
3342                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3343
3344         return true;
3345 }
3346
3347 /**
3348  * receive_bitmap_plain
3349  *
3350  * Return 0 when done, 1 when another iteration is needed, and a negative error
3351  * code upon failure.
3352  */
3353 static int
3354 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3355                      unsigned long *buffer, struct bm_xfer_ctx *c)
3356 {
3357         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3358         unsigned want = num_words * sizeof(long);
3359         int err;
3360
3361         if (want != data_size) {
3362                 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3363                 return -EIO;
3364         }
3365         if (want == 0)
3366                 return 0;
3367         err = drbd_recv(mdev, buffer, want);
3368         if (err != want) {
3369                 if (err >= 0)
3370                         err = -EIO;
3371                 return err;
3372         }
3373
3374         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3375
3376         c->word_offset += num_words;
3377         c->bit_offset = c->word_offset * BITS_PER_LONG;
3378         if (c->bit_offset > c->bm_bits)
3379                 c->bit_offset = c->bm_bits;
3380
3381         return 1;
3382 }
3383
3384 /**
3385  * recv_bm_rle_bits
3386  *
3387  * Return 0 when done, 1 when another iteration is needed, and a negative error
3388  * code upon failure.
3389  */
3390 static int
3391 recv_bm_rle_bits(struct drbd_conf *mdev,
3392                 struct p_compressed_bm *p,
3393                 struct bm_xfer_ctx *c)
3394 {
3395         struct bitstream bs;
3396         u64 look_ahead;
3397         u64 rl;
3398         u64 tmp;
3399         unsigned long s = c->bit_offset;
3400         unsigned long e;
3401         int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3402         int toggle = DCBP_get_start(p);
3403         int have;
3404         int bits;
3405
3406         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3407
3408         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3409         if (bits < 0)
3410                 return -EIO;
3411
3412         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3413                 bits = vli_decode_bits(&rl, look_ahead);
3414                 if (bits <= 0)
3415                         return -EIO;
3416
3417                 if (toggle) {
3418                         e = s + rl - 1;
3419                         if (e >= c->bm_bits) {
3420                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3421                                 return -EIO;
3422                         }
3423                         _drbd_bm_set_bits(mdev, s, e);
3424                 }
3425
3426                 if (have < bits) {
3427                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3428                                 have, bits, look_ahead,
3429                                 (unsigned int)(bs.cur.b - p->code),
3430                                 (unsigned int)bs.buf_len);
3431                         return -EIO;
3432                 }
3433                 look_ahead >>= bits;
3434                 have -= bits;
3435
3436                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3437                 if (bits < 0)
3438                         return -EIO;
3439                 look_ahead |= tmp << have;
3440                 have += bits;
3441         }
3442
3443         c->bit_offset = s;
3444         bm_xfer_ctx_bit_to_word_offset(c);
3445
3446         return (s != c->bm_bits);
3447 }
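/*
 * Worked example (illustrative): the peer encodes the bitmap as
 * alternating runs of clear and set bits.  With DCBP_get_start() == 0 and
 * decoded run lengths 5, 3, 7 starting at bit_offset 0, the loop above
 * leaves bits 0..4 untouched, sets bits 5..7 via _drbd_bm_set_bits(), and
 * leaves bits 8..14 untouched again, ending with c->bit_offset == 15.
 */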
3448
3449 /**
3450  * decode_bitmap_c
3451  *
3452  * Return 0 when done, 1 when another iteration is needed, and a negative error
3453  * code upon failure.
3454  */
3455 static int
3456 decode_bitmap_c(struct drbd_conf *mdev,
3457                 struct p_compressed_bm *p,
3458                 struct bm_xfer_ctx *c)
3459 {
3460         if (DCBP_get_code(p) == RLE_VLI_Bits)
3461                 return recv_bm_rle_bits(mdev, p, c);
3462
3463         /* other variants had been implemented for evaluation,
3464          * but have been dropped as this one turned out to be "best"
3465          * during all our tests. */
3466
3467         dev_err(DEV, "decode_bitmap_c: unknown encoding %u\n", p->encoding);
3468         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3469         return -EIO;
3470 }
3471
3472 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3473                 const char *direction, struct bm_xfer_ctx *c)
3474 {
3475         /* what would it take to transfer it "plaintext" */
3476         unsigned plain = sizeof(struct p_header80) *
3477                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3478                 + c->bm_words * sizeof(long);
3479         unsigned total = c->bytes[0] + c->bytes[1];
3480         unsigned r;
3481
3482         /* total can not be zero. but just in case: */
3483         if (total == 0)
3484                 return;
3485
3486         /* don't report if not compressed */
3487         if (total >= plain)
3488                 return;
3489
3490         /* total < plain. check for overflow, still */
3491         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3492                                     : (1000 * total / plain);
3493
3494         if (r > 1000)
3495                 r = 1000;
3496
3497         r = 1000 - r;
3498         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3499              "total %u; compression: %u.%u%%\n",
3500                         direction,
3501                         c->bytes[1], c->packets[1],
3502                         c->bytes[0], c->packets[0],
3503                         total, r/10, r % 10);
3504 }
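/*
 * Worked example (illustrative numbers): if the plain-text transfer would
 * have taken plain = 4096 bytes but only total = 512 bytes were actually
 * sent, r = 1000 * 512 / 4096 = 125, then r = 1000 - 125 = 875, and the
 * line above reports "compression: 87.5%".
 */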
3505
3506 /* Since we are processing the bitfield from lower addresses to higher,
3507    it does not matter if we process it in 32 bit chunks or 64 bit
3508    chunks as long as it is little endian. (Understand it as a byte stream,
3509    beginning with the lowest byte...) If we used big endian
3510    we would need to process it from the highest address to the lowest,
3511    in order to be agnostic to the 32 vs 64 bit issue.
3512
3513    returns 0 on failure, 1 if we successfully received it. */
3514 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3515 {
3516         struct bm_xfer_ctx c;
3517         void *buffer;
3518         int err;
3519         int ok = false;
3520         struct p_header80 *h = &mdev->data.rbuf.header.h80;
3521
3522         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3523         /* you are supposed to send additional out-of-sync information
3524          * if you actually set bits during this phase */
3525
3526         /* maybe we should use some per thread scratch page,
3527          * and allocate that during initial device creation? */
3528         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3529         if (!buffer) {
3530                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3531                 goto out;
3532         }
3533
3534         c = (struct bm_xfer_ctx) {
3535                 .bm_bits = drbd_bm_bits(mdev),
3536                 .bm_words = drbd_bm_words(mdev),
3537         };
3538
3539         for (;;) {
3540                 if (cmd == P_BITMAP) {
3541                         err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3542                 } else if (cmd == P_COMPRESSED_BITMAP) {
3543                         /* MAYBE: sanity check that we speak proto >= 90,
3544                          * and the feature is enabled! */
3545                         struct p_compressed_bm *p;
3546
3547                         if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3548                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3549                                 goto out;
3550                         }
3551                         /* use the page buffer */
3552                         p = buffer;
3553                         memcpy(p, h, sizeof(*h));
3554                         if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3555                                 goto out;
3556                         if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3557                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3558                                 goto out;
3559                         }
3560                         err = decode_bitmap_c(mdev, p, &c);
3561                 } else {
3562                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", cmd);
3563                         goto out;
3564                 }
3565
3566                 c.packets[cmd == P_BITMAP]++;
3567                 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3568
3569                 if (err <= 0) {
3570                         if (err < 0)
3571                                 goto out;
3572                         break;
3573                 }
3574                 if (!drbd_recv_header(mdev, &cmd, &data_size))
3575                         goto out;
3576         }
3577
3578         INFO_bm_xfer_stats(mdev, "receive", &c);
3579
3580         if (mdev->state.conn == C_WF_BITMAP_T) {
3581                 enum drbd_state_rv rv;
3582
3583                 ok = !drbd_send_bitmap(mdev);
3584                 if (!ok)
3585                         goto out;
3586                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3587                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3588                 D_ASSERT(rv == SS_SUCCESS);
3589         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3590                 /* admin may have requested C_DISCONNECTING,
3591                  * other threads may have noticed network errors */
3592                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3593                     drbd_conn_str(mdev->state.conn));
3594         }
3595
3596         ok = true;
3597  out:
3598         drbd_bm_unlock(mdev);
3599         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3600                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3601         free_page((unsigned long) buffer);
3602         return ok;
3603 }
3604
3605 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3606 {
3607         /* TODO zero copy sink :) */
3608         static char sink[128];
3609         int size, want, r;
3610
3611         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3612                  cmd, data_size);
3613
3614         size = data_size;
3615         while (size > 0) {
3616                 want = min_t(int, size, sizeof(sink));
3617                 r = drbd_recv(mdev, sink, want);
3618                 ERR_IF(r <= 0) break;
3619                 size -= r;
3620         }
3621         return size == 0;
3622 }
3623
3624 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3625 {
3626         /* Make sure we've acked all the TCP data associated
3627          * with the data requests being unplugged */
3628         drbd_tcp_quickack(mdev->data.socket);
3629
3630         return true;
3631 }
3632
3633 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3634 {
3635         struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3636
3637         switch (mdev->state.conn) {
3638         case C_WF_SYNC_UUID:
3639         case C_WF_BITMAP_T:
3640         case C_BEHIND:
3641                         break;
3642         default:
3643                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3644                                 drbd_conn_str(mdev->state.conn));
3645         }
3646
3647         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3648
3649         return true;
3650 }
3651
3652 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3653
3654 struct data_cmd {
3655         int expect_payload;
3656         size_t pkt_size;
3657         drbd_cmd_handler_f function;
3658 };
3659
3660 static struct data_cmd drbd_cmd_handler[] = {
3661         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3662         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3663         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3664         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3665         [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
3666         [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3667         [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3668         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3669         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3670         [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
3671         [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3672         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3673         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3674         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3675         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3676         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3677         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3678         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3679         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3680         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3681         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3682         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3683         /* anything missing from this table is in
3684          * the asender_tbl, see get_asender_cmd */
3685         [P_MAX_CMD]         = { 0, 0, NULL },
3686 };
3687
3688 /* All handler functions that expect a sub-header get that sub-header in
3689    mdev->data.rbuf.header.head.payload.
3690
3691    Usually in mdev->data.rbuf.header.head the callback can find the usual
3692    p_header, but they may not rely on that, since there is also p_header95.
3693  */
3694
3695 static void drbdd(struct drbd_conf *mdev)
3696 {
3697         union p_header *header = &mdev->data.rbuf.header;
3698         unsigned int packet_size;
3699         enum drbd_packets cmd;
3700         size_t shs; /* sub header size */
3701         int rv;
3702
3703         while (get_t_state(&mdev->receiver) == Running) {
3704                 drbd_thread_current_set_cpu(mdev);
3705                 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3706                         goto err_out;
3707
3708                 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3709                         dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3710                         goto err_out;
3711                 }
3712
3713                 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3714                 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3715                         dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3716                         goto err_out;
3717                 }
3718
3719                 if (shs) {
3720                         rv = drbd_recv(mdev, &header->h80.payload, shs);
3721                         if (unlikely(rv != shs)) {
3722                                 if (!signal_pending(current))
3723                                         dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3724                                 goto err_out;
3725                         }
3726                 }
3727
3728                 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3729
3730                 if (unlikely(!rv)) {
3731                         dev_err(DEV, "error receiving %s, l: %d!\n",
3732                             cmdname(cmd), packet_size);
3733                         goto err_out;
3734                 }
3735         }
3736
3737         if (0) {
3738         err_out:
3739                 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3740         }
3741         /* If we leave here, we probably want to update at least the
3742          * "Connected" indicator on stable storage. Do so explicitly here. */
3743         drbd_md_sync(mdev);
3744 }
3745
3746 void drbd_flush_workqueue(struct drbd_conf *mdev)
3747 {
3748         struct drbd_wq_barrier barr;
3749
3750         barr.w.cb = w_prev_work_done;
3751         init_completion(&barr.done);
3752         drbd_queue_work(&mdev->data.work, &barr.w);
3753         wait_for_completion(&barr.done);
3754 }
3755
3756 static void drbd_disconnect(struct drbd_conf *mdev)
3757 {
3758         enum drbd_fencing_p fp;
3759         union drbd_state os, ns;
3760         int rv = SS_UNKNOWN_ERROR;
3761         unsigned int i;
3762
3763         if (mdev->state.conn == C_STANDALONE)
3764                 return;
3765
3766         /* asender does not clean up anything. it must not interfere, either */
3767         drbd_thread_stop(&mdev->asender);
3768         drbd_free_sock(mdev);
3769
3770         /* wait for current activity to cease. */
3771         spin_lock_irq(&mdev->req_lock);
3772         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3773         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3774         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3775         spin_unlock_irq(&mdev->req_lock);
3776
3777         /* We do not have data structures that would allow us to
3778          * get the rs_pending_cnt down to 0 again.
3779          *  * On C_SYNC_TARGET we do not have any data structures describing
3780          *    the pending RSDataRequest's we have sent.
3781          *  * On C_SYNC_SOURCE there is no data structure that tracks
3782          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3783          *  And no, it is not the sum of the reference counts in the
3784          *  resync_LRU. The resync_LRU tracks the whole operation including
3785          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3786          *  on the fly. */
3787         drbd_rs_cancel_all(mdev);
3788         mdev->rs_total = 0;
3789         mdev->rs_failed = 0;
3790         atomic_set(&mdev->rs_pending_cnt, 0);
3791         wake_up(&mdev->misc_wait);
3792
3793         del_timer(&mdev->request_timer);
3794
3795         /* make sure syncer is stopped and w_resume_next_sg queued */
3796         del_timer_sync(&mdev->resync_timer);
3797         resync_timer_fn((unsigned long)mdev);
3798
3799         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3800          * w_make_resync_request etc. which may still be on the worker queue
3801          * to be "canceled" */
3802         drbd_flush_workqueue(mdev);
3803
3804         /* This also does reclaim_net_ee().  If we do this too early, we might
3805          * miss some resync ee and pages. */
3806         drbd_process_done_ee(mdev);
3807
3808         kfree(mdev->p_uuid);
3809         mdev->p_uuid = NULL;
3810
3811         if (!is_susp(mdev->state))
3812                 tl_clear(mdev);
3813
3814         dev_info(DEV, "Connection closed\n");
3815
3816         drbd_md_sync(mdev);
3817
3818         fp = FP_DONT_CARE;
3819         if (get_ldev(mdev)) {
3820                 fp = mdev->ldev->dc.fencing;
3821                 put_ldev(mdev);
3822         }
3823
3824         if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3825                 drbd_try_outdate_peer_async(mdev);
3826
3827         spin_lock_irq(&mdev->req_lock);
3828         os = mdev->state;
3829         if (os.conn >= C_UNCONNECTED) {
3830                 /* Do not restart in case we are C_DISCONNECTING */
3831                 ns = os;
3832                 ns.conn = C_UNCONNECTED;
3833                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3834         }
3835         spin_unlock_irq(&mdev->req_lock);
3836
3837         if (os.conn == C_DISCONNECTING) {
3838                 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3839
3840                 crypto_free_hash(mdev->cram_hmac_tfm);
3841                 mdev->cram_hmac_tfm = NULL;
3842
3843                 kfree(mdev->net_conf);
3844                 mdev->net_conf = NULL;
3845                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3846         }
3847
3848         /* serialize with bitmap writeout triggered by the state change,
3849          * if any. */
3850         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3851
3852         /* tcp_close and release of sendpage pages can be deferred.  I don't
3853          * want to use SO_LINGER, because apparently it can be deferred for
3854          * more than 20 seconds (longest time I checked).
3855          *
3856          * Actually we don't care for exactly when the network stack does its
3857          * put_page(), but release our reference on these pages right here.
3858          */
3859         i = drbd_release_ee(mdev, &mdev->net_ee);
3860         if (i)
3861                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3862         i = atomic_read(&mdev->pp_in_use_by_net);
3863         if (i)
3864                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3865         i = atomic_read(&mdev->pp_in_use);
3866         if (i)
3867                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3868
3869         D_ASSERT(list_empty(&mdev->read_ee));
3870         D_ASSERT(list_empty(&mdev->active_ee));
3871         D_ASSERT(list_empty(&mdev->sync_ee));
3872         D_ASSERT(list_empty(&mdev->done_ee));
3873
3874         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3875         atomic_set(&mdev->current_epoch->epoch_size, 0);
3876         D_ASSERT(list_empty(&mdev->current_epoch->list));
3877 }
3878
3879 /*
3880  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3881  * we can agree on is stored in agreed_pro_version.
3882  *
3883  * feature flags and the reserved array should be enough room for future
3884  * enhancements of the handshake protocol, and possible plugins...
3885  *
3886  * for now, they are expected to be zero, but ignored.
3887  */
3888 static int drbd_send_handshake(struct drbd_conf *mdev)
3889 {
3890         /* ASSERT current == mdev->receiver ... */
3891         struct p_handshake *p = &mdev->data.sbuf.handshake;
3892         int ok;
3893
3894         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3895                 dev_err(DEV, "interrupted during initial handshake\n");
3896                 return 0; /* interrupted. not ok. */
3897         }
3898
3899         if (mdev->data.socket == NULL) {
3900                 mutex_unlock(&mdev->data.mutex);
3901                 return 0;
3902         }
3903
3904         memset(p, 0, sizeof(*p));
3905         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3906         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3907         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3908                             (struct p_header80 *)p, sizeof(*p), 0);
3909         mutex_unlock(&mdev->data.mutex);
3910         return ok;
3911 }
3912
3913 /*
3914  * return values:
3915  *   1 yes, we have a valid connection
3916  *   0 oops, did not work out, please try again
3917  *  -1 peer talks different language,
3918  *     no point in trying again, please go standalone.
3919  */
3920 static int drbd_do_handshake(struct drbd_conf *mdev)
3921 {
3922         /* ASSERT current == mdev->receiver ... */
3923         struct p_handshake *p = &mdev->data.rbuf.handshake;
3924         const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3925         unsigned int length;
3926         enum drbd_packets cmd;
3927         int rv;
3928
3929         rv = drbd_send_handshake(mdev);
3930         if (!rv)
3931                 return 0;
3932
3933         rv = drbd_recv_header(mdev, &cmd, &length);
3934         if (!rv)
3935                 return 0;
3936
3937         if (cmd != P_HAND_SHAKE) {
3938                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3939                      cmdname(cmd), cmd);
3940                 return -1;
3941         }
3942
3943         if (length != expect) {
3944                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3945                      expect, length);
3946                 return -1;
3947         }
3948
3949         rv = drbd_recv(mdev, &p->head.payload, expect);
3950
3951         if (rv != expect) {
3952                 if (!signal_pending(current))
3953                         dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
3954                 return 0;
3955         }
3956
3957         p->protocol_min = be32_to_cpu(p->protocol_min);
3958         p->protocol_max = be32_to_cpu(p->protocol_max);
3959         if (p->protocol_max == 0)
3960                 p->protocol_max = p->protocol_min;
3961
3962         if (PRO_VERSION_MAX < p->protocol_min ||
3963             PRO_VERSION_MIN > p->protocol_max)
3964                 goto incompat;
3965
3966         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3967
3968         dev_info(DEV, "Handshake successful: "
3969              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3970
3971         return 1;
3972
3973  incompat:
3974         dev_err(DEV, "incompatible DRBD dialects: "
3975             "I support %d-%d, peer supports %d-%d\n",
3976             PRO_VERSION_MIN, PRO_VERSION_MAX,
3977             p->protocol_min, p->protocol_max);
3978         return -1;
3979 }
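/*
 * Worked example (illustrative version numbers only; the real limits are
 * PRO_VERSION_MIN/MAX from drbd.h): if we advertise 86-96 and the peer
 * advertises 86-91, the ranges overlap and the agreed version becomes
 * min(96, 91) = 91.  If the peer only spoke 97-99, PRO_VERSION_MAX < 97
 * would trigger the "incompatible DRBD dialects" path above.
 */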
3980
3981 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3982 static int drbd_do_auth(struct drbd_conf *mdev)
3983 {
3984         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3985         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3986         return -1;
3987 }
3988 #else
3989 #define CHALLENGE_LEN 64
3990
3991 /* Return value:
3992         1 - auth succeeded,
3993         0 - failed, try again (network error),
3994         -1 - auth failed, don't try again.
3995 */
3996
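/*
 * Message flow sketch (illustrative, summarizing the exchange implemented
 * below): each side picks CHALLENGE_LEN random bytes and sends them in a
 * P_AUTH_CHALLENGE packet; each side answers the peer's challenge with
 * HMAC(shared_secret, peer_challenge) in a P_AUTH_RESPONSE packet; each
 * side then recomputes the HMAC over its own challenge and compares it
 * with the response it received.
 */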
3997 static int drbd_do_auth(struct drbd_conf *mdev)
3998 {
3999         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4000         struct scatterlist sg;
4001         char *response = NULL;
4002         char *right_response = NULL;
4003         char *peers_ch = NULL;
4004         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4005         unsigned int resp_size;
4006         struct hash_desc desc;
4007         enum drbd_packets cmd;
4008         unsigned int length;
4009         int rv;
4010
4011         desc.tfm = mdev->cram_hmac_tfm;
4012         desc.flags = 0;
4013
4014         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4015                                 (u8 *)mdev->net_conf->shared_secret, key_len);
4016         if (rv) {
4017                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4018                 rv = -1;
4019                 goto fail;
4020         }
4021
4022         get_random_bytes(my_challenge, CHALLENGE_LEN);
4023
4024         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4025         if (!rv)
4026                 goto fail;
4027
4028         rv = drbd_recv_header(mdev, &cmd, &length);
4029         if (!rv)
4030                 goto fail;
4031
4032         if (cmd != P_AUTH_CHALLENGE) {
4033                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4034                     cmdname(cmd), cmd);
4035                 rv = 0;
4036                 goto fail;
4037         }
4038
4039         if (length > CHALLENGE_LEN * 2) {
4040                 dev_err(DEV, "AuthChallenge payload too big.\n");
4041                 rv = -1;
4042                 goto fail;
4043         }
4044
4045         peers_ch = kmalloc(length, GFP_NOIO);
4046         if (peers_ch == NULL) {
4047                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4048                 rv = -1;
4049                 goto fail;
4050         }
4051
4052         rv = drbd_recv(mdev, peers_ch, length);
4053
4054         if (rv != length) {
4055                 if (!signal_pending(current))
4056                         dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4057                 rv = 0;
4058                 goto fail;
4059         }
4060
4061         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4062         response = kmalloc(resp_size, GFP_NOIO);
4063         if (response == NULL) {
4064                 dev_err(DEV, "kmalloc of response failed\n");
4065                 rv = -1;
4066                 goto fail;
4067         }
4068
4069         sg_init_table(&sg, 1);
4070         sg_set_buf(&sg, peers_ch, length);
4071
4072         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4073         if (rv) {
4074                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4075                 rv = -1;
4076                 goto fail;
4077         }
4078
4079         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4080         if (!rv)
4081                 goto fail;
4082
4083         rv = drbd_recv_header(mdev, &cmd, &length);
4084         if (!rv)
4085                 goto fail;
4086
4087         if (cmd != P_AUTH_RESPONSE) {
4088                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4089                         cmdname(cmd), cmd);
4090                 rv = 0;
4091                 goto fail;
4092         }
4093
4094         if (length != resp_size) {
4095                 dev_err(DEV, "AuthResponse payload of wrong size\n");
4096                 rv = 0;
4097                 goto fail;
4098         }
4099
4100         rv = drbd_recv(mdev, response, resp_size);
4101
4102         if (rv != resp_size) {
4103                 if (!signal_pending(current))
4104                         dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4105                 rv = 0;
4106                 goto fail;
4107         }
4108
4109         right_response = kmalloc(resp_size, GFP_NOIO);
4110         if (right_response == NULL) {
4111                 dev_err(DEV, "kmalloc of right_response failed\n");
4112                 rv = -1;
4113                 goto fail;
4114         }
4115
4116         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4117
4118         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4119         if (rv) {
4120                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4121                 rv = -1;
4122                 goto fail;
4123         }
4124
4125         rv = !memcmp(response, right_response, resp_size);
4126
4127         if (rv)
4128                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4129                      resp_size, mdev->net_conf->cram_hmac_alg);
4130         else
4131                 rv = -1;
4132
4133  fail:
4134         kfree(peers_ch);
4135         kfree(response);
4136         kfree(right_response);
4137
4138         return rv;
4139 }
4140 #endif
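/* Illustrative summary, not in the original source: the symmetric CRAM-HMAC
 * exchange implemented by drbd_do_auth() above.  Both peers use the HMAC
 * transform configured for net_conf->cram_hmac_alg, keyed with the shared
 * secret.
 *
 *   local                                        peer
 *   -----                                        ----
 *   P_AUTH_CHALLENGE(my_challenge)      --->
 *                                       <---     P_AUTH_CHALLENGE(peers_ch)
 *   P_AUTH_RESPONSE(HMAC(peers_ch))     --->
 *                                       <---     P_AUTH_RESPONSE(response)
 *   authenticated iff response == HMAC(my_challenge)
 */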
4141
4142 int drbdd_init(struct drbd_thread *thi)
4143 {
4144         struct drbd_conf *mdev = thi->mdev;
4145         unsigned int minor = mdev_to_minor(mdev);
4146         int h;
4147
4148         sprintf(current->comm, "drbd%d_receiver", minor);
4149
4150         dev_info(DEV, "receiver (re)started\n");
4151
4152         do {
4153                 h = drbd_connect(mdev);
4154                 if (h == 0) {
4155                         drbd_disconnect(mdev);
4156                         schedule_timeout_interruptible(HZ);
4157                 }
4158                 if (h == -1) {
4159                         dev_warn(DEV, "Discarding network configuration.\n");
4160                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4161                 }
4162         } while (h == 0);
4163
4164         if (h > 0) {
4165                 if (get_net_conf(mdev)) {
4166                         drbdd(mdev);
4167                         put_net_conf(mdev);
4168                 }
4169         }
4170
4171         drbd_disconnect(mdev);
4172
4173         dev_info(DEV, "receiver terminated\n");
4174         return 0;
4175 }
4176
4177 /* ********* acknowledge sender ******** */
4178
4179 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4180 {
4181         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4182
4183         int retcode = be32_to_cpu(p->retcode);
4184
4185         if (retcode >= SS_SUCCESS) {
4186                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4187         } else {
4188                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4189                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4190                     drbd_set_st_err_str(retcode), retcode);
4191         }
4192         wake_up(&mdev->state_wait);
4193
4194         return true;
4195 }
4196
4197 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4198 {
4199         return drbd_send_ping_ack(mdev);
4200
4201 }
4202
4203 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4204 {
4205         /* restore idle timeout */
4206         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4207         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4208                 wake_up(&mdev->misc_wait);
4209
4210         return true;
4211 }
4212
4213 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4214 {
4215         struct p_block_ack *p = (struct p_block_ack *)h;
4216         sector_t sector = be64_to_cpu(p->sector);
4217         int blksize = be32_to_cpu(p->blksize);
4218
4219         D_ASSERT(mdev->agreed_pro_version >= 89);
4220
4221         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4222
4223         if (get_ldev(mdev)) {
4224                 drbd_rs_complete_io(mdev, sector);
4225                 drbd_set_in_sync(mdev, sector, blksize);
4226                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4227                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4228                 put_ldev(mdev);
4229         }
4230         dec_rs_pending(mdev);
4231         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4232
4233         return true;
4234 }
4235
4236 static int
4237 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4238                               struct rb_root *root, const char *func,
4239                               enum drbd_req_event what, bool missing_ok)
4240 {
4241         struct drbd_request *req;
4242         struct bio_and_error m;
4243
4244         spin_lock_irq(&mdev->req_lock);
4245         req = find_request(mdev, root, id, sector, missing_ok, func);
4246         if (unlikely(!req)) {
4247                 spin_unlock_irq(&mdev->req_lock);
4248                 return false;
4249         }
4250         __req_mod(req, what, &m);
4251         spin_unlock_irq(&mdev->req_lock);
4252
4253         if (m.bio)
4254                 complete_master_bio(mdev, &m);
4255         return true;
4256 }
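/* Usage sketch (illustrative only, mirroring the ack handlers below):
 *
 *	validate_req_change_req_state(mdev, p->block_id, sector,
 *				      &mdev->write_requests, __func__,
 *				      WRITE_ACKED_BY_PEER, false);
 *
 * i.e. look up the pending request under req_lock, feed the ack event into
 * the request state machine via __req_mod(), and, once outside the lock,
 * complete the master bio if that transition finished it.
 */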
4257
4258 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4259 {
4260         struct p_block_ack *p = (struct p_block_ack *)h;
4261         sector_t sector = be64_to_cpu(p->sector);
4262         int blksize = be32_to_cpu(p->blksize);
4263         enum drbd_req_event what;
4264
4265         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4266
4267         if (p->block_id == ID_SYNCER) {
4268                 drbd_set_in_sync(mdev, sector, blksize);
4269                 dec_rs_pending(mdev);
4270                 return true;
4271         }
4272         switch (be16_to_cpu(h->command)) {
4273         case P_RS_WRITE_ACK:
4274                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4275                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4276                 break;
4277         case P_WRITE_ACK:
4278                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4279                 what = WRITE_ACKED_BY_PEER;
4280                 break;
4281         case P_RECV_ACK:
4282                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4283                 what = RECV_ACKED_BY_PEER;
4284                 break;
4285         case P_DISCARD_ACK:
4286                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4287                 what = CONFLICT_DISCARDED_BY_PEER;
4288                 break;
4289         default:
4290                 D_ASSERT(0);
4291                 return false;
4292         }
4293
4294         return validate_req_change_req_state(mdev, p->block_id, sector,
4295                                              &mdev->write_requests, __func__,
4296                                              what, false);
4297 }
4298
4299 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4300 {
4301         struct p_block_ack *p = (struct p_block_ack *)h;
4302         sector_t sector = be64_to_cpu(p->sector);
4303         int size = be32_to_cpu(p->blksize);
4304         bool missing_ok = mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4305                           mdev->net_conf->wire_protocol == DRBD_PROT_B;
4306         bool found;
4307
4308         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4309
4310         if (p->block_id == ID_SYNCER) {
4311                 dec_rs_pending(mdev);
4312                 drbd_rs_failed_io(mdev, sector, size);
4313                 return true;
4314         }
4315
4316         found = validate_req_change_req_state(mdev, p->block_id, sector,
4317                                               &mdev->write_requests, __func__,
4318                                               NEG_ACKED, missing_ok);
4319         if (!found) {
4320                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4321                    The master bio might already be completed, therefore the
4322                    request is no longer in the collision hash. */
4323                 /* In Protocol B we might already have got a P_RECV_ACK
4324                    but then get a P_NEG_ACK afterwards. */
4325                 if (!missing_ok)
4326                         return false;
4327                 drbd_set_out_of_sync(mdev, sector, size);
4328         }
4329         return true;
4330 }
4331
4332 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4333 {
4334         struct p_block_ack *p = (struct p_block_ack *)h;
4335         sector_t sector = be64_to_cpu(p->sector);
4336
4337         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4338         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4339             (unsigned long long)sector, be32_to_cpu(p->blksize));
4340
4341         return validate_req_change_req_state(mdev, p->block_id, sector,
4342                                              &mdev->read_requests, __func__,
4343                                              NEG_ACKED, false);
4344 }
4345
4346 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4347 {
4348         sector_t sector;
4349         int size;
4350         struct p_block_ack *p = (struct p_block_ack *)h;
4351
4352         sector = be64_to_cpu(p->sector);
4353         size = be32_to_cpu(p->blksize);
4354
4355         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4356
4357         dec_rs_pending(mdev);
4358
4359         if (get_ldev_if_state(mdev, D_FAILED)) {
4360                 drbd_rs_complete_io(mdev, sector);
4361                 switch (be16_to_cpu(h->command)) {
4362                 case P_NEG_RS_DREPLY:
4363                         drbd_rs_failed_io(mdev, sector, size);
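                        /* fall through */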
4364                 case P_RS_CANCEL:
4365                         break;
4366                 default:
4367                         D_ASSERT(0);
4368                         put_ldev(mdev);
4369                         return false;
4370                 }
4371                 put_ldev(mdev);
4372         }
4373
4374         return true;
4375 }
4376
4377 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4378 {
4379         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4380
4381         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4382
4383         if (mdev->state.conn == C_AHEAD &&
4384             atomic_read(&mdev->ap_in_flight) == 0 &&
4385             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4386                 mdev->start_resync_timer.expires = jiffies + HZ;
4387                 add_timer(&mdev->start_resync_timer);
4388         }
4389
4390         return true;
4391 }
4392
4393 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4394 {
4395         struct p_block_ack *p = (struct p_block_ack *)h;
4396         struct drbd_work *w;
4397         sector_t sector;
4398         int size;
4399
4400         sector = be64_to_cpu(p->sector);
4401         size = be32_to_cpu(p->blksize);
4402
4403         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4404
4405         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4406                 drbd_ov_oos_found(mdev, sector, size);
4407         else
4408                 ov_oos_print(mdev);
4409
4410         if (!get_ldev(mdev))
4411                 return true;
4412
4413         drbd_rs_complete_io(mdev, sector);
4414         dec_rs_pending(mdev);
4415
4416         --mdev->ov_left;
4417
4418         /* let's advance progress step marks only for every other megabyte */
4419         if ((mdev->ov_left & 0x200) == 0x200)
4420                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4421
4422         if (mdev->ov_left == 0) {
4423                 w = kmalloc(sizeof(*w), GFP_NOIO);
4424                 if (w) {
4425                         w->cb = w_ov_finished;
4426                         drbd_queue_work_front(&mdev->data.work, w);
4427                 } else {
4428                         dev_err(DEV, "kmalloc(w) failed.\n");
4429                         ov_oos_print(mdev);
4430                         drbd_resync_finished(mdev);
4431                 }
4432         }
4433         put_ldev(mdev);
4434         return true;
4435 }
4436
4437 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4438 {
4439         return true;
4440 }
4441
4442 struct asender_cmd {
4443         size_t pkt_size;
4444         int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4445 };
4446
4447 static struct asender_cmd *get_asender_cmd(int cmd)
4448 {
4449         static struct asender_cmd asender_tbl[] = {
4450                 /* anything missing from this table is in
4451                  * the drbd_cmd_handler (drbd_default_handler) table,
4452                  * see the beginning of drbdd() */
4453         [P_PING]            = { sizeof(struct p_header80), got_Ping },
4454         [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
4455         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4456         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4457         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4458         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4459         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4460         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4461         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4462         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4463         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4464         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4465         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4466         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4467         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4468         [P_MAX_CMD]         = { 0, NULL },
4469         };
4470         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4471                 return NULL;
4472         return &asender_tbl[cmd];
4473 }
4474
4475 int drbd_asender(struct drbd_thread *thi)
4476 {
4477         struct drbd_conf *mdev = thi->mdev;
4478         struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4479         struct asender_cmd *cmd = NULL;
4480
4481         int rv, len;
4482         void *buf    = h;
4483         int received = 0;
4484         int expect   = sizeof(struct p_header80);
4485         int empty;
4486         int ping_timeout_active = 0;
4487
4488         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4489
4490         current->policy = SCHED_RR;  /* Make this a realtime task! */
4491         current->rt_priority = 2;    /* more important than all other tasks */
4492
4493         while (get_t_state(thi) == Running) {
4494                 drbd_thread_current_set_cpu(mdev);
4495                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4496                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4497                         mdev->meta.socket->sk->sk_rcvtimeo =
4498                                 mdev->net_conf->ping_timeo*HZ/10;
4499                         ping_timeout_active = 1;
4500                 }
4501
4502                 /* conditionally cork;
4503                  * it may hurt latency if we cork without much to send */
4504                 if (!mdev->net_conf->no_cork &&
4505                         3 < atomic_read(&mdev->unacked_cnt))
4506                         drbd_tcp_cork(mdev->meta.socket);
4507                 while (1) {
4508                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4509                         flush_signals(current);
4510                         if (!drbd_process_done_ee(mdev))
4511                                 goto reconnect;
4512                         /* to avoid race with newly queued ACKs */
4513                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4514                         spin_lock_irq(&mdev->req_lock);
4515                         empty = list_empty(&mdev->done_ee);
4516                         spin_unlock_irq(&mdev->req_lock);
4517                         /* new ack may have been queued right here,
4518                          * but then there is also a signal pending,
4519                          * and we start over... */
4520                         if (empty)
4521                                 break;
4522                 }
4523                 /* but unconditionally uncork unless disabled */
4524                 if (!mdev->net_conf->no_cork)
4525                         drbd_tcp_uncork(mdev->meta.socket);
4526
4527                 /* short circuit, recv_msg would return EINTR anyways. */
4528                 if (signal_pending(current))
4529                         continue;
4530
4531                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4532                                      buf, expect-received, 0);
4533                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4534
4535                 flush_signals(current);
4536
4537                 /* Note:
4538                  * -EINTR        (on meta) we got a signal
4539                  * -EAGAIN       (on meta) rcvtimeo expired
4540                  * -ECONNRESET   other side closed the connection
4541                  * -ERESTARTSYS  (on data) we got a signal
4542                  * rv <  0       other than above: unexpected error!
4543                  * rv == expected: full header or command
4544                  * rv <  expected: "woken" by signal during receive
4545                  * rv == 0       : "connection shut down by peer"
4546                  */
4547                 if (likely(rv > 0)) {
4548                         received += rv;
4549                         buf      += rv;
4550                 } else if (rv == 0) {
4551                         dev_err(DEV, "meta connection shut down by peer.\n");
4552                         goto reconnect;
4553                 } else if (rv == -EAGAIN) {
4554                         /* If the data socket received something meanwhile,
4555                          * that is good enough: peer is still alive. */
4556                         if (time_after(mdev->last_received,
4557                                 jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4558                                 continue;
4559                         if (ping_timeout_active) {
4560                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4561                                 goto reconnect;
4562                         }
4563                         set_bit(SEND_PING, &mdev->flags);
4564                         continue;
4565                 } else if (rv == -EINTR) {
4566                         continue;
4567                 } else {
4568                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4569                         goto reconnect;
4570                 }
4571
4572                 if (received == expect && cmd == NULL) {
4573                         if (unlikely(h->magic != cpu_to_be32(DRBD_MAGIC))) {
4574                                 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4575                                     be32_to_cpu(h->magic),
4576                                     be16_to_cpu(h->command),
4577                                     be16_to_cpu(h->length));
4578                                 goto reconnect;
4579                         }
4580                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4581                         len = be16_to_cpu(h->length);
4582                         if (unlikely(cmd == NULL)) {
4583                                 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4584                                     be32_to_cpu(h->magic),
4585                                     be16_to_cpu(h->command),
4586                                     be16_to_cpu(h->length));
4587                                 goto disconnect;
4588                         }
4589                         expect = cmd->pkt_size;
4590                         ERR_IF(len != expect-sizeof(struct p_header80))
4591                                 goto reconnect;
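                        /* Example (illustrative): for P_WRITE_ACK the table entry
                         * above is sizeof(struct p_block_ack), so the advertised
                         * payload length must equal
                         * sizeof(struct p_block_ack) - sizeof(struct p_header80). */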
4592                 }
4593                 if (received == expect) {
4594                         mdev->last_received = jiffies;
4595                         D_ASSERT(cmd != NULL);
4596                         if (!cmd->process(mdev, h))
4597                                 goto reconnect;
4598
4599                         /* the idle_timeout (ping-int)
4600                          * has been restored in got_PingAck() */
4601                         if (cmd == get_asender_cmd(P_PING_ACK))
4602                                 ping_timeout_active = 0;
4603
4604                         buf      = h;
4605                         received = 0;
4606                         expect   = sizeof(struct p_header80);
4607                         cmd      = NULL;
4608                 }
4609         }
4610
4611         if (0) {
4612 reconnect:
4613                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4614                 drbd_md_sync(mdev);
4615         }
4616         if (0) {
4617 disconnect:
4618                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4619                 drbd_md_sync(mdev);
4620         }
4621         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4622
4623         D_ASSERT(mdev->state.conn < C_CONNECTED);
4624         dev_info(DEV, "asender terminated\n");
4625
4626         return 0;
4627 }