1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 enum finish_epoch {
52         FE_STILL_LIVE,
53         FE_DESTROYED,
54         FE_RECYCLED,
55 };
56
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62
63
64 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
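/*
 * Illustrative sketch only (not part of this file): page_chain_next() and
 * the page_chain_for_each*() iterators used below are assumed to live in
 * drbd_int.h and to walk the chain through page->private, roughly like
 *
 *	static inline struct page *page_chain_next(struct page *page)
 *	{
 *		return (struct page *)page_private(page);
 *	}
 *
 * with a chain being terminated by a page whose ->private was set to 0.
 */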
70
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77         struct page *page;
78         struct page *tmp;
79
80         BUG_ON(!n);
81         BUG_ON(!head);
82
83         page = *head;
84
85         if (!page)
86                 return NULL;
87
88         while (page) {
89                 tmp = page_chain_next(page);
90                 if (--n == 0)
91                         break; /* found sufficient pages */
92                 if (tmp == NULL)
93                         /* insufficient pages, don't use any of them. */
94                         return NULL;
95                 page = tmp;
96         }
97
98         /* add end of list marker for the returned list */
99         set_page_private(page, 0);
100         /* actual return value, and adjustment of head */
101         page = *head;
102         *head = tmp;
103         return page;
104 }
105
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111         struct page *tmp;
112         int i = 1;
113         while ((tmp = page_chain_next(page)))
114                 ++i, page = tmp;
115         if (len)
116                 *len = i;
117         return page;
118 }
119
120 static int page_chain_free(struct page *page)
121 {
122         struct page *tmp;
123         int i = 0;
124         page_chain_for_each_safe(page, tmp) {
125                 put_page(page);
126                 ++i;
127         }
128         return i;
129 }
130
131 static void page_chain_add(struct page **head,
132                 struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135         struct page *tmp;
136         tmp = page_chain_tail(chain_first, NULL);
137         BUG_ON(tmp != chain_last);
138 #endif
139
140         /* add chain to head */
141         set_page_private(chain_last, (unsigned long)*head);
142         *head = chain_first;
143 }
144
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147         struct page *page = NULL;
148         struct page *tmp = NULL;
149         int i = 0;
150
151         /* Yes, testing drbd_pp_vacant outside the lock is racy.
152          * So what. It saves a spin_lock. */
153         if (drbd_pp_vacant >= number) {
154                 spin_lock(&drbd_pp_lock);
155                 page = page_chain_del(&drbd_pp_pool, number);
156                 if (page)
157                         drbd_pp_vacant -= number;
158                 spin_unlock(&drbd_pp_lock);
159                 if (page)
160                         return page;
161         }
162
163         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164          * "criss-cross" setup, that might cause write-out on some other DRBD,
165          * which in turn might block on the other node at this very place.  */
166         for (i = 0; i < number; i++) {
167                 tmp = alloc_page(GFP_TRY);
168                 if (!tmp)
169                         break;
170                 set_page_private(tmp, (unsigned long)page);
171                 page = tmp;
172         }
173
174         if (i == number)
175                 return page;
176
177         /* Not enough pages immediately available this time.
178          * No need to jump around here, drbd_pp_alloc will retry this
179          * function "soon". */
180         if (page) {
181                 tmp = page_chain_tail(page, NULL);
182                 spin_lock(&drbd_pp_lock);
183                 page_chain_add(&drbd_pp_pool, page, tmp);
184                 drbd_pp_vacant += i;
185                 spin_unlock(&drbd_pp_lock);
186         }
187         return NULL;
188 }
189
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192         struct drbd_epoch_entry *e;
193         struct list_head *le, *tle;
194
195         /* The EEs are always appended to the end of the list. Since
196            they are sent in order over the wire, they have to finish
197            in order. As soon as we see the first unfinished one, we can
198            stop examining the list... */
199
200         list_for_each_safe(le, tle, &mdev->net_ee) {
201                 e = list_entry(le, struct drbd_epoch_entry, w.list);
202                 if (drbd_ee_has_active_page(e))
203                         break;
204                 list_move(le, to_be_freed);
205         }
206 }
207
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210         LIST_HEAD(reclaimed);
211         struct drbd_epoch_entry *e, *t;
212
213         spin_lock_irq(&mdev->req_lock);
214         reclaim_net_ee(mdev, &reclaimed);
215         spin_unlock_irq(&mdev->req_lock);
216
217         list_for_each_entry_safe(e, t, &reclaimed, w.list)
218                 drbd_free_net_ee(mdev, e);
219 }
220
221 /**
222  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223  * @mdev:       DRBD device.
224  * @number:     number of pages requested
225  * @retry:      whether to retry, if not enough pages are available right now
226  *
227  * Tries to allocate @number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235         struct page *page = NULL;
236         DEFINE_WAIT(wait);
237
238         /* Yes, we may run up to @number over max_buffers. If we
239          * follow it strictly, the admin will get it wrong anyways. */
240         if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
241                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242
243         while (page == NULL) {
244                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246                 drbd_kick_lo_and_reclaim_net(mdev);
247
248                 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
249                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250                         if (page)
251                                 break;
252                 }
253
254                 if (!retry)
255                         break;
256
257                 if (signal_pending(current)) {
258                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259                         break;
260                 }
261
262                 schedule();
263         }
264         finish_wait(&drbd_pp_wait, &wait);
265
266         if (page)
267                 atomic_add(number, &mdev->pp_in_use);
268         return page;
269 }
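/*
 * Usage sketch (illustration only; compare drbd_alloc_ee() further down):
 *
 *	page = drbd_pp_alloc(mdev, nr_pages, true);
 *	if (!page)
 *		goto fail;
 *	...
 *	drbd_pp_free(mdev, page, 0);
 *
 * With @retry set, a NULL return means the wait was interrupted by a signal;
 * the chain is handed back via drbd_pp_free() once its owner is done with it.
 */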
270
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272  * It is also used from inside another spin_lock_irq(&mdev->req_lock);
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278         int i;
279
280         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
281                 i = page_chain_free(page);
282         else {
283                 struct page *tmp;
284                 tmp = page_chain_tail(page, &i);
285                 spin_lock(&drbd_pp_lock);
286                 page_chain_add(&drbd_pp_pool, page, tmp);
287                 drbd_pp_vacant += i;
288                 spin_unlock(&drbd_pp_lock);
289         }
290         i = atomic_sub_return(i, a);
291         if (i < 0)
292                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
294         wake_up(&drbd_pp_wait);
295 }
296
297 /*
298 You need to hold the req_lock:
299  _drbd_wait_ee_list_empty()
300
301 You must not have the req_lock:
302  drbd_free_ee()
303  drbd_alloc_ee()
304  drbd_init_ee()
305  drbd_release_ee()
306  drbd_ee_fix_bhs()
307  drbd_process_done_ee()
308  drbd_clear_done_ee()
309  drbd_wait_ee_list_empty()
310 */
311
312 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
313                                      u64 id,
314                                      sector_t sector,
315                                      unsigned int data_size,
316                                      gfp_t gfp_mask) __must_hold(local)
317 {
318         struct drbd_epoch_entry *e;
319         struct page *page;
320         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
321
322         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
323                 return NULL;
324
325         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
326         if (!e) {
327                 if (!(gfp_mask & __GFP_NOWARN))
328                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
329                 return NULL;
330         }
331
332         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
333         if (!page)
334                 goto fail;
335
336         drbd_clear_interval(&e->i);
337         e->epoch = NULL;
338         e->mdev = mdev;
339         e->pages = page;
340         atomic_set(&e->pending_bios, 0);
341         e->i.size = data_size;
342         e->flags = 0;
343         e->i.sector = sector;
344         /*
345          * The block_id is opaque to the receiver.  It is not endianness
346          * converted, and sent back to the sender unchanged.
347          */
348         e->block_id = id;
349
350         return e;
351
352  fail:
353         mempool_free(e, drbd_ee_mempool);
354         return NULL;
355 }
356
357 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
358 {
359         if (e->flags & EE_HAS_DIGEST)
360                 kfree(e->digest);
361         drbd_pp_free(mdev, e->pages, is_net);
362         D_ASSERT(atomic_read(&e->pending_bios) == 0);
363         D_ASSERT(drbd_interval_empty(&e->i));
364         mempool_free(e, drbd_ee_mempool);
365 }
366
367 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
368 {
369         LIST_HEAD(work_list);
370         struct drbd_epoch_entry *e, *t;
371         int count = 0;
372         int is_net = list == &mdev->net_ee;
373
374         spin_lock_irq(&mdev->req_lock);
375         list_splice_init(list, &work_list);
376         spin_unlock_irq(&mdev->req_lock);
377
378         list_for_each_entry_safe(e, t, &work_list, w.list) {
379                 drbd_free_some_ee(mdev, e, is_net);
380                 count++;
381         }
382         return count;
383 }
384
385
386 /*
387  * This function is called from _asender only_
388  * but see also comments in _req_mod(,BARRIER_ACKED)
389  * and receive_Barrier.
390  *
391  * Move entries from net_ee to done_ee, if ready.
392  * Grab done_ee, call all callbacks, free the entries.
393  * The callbacks typically send out ACKs.
394  */
395 static int drbd_process_done_ee(struct drbd_conf *mdev)
396 {
397         LIST_HEAD(work_list);
398         LIST_HEAD(reclaimed);
399         struct drbd_epoch_entry *e, *t;
400         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
401
402         spin_lock_irq(&mdev->req_lock);
403         reclaim_net_ee(mdev, &reclaimed);
404         list_splice_init(&mdev->done_ee, &work_list);
405         spin_unlock_irq(&mdev->req_lock);
406
407         list_for_each_entry_safe(e, t, &reclaimed, w.list)
408                 drbd_free_net_ee(mdev, e);
409
410         /* possible callbacks here:
411          * e_end_block, e_end_resync_block, and e_send_discard_ack;
412          * all ignore the last argument.
413          */
414         list_for_each_entry_safe(e, t, &work_list, w.list) {
415                 /* list_del not necessary, next/prev members not touched */
416                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
417                 drbd_free_ee(mdev, e);
418         }
419         wake_up(&mdev->ee_wait);
420
421         return ok;
422 }
423
424 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
425 {
426         DEFINE_WAIT(wait);
427
428         /* avoids spin_lock/unlock
429          * and calling prepare_to_wait in the fast path */
430         while (!list_empty(head)) {
431                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
432                 spin_unlock_irq(&mdev->req_lock);
433                 io_schedule();
434                 finish_wait(&mdev->ee_wait, &wait);
435                 spin_lock_irq(&mdev->req_lock);
436         }
437 }
438
439 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
440 {
441         spin_lock_irq(&mdev->req_lock);
442         _drbd_wait_ee_list_empty(mdev, head);
443         spin_unlock_irq(&mdev->req_lock);
444 }
445
446 /* see also kernel_accept, which is only present since 2.6.18.
447  * Also, we want to log exactly which part of it failed. */
448 static int drbd_accept(struct drbd_conf *mdev, const char **what,
449                 struct socket *sock, struct socket **newsock)
450 {
451         struct sock *sk = sock->sk;
452         int err = 0;
453
454         *what = "listen";
455         err = sock->ops->listen(sock, 5);
456         if (err < 0)
457                 goto out;
458
459         *what = "sock_create_lite";
460         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
461                                newsock);
462         if (err < 0)
463                 goto out;
464
465         *what = "accept";
466         err = sock->ops->accept(sock, *newsock, 0);
467         if (err < 0) {
468                 sock_release(*newsock);
469                 *newsock = NULL;
470                 goto out;
471         }
472         (*newsock)->ops  = sock->ops;
473
474 out:
475         return err;
476 }
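/*
 * For comparison only (a sketch, assuming the kernel_accept() helper
 * mentioned above and its kernel_listen() counterpart are available): the
 * same sequence could be written as
 *
 *	err = kernel_listen(sock, 5);
 *	if (!err)
 *		err = kernel_accept(sock, newsock, 0);
 *
 * but then we could not report which individual step failed, which is the
 * whole point of open coding it here.
 */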
477
478 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
479                     void *buf, size_t size, int flags)
480 {
481         mm_segment_t oldfs;
482         struct kvec iov = {
483                 .iov_base = buf,
484                 .iov_len = size,
485         };
486         struct msghdr msg = {
487                 .msg_iovlen = 1,
488                 .msg_iov = (struct iovec *)&iov,
489                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
490         };
491         int rv;
492
493         oldfs = get_fs();
494         set_fs(KERNEL_DS);
495         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
496         set_fs(oldfs);
497
498         return rv;
499 }
500
501 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
502 {
503         mm_segment_t oldfs;
504         struct kvec iov = {
505                 .iov_base = buf,
506                 .iov_len = size,
507         };
508         struct msghdr msg = {
509                 .msg_iovlen = 1,
510                 .msg_iov = (struct iovec *)&iov,
511                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
512         };
513         int rv;
514
515         oldfs = get_fs();
516         set_fs(KERNEL_DS);
517
518         for (;;) {
519                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
520                 if (rv == size)
521                         break;
522
523                 /* Note:
524                  * ECONNRESET   other side closed the connection
525                  * ERESTARTSYS  (on  sock) we got a signal
526                  */
527
528                 if (rv < 0) {
529                         if (rv == -ECONNRESET)
530                                 dev_info(DEV, "sock was reset by peer\n");
531                         else if (rv != -ERESTARTSYS)
532                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
533                         break;
534                 } else if (rv == 0) {
535                         dev_info(DEV, "sock was shut down by peer\n");
536                         break;
537                 } else  {
538                         /* signal came in, or peer/link went down,
539                          * after we read a partial message
540                          */
541                         /* D_ASSERT(signal_pending(current)); */
542                         break;
543                 }
544         }
545
546         set_fs(oldfs);
547
548         if (rv != size)
549                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
550
551         return rv;
552 }
553
554 /* quoting tcp(7):
555  *   On individual connections, the socket buffer size must be set prior to the
556  *   listen(2) or connect(2) calls in order to have it take effect.
557  * This is our wrapper to do so.
558  */
559 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
560                 unsigned int rcv)
561 {
562         /* open coded SO_SNDBUF, SO_RCVBUF */
563         if (snd) {
564                 sock->sk->sk_sndbuf = snd;
565                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
566         }
567         if (rcv) {
568                 sock->sk->sk_rcvbuf = rcv;
569                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
570         }
571 }
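/*
 * For illustration (a sketch, not used by this driver): this is roughly what
 * a user-space
 *
 *	int val = snd;
 *	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val));
 *
 * would achieve, and it is also why the SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK
 * bits are set: they keep the kernel's buffer autotuning from overriding the
 * explicitly configured sizes.
 */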
572
573 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
574 {
575         const char *what;
576         struct socket *sock;
577         struct sockaddr_in6 src_in6;
578         int err;
579         int disconnect_on_error = 1;
580
581         if (!get_net_conf(mdev))
582                 return NULL;
583
584         what = "sock_create_kern";
585         err = sock_create_kern(((struct sockaddr *)mdev->tconn->net_conf->my_addr)->sa_family,
586                 SOCK_STREAM, IPPROTO_TCP, &sock);
587         if (err < 0) {
588                 sock = NULL;
589                 goto out;
590         }
591
592         sock->sk->sk_rcvtimeo =
593         sock->sk->sk_sndtimeo =  mdev->tconn->net_conf->try_connect_int*HZ;
594         drbd_setbufsize(sock, mdev->tconn->net_conf->sndbuf_size,
595                         mdev->tconn->net_conf->rcvbuf_size);
596
597        /* explicitly bind to the configured IP as source IP
598         *  for the outgoing connections.
599         *  This is needed for multihomed hosts and to be
600         *  able to use lo: interfaces for drbd.
601         * Make sure to use 0 as port number, so linux selects
602         *  a free one dynamically.
603         */
604         memcpy(&src_in6, mdev->tconn->net_conf->my_addr,
605                min_t(int, mdev->tconn->net_conf->my_addr_len, sizeof(src_in6)));
606         if (((struct sockaddr *)mdev->tconn->net_conf->my_addr)->sa_family == AF_INET6)
607                 src_in6.sin6_port = 0;
608         else
609                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
610
611         what = "bind before connect";
612         err = sock->ops->bind(sock,
613                               (struct sockaddr *) &src_in6,
614                               mdev->tconn->net_conf->my_addr_len);
615         if (err < 0)
616                 goto out;
617
618         /* connect may fail, peer not yet available.
619          * stay C_WF_CONNECTION, don't go Disconnecting! */
620         disconnect_on_error = 0;
621         what = "connect";
622         err = sock->ops->connect(sock,
623                                  (struct sockaddr *)mdev->tconn->net_conf->peer_addr,
624                                  mdev->tconn->net_conf->peer_addr_len, 0);
625
626 out:
627         if (err < 0) {
628                 if (sock) {
629                         sock_release(sock);
630                         sock = NULL;
631                 }
632                 switch (-err) {
633                         /* timeout, busy, signal pending */
634                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
635                 case EINTR: case ERESTARTSYS:
636                         /* peer not (yet) available, network problem */
637                 case ECONNREFUSED: case ENETUNREACH:
638                 case EHOSTDOWN:    case EHOSTUNREACH:
639                         disconnect_on_error = 0;
640                         break;
641                 default:
642                         dev_err(DEV, "%s failed, err = %d\n", what, err);
643                 }
644                 if (disconnect_on_error)
645                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
646         }
647         put_net_conf(mdev);
648         return sock;
649 }
650
651 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
652 {
653         int timeo, err;
654         struct socket *s_estab = NULL, *s_listen;
655         const char *what;
656
657         if (!get_net_conf(mdev))
658                 return NULL;
659
660         what = "sock_create_kern";
661         err = sock_create_kern(((struct sockaddr *)mdev->tconn->net_conf->my_addr)->sa_family,
662                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
663         if (err) {
664                 s_listen = NULL;
665                 goto out;
666         }
667
668         timeo = mdev->tconn->net_conf->try_connect_int * HZ;
669         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
670
671         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
672         s_listen->sk->sk_rcvtimeo = timeo;
673         s_listen->sk->sk_sndtimeo = timeo;
674         drbd_setbufsize(s_listen, mdev->tconn->net_conf->sndbuf_size,
675                         mdev->tconn->net_conf->rcvbuf_size);
676
677         what = "bind before listen";
678         err = s_listen->ops->bind(s_listen,
679                               (struct sockaddr *) mdev->tconn->net_conf->my_addr,
680                               mdev->tconn->net_conf->my_addr_len);
681         if (err < 0)
682                 goto out;
683
684         err = drbd_accept(mdev, &what, s_listen, &s_estab);
685
686 out:
687         if (s_listen)
688                 sock_release(s_listen);
689         if (err < 0) {
690                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
691                         dev_err(DEV, "%s failed, err = %d\n", what, err);
692                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
693                 }
694         }
695         put_net_conf(mdev);
696
697         return s_estab;
698 }
699
700 static int drbd_send_fp(struct drbd_conf *mdev,
701         struct socket *sock, enum drbd_packets cmd)
702 {
703         struct p_header80 *h = &mdev->data.sbuf.header.h80;
704
705         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
706 }
707
708 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
709 {
710         struct p_header80 *h = &mdev->data.rbuf.header.h80;
711         int rr;
712
713         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
714
715         if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
716                 return be16_to_cpu(h->command);
717
718         return 0xffff;
719 }
720
721 /**
722  * drbd_socket_okay() - Free the socket if its connection is not okay
723  * @mdev:       DRBD device.
724  * @sock:       pointer to the pointer to the socket.
725  */
726 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
727 {
728         int rr;
729         char tb[4];
730
731         if (!*sock)
732                 return false;
733
734         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
735
736         if (rr > 0 || rr == -EAGAIN) {
737                 return true;
738         } else {
739                 sock_release(*sock);
740                 *sock = NULL;
741                 return false;
742         }
743 }
744
745 /*
746  * return values:
747  *   1 yes, we have a valid connection
748  *   0 oops, did not work out, please try again
749  *  -1 peer talks different language,
750  *     no point in trying again, please go standalone.
751  *  -2 We do not have a network config...
752  */
753 static int drbd_connect(struct drbd_conf *mdev)
754 {
755         struct socket *s, *sock, *msock;
756         int try, h, ok;
757
758         D_ASSERT(!mdev->data.socket);
759
760         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
761                 return -2;
762
763         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
764
765         sock  = NULL;
766         msock = NULL;
767
768         do {
769                 for (try = 0;;) {
770                         /* 3 tries, this should take less than a second! */
771                         s = drbd_try_connect(mdev);
772                         if (s || ++try >= 3)
773                                 break;
774                         /* give the other side time to call bind() & listen() */
775                         schedule_timeout_interruptible(HZ / 10);
776                 }
777
778                 if (s) {
779                         if (!sock) {
780                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
781                                 sock = s;
782                                 s = NULL;
783                         } else if (!msock) {
784                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
785                                 msock = s;
786                                 s = NULL;
787                         } else {
788                                 dev_err(DEV, "Logic error in drbd_connect()\n");
789                                 goto out_release_sockets;
790                         }
791                 }
792
793                 if (sock && msock) {
794                         schedule_timeout_interruptible(mdev->tconn->net_conf->ping_timeo*HZ/10);
795                         ok = drbd_socket_okay(mdev, &sock);
796                         ok = drbd_socket_okay(mdev, &msock) && ok;
797                         if (ok)
798                                 break;
799                 }
800
801 retry:
802                 s = drbd_wait_for_connect(mdev);
803                 if (s) {
804                         try = drbd_recv_fp(mdev, s);
805                         drbd_socket_okay(mdev, &sock);
806                         drbd_socket_okay(mdev, &msock);
807                         switch (try) {
808                         case P_HAND_SHAKE_S:
809                                 if (sock) {
810                                         dev_warn(DEV, "initial packet S crossed\n");
811                                         sock_release(sock);
812                                 }
813                                 sock = s;
814                                 break;
815                         case P_HAND_SHAKE_M:
816                                 if (msock) {
817                                         dev_warn(DEV, "initial packet M crossed\n");
818                                         sock_release(msock);
819                                 }
820                                 msock = s;
821                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
822                                 break;
823                         default:
824                                 dev_warn(DEV, "Error receiving initial packet\n");
825                                 sock_release(s);
826                                 if (random32() & 1)
827                                         goto retry;
828                         }
829                 }
830
831                 if (mdev->state.conn <= C_DISCONNECTING)
832                         goto out_release_sockets;
833                 if (signal_pending(current)) {
834                         flush_signals(current);
835                         smp_rmb();
836                         if (get_t_state(&mdev->receiver) == EXITING)
837                                 goto out_release_sockets;
838                 }
839
840                 if (sock && msock) {
841                         ok = drbd_socket_okay(mdev, &sock);
842                         ok = drbd_socket_okay(mdev, &msock) && ok;
843                         if (ok)
844                                 break;
845                 }
846         } while (1);
847
848         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
849         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
850
851         sock->sk->sk_allocation = GFP_NOIO;
852         msock->sk->sk_allocation = GFP_NOIO;
853
854         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
855         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
856
857         /* NOT YET ...
858          * sock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
859          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
860          * first set it to the P_HAND_SHAKE timeout,
861          * which we set to 4x the configured ping_timeout. */
862         sock->sk->sk_sndtimeo =
863         sock->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_timeo*4*HZ/10;
864
865         msock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
866         msock->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_int*HZ;
867
868         /* we don't want delays.
869          * we use TCP_CORK where appropriate, though */
870         drbd_tcp_nodelay(sock);
871         drbd_tcp_nodelay(msock);
872
873         mdev->data.socket = sock;
874         mdev->meta.socket = msock;
875         mdev->last_received = jiffies;
876
877         D_ASSERT(mdev->asender.task == NULL);
878
879         h = drbd_do_handshake(mdev);
880         if (h <= 0)
881                 return h;
882
883         if (mdev->cram_hmac_tfm) {
884                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
885                 switch (drbd_do_auth(mdev)) {
886                 case -1:
887                         dev_err(DEV, "Authentication of peer failed\n");
888                         return -1;
889                 case 0:
890                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
891                         return 0;
892                 }
893         }
894
895         if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
896                 return 0;
897
898         sock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
899         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
900
901         atomic_set(&mdev->packet_seq, 0);
902         mdev->peer_seq = 0;
903
904         drbd_thread_start(&mdev->asender);
905
906         if (drbd_send_protocol(mdev) == -1)
907                 return -1;
908         drbd_send_sync_param(mdev, &mdev->sync_conf);
909         drbd_send_sizes(mdev, 0, 0);
910         drbd_send_uuids(mdev);
911         drbd_send_state(mdev);
912         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
913         clear_bit(RESIZE_PENDING, &mdev->flags);
914         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
915
916         return 1;
917
918 out_release_sockets:
919         if (sock)
920                 sock_release(sock);
921         if (msock)
922                 sock_release(msock);
923         return -1;
924 }
925
926 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
927 {
928         union p_header *h = &mdev->data.rbuf.header;
929         int r;
930
931         r = drbd_recv(mdev, h, sizeof(*h));
932         if (unlikely(r != sizeof(*h))) {
933                 if (!signal_pending(current))
934                         dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
935                 return false;
936         }
937
938         if (likely(h->h80.magic == cpu_to_be32(DRBD_MAGIC))) {
939                 *cmd = be16_to_cpu(h->h80.command);
940                 *packet_size = be16_to_cpu(h->h80.length);
941         } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
942                 *cmd = be16_to_cpu(h->h95.command);
943                 *packet_size = be32_to_cpu(h->h95.length);
944         } else {
945                 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
946                     be32_to_cpu(h->h80.magic),
947                     be16_to_cpu(h->h80.command),
948                     be16_to_cpu(h->h80.length));
949                 return false;
950         }
951         mdev->last_received = jiffies;
952
953         return true;
954 }
955
956 static void drbd_flush(struct drbd_conf *mdev)
957 {
958         int rv;
959
960         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
961                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
962                                         NULL);
963                 if (rv) {
964                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
965                         /* would rather check on EOPNOTSUPP, but that is not reliable.
966                          * don't try again for ANY return value != 0
967                          * if (rv == -EOPNOTSUPP) */
968                         drbd_bump_write_ordering(mdev, WO_drain_io);
969                 }
970                 put_ldev(mdev);
971         }
972 }
973
974 /**
975  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
976  * @mdev:       DRBD device.
977  * @epoch:      Epoch object.
978  * @ev:         Epoch event.
979  */
980 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
981                                                struct drbd_epoch *epoch,
982                                                enum epoch_event ev)
983 {
984         int epoch_size;
985         struct drbd_epoch *next_epoch;
986         enum finish_epoch rv = FE_STILL_LIVE;
987
988         spin_lock(&mdev->epoch_lock);
989         do {
990                 next_epoch = NULL;
991
992                 epoch_size = atomic_read(&epoch->epoch_size);
993
994                 switch (ev & ~EV_CLEANUP) {
995                 case EV_PUT:
996                         atomic_dec(&epoch->active);
997                         break;
998                 case EV_GOT_BARRIER_NR:
999                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1000                         break;
1001                 case EV_BECAME_LAST:
1002                         /* nothing to do */
1003                         break;
1004                 }
1005
1006                 if (epoch_size != 0 &&
1007                     atomic_read(&epoch->active) == 0 &&
1008                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1009                         if (!(ev & EV_CLEANUP)) {
1010                                 spin_unlock(&mdev->epoch_lock);
1011                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1012                                 spin_lock(&mdev->epoch_lock);
1013                         }
1014                         dec_unacked(mdev);
1015
1016                         if (mdev->current_epoch != epoch) {
1017                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1018                                 list_del(&epoch->list);
1019                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1020                                 mdev->epochs--;
1021                                 kfree(epoch);
1022
1023                                 if (rv == FE_STILL_LIVE)
1024                                         rv = FE_DESTROYED;
1025                         } else {
1026                                 epoch->flags = 0;
1027                                 atomic_set(&epoch->epoch_size, 0);
1028                                 /* atomic_set(&epoch->active, 0); is already zero */
1029                                 if (rv == FE_STILL_LIVE)
1030                                         rv = FE_RECYCLED;
1031                                 wake_up(&mdev->ee_wait);
1032                         }
1033                 }
1034
1035                 if (!next_epoch)
1036                         break;
1037
1038                 epoch = next_epoch;
1039         } while (1);
1040
1041         spin_unlock(&mdev->epoch_lock);
1042
1043         return rv;
1044 }
1045
1046 /**
1047  * drbd_bump_write_ordering() - Fall back to another write ordering method
1048  * @mdev:       DRBD device.
1049  * @wo:         Write ordering method to try.
1050  */
1051 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1052 {
1053         enum write_ordering_e pwo;
1054         static char *write_ordering_str[] = {
1055                 [WO_none] = "none",
1056                 [WO_drain_io] = "drain",
1057                 [WO_bdev_flush] = "flush",
1058         };
1059
1060         pwo = mdev->write_ordering;
1061         wo = min(pwo, wo);
1062         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1063                 wo = WO_drain_io;
1064         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1065                 wo = WO_none;
1066         mdev->write_ordering = wo;
1067         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1068                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1069 }
1070
1071 /**
1072  * drbd_submit_ee() - Submit the bios for an epoch entry
1073  * @mdev:       DRBD device.
1074  * @e:          epoch entry
1075  * @rw:         flag field, see bio->bi_rw
1076  *
1077  * May spread the pages to multiple bios,
1078  * depending on bio_add_page restrictions.
1079  *
1080  * Returns 0 if all bios have been submitted,
1081  * -ENOMEM if we could not allocate enough bios,
1082  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1083  *  single page to an empty bio (which should never happen and likely indicates
1084  *  that the lower level IO stack is in some way broken). This has been observed
1085  *  on certain Xen deployments.
1086  */
1087 /* TODO allocate from our own bio_set. */
1088 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1089                 const unsigned rw, const int fault_type)
1090 {
1091         struct bio *bios = NULL;
1092         struct bio *bio;
1093         struct page *page = e->pages;
1094         sector_t sector = e->i.sector;
1095         unsigned ds = e->i.size;
1096         unsigned n_bios = 0;
1097         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1098         int err = -ENOMEM;
1099
1100         /* In most cases, we will only need one bio.  But in case the lower
1101          * level restrictions happen to be different at this offset on this
1102          * side than those of the sending peer, we may need to submit the
1103          * request in more than one bio. */
1104 next_bio:
1105         bio = bio_alloc(GFP_NOIO, nr_pages);
1106         if (!bio) {
1107                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1108                 goto fail;
1109         }
1110         /* > e->i.sector, unless this is the first bio */
1111         bio->bi_sector = sector;
1112         bio->bi_bdev = mdev->ldev->backing_bdev;
1113         bio->bi_rw = rw;
1114         bio->bi_private = e;
1115         bio->bi_end_io = drbd_endio_sec;
1116
1117         bio->bi_next = bios;
1118         bios = bio;
1119         ++n_bios;
1120
1121         page_chain_for_each(page) {
1122                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1123                 if (!bio_add_page(bio, page, len, 0)) {
1124                         /* A single page must always be possible!
1125                          * But in case it fails anyways,
1126                          * we deal with it, and complain (below). */
1127                         if (bio->bi_vcnt == 0) {
1128                                 dev_err(DEV,
1129                                         "bio_add_page failed for len=%u, "
1130                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1131                                         len, (unsigned long long)bio->bi_sector);
1132                                 err = -ENOSPC;
1133                                 goto fail;
1134                         }
1135                         goto next_bio;
1136                 }
1137                 ds -= len;
1138                 sector += len >> 9;
1139                 --nr_pages;
1140         }
1141         D_ASSERT(page == NULL);
1142         D_ASSERT(ds == 0);
1143
1144         atomic_set(&e->pending_bios, n_bios);
1145         do {
1146                 bio = bios;
1147                 bios = bios->bi_next;
1148                 bio->bi_next = NULL;
1149
1150                 drbd_generic_make_request(mdev, fault_type, bio);
1151         } while (bios);
1152         return 0;
1153
1154 fail:
1155         while (bios) {
1156                 bio = bios;
1157                 bios = bios->bi_next;
1158                 bio_put(bio);
1159         }
1160         return err;
1161 }
1162
1163 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1164 {
1165         int rv;
1166         struct p_barrier *p = &mdev->data.rbuf.barrier;
1167         struct drbd_epoch *epoch;
1168
1169         inc_unacked(mdev);
1170
1171         mdev->current_epoch->barrier_nr = p->barrier;
1172         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1173
1174         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1175          * the activity log, which means it would not be resynced in case the
1176          * R_PRIMARY crashes now.
1177          * Therefore we must send the barrier_ack after the barrier request was
1178          * completed. */
1179         switch (mdev->write_ordering) {
1180         case WO_none:
1181                 if (rv == FE_RECYCLED)
1182                         return true;
1183
1184                 /* receiver context, in the writeout path of the other node.
1185                  * avoid potential distributed deadlock */
1186                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1187                 if (epoch)
1188                         break;
1189                 else
1190                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1191                         /* Fall through */
1192
1193         case WO_bdev_flush:
1194         case WO_drain_io:
1195                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1196                 drbd_flush(mdev);
1197
1198                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1199                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1200                         if (epoch)
1201                                 break;
1202                 }
1203
1204                 epoch = mdev->current_epoch;
1205                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1206
1207                 D_ASSERT(atomic_read(&epoch->active) == 0);
1208                 D_ASSERT(epoch->flags == 0);
1209
1210                 return true;
1211         default:
1212                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1213                 return false;
1214         }
1215
1216         epoch->flags = 0;
1217         atomic_set(&epoch->epoch_size, 0);
1218         atomic_set(&epoch->active, 0);
1219
1220         spin_lock(&mdev->epoch_lock);
1221         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1222                 list_add(&epoch->list, &mdev->current_epoch->list);
1223                 mdev->current_epoch = epoch;
1224                 mdev->epochs++;
1225         } else {
1226                 /* The current_epoch got recycled while we allocated this one... */
1227                 kfree(epoch);
1228         }
1229         spin_unlock(&mdev->epoch_lock);
1230
1231         return true;
1232 }
1233
1234 /* used from receive_RSDataReply (recv_resync_read)
1235  * and from receive_Data */
1236 static struct drbd_epoch_entry *
1237 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1238 {
1239         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1240         struct drbd_epoch_entry *e;
1241         struct page *page;
1242         int dgs, ds, rr;
1243         void *dig_in = mdev->int_dig_in;
1244         void *dig_vv = mdev->int_dig_vv;
1245         unsigned long *data;
1246
1247         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1248                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1249
1250         if (dgs) {
1251                 rr = drbd_recv(mdev, dig_in, dgs);
1252                 if (rr != dgs) {
1253                         if (!signal_pending(current))
1254                                 dev_warn(DEV,
1255                                         "short read receiving data digest: read %d expected %d\n",
1256                                         rr, dgs);
1257                         return NULL;
1258                 }
1259         }
1260
1261         data_size -= dgs;
1262
1263         if (!expect(data_size != 0))
1264                 return NULL;
1265         if (!expect(IS_ALIGNED(data_size, 512)))
1266                 return NULL;
1267         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1268                 return NULL;
1269
1270         /* even though we trust our peer,
1271          * we sometimes have to double check. */
1272         if (sector + (data_size>>9) > capacity) {
1273                 dev_err(DEV, "request from peer beyond end of local disk: "
1274                         "capacity: %llus < sector: %llus + size: %u\n",
1275                         (unsigned long long)capacity,
1276                         (unsigned long long)sector, data_size);
1277                 return NULL;
1278         }
1279
1280         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1281          * "criss-cross" setup, that might cause write-out on some other DRBD,
1282          * which in turn might block on the other node at this very place.  */
1283         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1284         if (!e)
1285                 return NULL;
1286
1287         ds = data_size;
1288         page = e->pages;
1289         page_chain_for_each(page) {
1290                 unsigned len = min_t(int, ds, PAGE_SIZE);
1291                 data = kmap(page);
1292                 rr = drbd_recv(mdev, data, len);
1293                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1294                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1295                         data[0] = data[0] ^ (unsigned long)-1;
1296                 }
1297                 kunmap(page);
1298                 if (rr != len) {
1299                         drbd_free_ee(mdev, e);
1300                         if (!signal_pending(current))
1301                                 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1302                                 rr, len);
1303                         return NULL;
1304                 }
1305                 ds -= rr;
1306         }
1307
1308         if (dgs) {
1309                 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1310                 if (memcmp(dig_in, dig_vv, dgs)) {
1311                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1312                                 (unsigned long long)sector, data_size);
1313                         drbd_bcast_ee(mdev, "digest failed",
1314                                         dgs, dig_in, dig_vv, e);
1315                         drbd_free_ee(mdev, e);
1316                         return NULL;
1317                 }
1318         }
1319         mdev->recv_cnt += data_size>>9;
1320         return e;
1321 }
1322
1323 /* drbd_drain_block() just takes a data block
1324  * out of the socket input buffer, and discards it.
1325  */
1326 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1327 {
1328         struct page *page;
1329         int rr, rv = 1;
1330         void *data;
1331
1332         if (!data_size)
1333                 return true;
1334
1335         page = drbd_pp_alloc(mdev, 1, 1);
1336
1337         data = kmap(page);
1338         while (data_size) {
1339                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1340                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1341                         rv = 0;
1342                         if (!signal_pending(current))
1343                                 dev_warn(DEV,
1344                                         "short read receiving data: read %d expected %d\n",
1345                                         rr, min_t(int, data_size, PAGE_SIZE));
1346                         break;
1347                 }
1348                 data_size -= rr;
1349         }
1350         kunmap(page);
1351         drbd_pp_free(mdev, page, 0);
1352         return rv;
1353 }
1354
1355 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1356                            sector_t sector, int data_size)
1357 {
1358         struct bio_vec *bvec;
1359         struct bio *bio;
1360         int dgs, rr, i, expect;
1361         void *dig_in = mdev->int_dig_in;
1362         void *dig_vv = mdev->int_dig_vv;
1363
1364         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1365                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1366
1367         if (dgs) {
1368                 rr = drbd_recv(mdev, dig_in, dgs);
1369                 if (rr != dgs) {
1370                         if (!signal_pending(current))
1371                                 dev_warn(DEV,
1372                                         "short read receiving data reply digest: read %d expected %d\n",
1373                                         rr, dgs);
1374                         return 0;
1375                 }
1376         }
1377
1378         data_size -= dgs;
1379
1380         /* optimistically update recv_cnt.  if receiving fails below,
1381          * we disconnect anyways, and counters will be reset. */
1382         mdev->recv_cnt += data_size>>9;
1383
1384         bio = req->master_bio;
1385         D_ASSERT(sector == bio->bi_sector);
1386
1387         bio_for_each_segment(bvec, bio, i) {
1388                 expect = min_t(int, data_size, bvec->bv_len);
1389                 rr = drbd_recv(mdev,
1390                              kmap(bvec->bv_page)+bvec->bv_offset,
1391                              expect);
1392                 kunmap(bvec->bv_page);
1393                 if (rr != expect) {
1394                         if (!signal_pending(current))
1395                                 dev_warn(DEV, "short read receiving data reply: "
1396                                         "read %d expected %d\n",
1397                                         rr, expect);
1398                         return 0;
1399                 }
1400                 data_size -= rr;
1401         }
1402
1403         if (dgs) {
1404                 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1405                 if (memcmp(dig_in, dig_vv, dgs)) {
1406                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1407                         return 0;
1408                 }
1409         }
1410
1411         D_ASSERT(data_size == 0);
1412         return 1;
1413 }
1414
1415 /* e_end_resync_block() is called via
1416  * drbd_process_done_ee() by asender only */
1417 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1418 {
1419         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1420         sector_t sector = e->i.sector;
1421         int ok;
1422
1423         D_ASSERT(drbd_interval_empty(&e->i));
1424
1425         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1426                 drbd_set_in_sync(mdev, sector, e->i.size);
1427                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1428         } else {
1429                 /* Record failure to sync */
1430                 drbd_rs_failed_io(mdev, sector, e->i.size);
1431
1432                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1433         }
1434         dec_unacked(mdev);
1435
1436         return ok;
1437 }
1438
1439 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1440 {
1441         struct drbd_epoch_entry *e;
1442
1443         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1444         if (!e)
1445                 goto fail;
1446
1447         dec_rs_pending(mdev);
1448
1449         inc_unacked(mdev);
1450         /* corresponding dec_unacked() in e_end_resync_block(),
1451          * or in _drbd_clear_done_ee respectively */
1452
1453         e->w.cb = e_end_resync_block;
1454
1455         spin_lock_irq(&mdev->req_lock);
1456         list_add(&e->w.list, &mdev->sync_ee);
1457         spin_unlock_irq(&mdev->req_lock);
1458
1459         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1460         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1461                 return true;
1462
1463         /* don't care for the reason here */
1464         dev_err(DEV, "submit failed, triggering re-connect\n");
1465         spin_lock_irq(&mdev->req_lock);
1466         list_del(&e->w.list);
1467         spin_unlock_irq(&mdev->req_lock);
1468
1469         drbd_free_ee(mdev, e);
1470 fail:
1471         put_ldev(mdev);
1472         return false;
1473 }
1474
1475 static struct drbd_request *
1476 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1477              sector_t sector, bool missing_ok, const char *func)
1478 {
1479         struct drbd_request *req;
1480
1481         /* Request object according to our peer */
1482         req = (struct drbd_request *)(unsigned long)id;
1483         if (drbd_contains_interval(root, sector, &req->i))
1484                 return req;
1485         if (!missing_ok) {
1486                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1487                         (unsigned long)id, (unsigned long long)sector);
1488         }
1489         return NULL;
1490 }
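/*
 * Minimal sketch (helper names hypothetical) of the block_id round trip that
 * find_request() relies on: the sending side encodes the address of its
 * request object into the 64bit wire field, and the receiving side must not
 * dereference the echoed value before re-validating it against the local
 * interval tree, since the peer fully controls what comes back.
 */
static inline u64 req_to_block_id_sketch(struct drbd_request *req)
{
	return (u64)(unsigned long)req;		/* what goes out on the wire */
}

static inline struct drbd_request *
block_id_to_req_checked_sketch(struct rb_root *root, u64 id, sector_t sector)
{
	struct drbd_request *req = (struct drbd_request *)(unsigned long)id;

	/* only trust the pointer if it is still registered for this sector */
	return drbd_contains_interval(root, sector, &req->i) ? req : NULL;
}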
1491
1492 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1493 {
1494         struct drbd_request *req;
1495         sector_t sector;
1496         int ok;
1497         struct p_data *p = &mdev->data.rbuf.data;
1498
1499         sector = be64_to_cpu(p->sector);
1500
1501         spin_lock_irq(&mdev->req_lock);
1502         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1503         spin_unlock_irq(&mdev->req_lock);
1504         if (unlikely(!req))
1505                 return false;
1506
1507         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1508          * special casing it there for the various failure cases.
1509          * still no race with drbd_fail_pending_reads */
1510         ok = recv_dless_read(mdev, req, sector, data_size);
1511
1512         if (ok)
1513                 req_mod(req, DATA_RECEIVED);
1514         /* else: nothing. handled from drbd_disconnect...
1515          * I don't think we may complete this just yet
1516          * in case we are "on-disconnect: freeze" */
1517
1518         return ok;
1519 }
1520
1521 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1522 {
1523         sector_t sector;
1524         int ok;
1525         struct p_data *p = &mdev->data.rbuf.data;
1526
1527         sector = be64_to_cpu(p->sector);
1528         D_ASSERT(p->block_id == ID_SYNCER);
1529
1530         if (get_ldev(mdev)) {
1531                 /* data is submitted to disk within recv_resync_read.
1532                  * corresponding put_ldev done below on error,
1533                  * or in drbd_endio_sec. */
1534                 ok = recv_resync_read(mdev, sector, data_size);
1535         } else {
1536                 if (__ratelimit(&drbd_ratelimit_state))
1537                         dev_err(DEV, "Can not write resync data to local disk.\n");
1538
1539                 ok = drbd_drain_block(mdev, data_size);
1540
1541                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1542         }
1543
1544         atomic_add(data_size >> 9, &mdev->rs_sect_in);
1545
1546         return ok;
1547 }
1548
1549 /* e_end_block() is called via drbd_process_done_ee().
1550  * this means this function only runs in the asender thread
1551  */
1552 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1553 {
1554         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1555         sector_t sector = e->i.sector;
1556         int ok = 1, pcmd;
1557
1558         if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
1559                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1560                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1561                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1562                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1563                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1564                         ok &= drbd_send_ack(mdev, pcmd, e);
1565                         if (pcmd == P_RS_WRITE_ACK)
1566                                 drbd_set_in_sync(mdev, sector, e->i.size);
1567                 } else {
1568                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1569                         /* we expect it to be marked out of sync anyways...
1570                          * maybe assert this?  */
1571                 }
1572                 dec_unacked(mdev);
1573         }
1574         /* we delete from the conflict detection hash _after_ we sent out the
1575          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1576         if (mdev->tconn->net_conf->two_primaries) {
1577                 spin_lock_irq(&mdev->req_lock);
1578                 D_ASSERT(!drbd_interval_empty(&e->i));
1579                 drbd_remove_interval(&mdev->epoch_entries, &e->i);
1580                 drbd_clear_interval(&e->i);
1581                 spin_unlock_irq(&mdev->req_lock);
1582         } else
1583                 D_ASSERT(drbd_interval_empty(&e->i));
1584
1585         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1586
1587         return ok;
1588 }
1589
1590 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1591 {
1592         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1593         int ok = 1;
1594
1595         D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
1596         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1597
1598         spin_lock_irq(&mdev->req_lock);
1599         D_ASSERT(!drbd_interval_empty(&e->i));
1600         drbd_remove_interval(&mdev->epoch_entries, &e->i);
1601         drbd_clear_interval(&e->i);
1602         spin_unlock_irq(&mdev->req_lock);
1603
1604         dec_unacked(mdev);
1605
1606         return ok;
1607 }
1608
1609 /* Called from receive_Data.
1610  * Synchronize packets on sock with packets on msock.
1611  *
1612  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1613  * packet traveling on msock, they are still processed in the order they have
1614  * been sent.
1615  *
1616  * Note: we don't care for Ack packets overtaking P_DATA packets.
1617  *
1618  * In case packet_seq is larger than mdev->peer_seq, there are still
1619  * outstanding packets on the msock. We wait for them to arrive.
1620  * In case this is the logically next packet, we update mdev->peer_seq
1621  * ourselves. Correctly handles 32bit wrap around.
1622  *
1623  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1624  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1625  * for the 24bit wrap (historical atomic_t guarantee on some archs), and
1626  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1627  *
1628  * returns 0 if we may process the packet,
1629  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1630 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1631 {
1632         DEFINE_WAIT(wait);
1633         unsigned int p_seq;
1634         long timeout;
1635         int ret = 0;
1636         spin_lock(&mdev->peer_seq_lock);
1637         for (;;) {
1638                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1639                 if (seq_le(packet_seq, mdev->peer_seq+1))
1640                         break;
1641                 if (signal_pending(current)) {
1642                         ret = -ERESTARTSYS;
1643                         break;
1644                 }
1645                 p_seq = mdev->peer_seq;
1646                 spin_unlock(&mdev->peer_seq_lock);
1647                 timeout = schedule_timeout(30*HZ);
1648                 spin_lock(&mdev->peer_seq_lock);
1649                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1650                         ret = -ETIMEDOUT;
1651                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1652                         break;
1653                 }
1654         }
1655         finish_wait(&mdev->seq_wait, &wait);
1656         if (mdev->peer_seq+1 == packet_seq)
1657                 mdev->peer_seq++;
1658         spin_unlock(&mdev->peer_seq_lock);
1659         return ret;
1660 }
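/*
 * The seq_le() used above is defined elsewhere in DRBD; a typical wrap-safe
 * implementation of such a check (shown here purely as an illustration)
 * compares via the signed difference, so that e.g. sequence number 0x00000002
 * still counts as coming after 0xfffffffe across the 32bit wrap:
 */
static inline int seq_le_sketch(u32 a, u32 b)
{
	/* true if a comes before or equals b, modulo 2^32 */
	return (s32)(a - b) <= 0;
}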
1661
1662 /* see also bio_flags_to_wire()
1663  * We need to semantically map the bio REQ_* flags to data packet DP_*
1664  * flags and back. We may replicate to other kernel versions. */
1665 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1666 {
1667         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1668                 (dpf & DP_FUA ? REQ_FUA : 0) |
1669                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1670                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1671 }
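/*
 * For reference, a hedged sketch of the sending-side counterpart mentioned
 * above: bio_flags_to_wire() lives elsewhere in DRBD and is expected to do
 * the inverse mapping from bio REQ_* flags to the DP_* bits carried in
 * p_data.dp_flags; the real implementation may differ in detail, e.g. by
 * gating some flags on the agreed protocol version.
 */
static u32 bio_flags_to_wire_sketch(unsigned long bi_rw)
{
	return  (bi_rw & REQ_SYNC    ? DP_RW_SYNC : 0) |
		(bi_rw & REQ_FUA     ? DP_FUA     : 0) |
		(bi_rw & REQ_FLUSH   ? DP_FLUSH   : 0) |
		(bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
}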
1672
1673 /* mirrored write */
1674 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1675 {
1676         sector_t sector;
1677         struct drbd_epoch_entry *e;
1678         struct p_data *p = &mdev->data.rbuf.data;
1679         int rw = WRITE;
1680         u32 dp_flags;
1681
1682         if (!get_ldev(mdev)) {
1683                 spin_lock(&mdev->peer_seq_lock);
1684                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1685                         mdev->peer_seq++;
1686                 spin_unlock(&mdev->peer_seq_lock);
1687
1688                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1689                 atomic_inc(&mdev->current_epoch->epoch_size);
1690                 return drbd_drain_block(mdev, data_size);
1691         }
1692
1693         /* get_ldev(mdev) successful.
1694          * Corresponding put_ldev done either below (on various errors),
1695          * or in drbd_endio_sec, if we successfully submit the data at
1696          * the end of this function. */
1697
1698         sector = be64_to_cpu(p->sector);
1699         e = read_in_block(mdev, p->block_id, sector, data_size);
1700         if (!e) {
1701                 put_ldev(mdev);
1702                 return false;
1703         }
1704
1705         e->w.cb = e_end_block;
1706
1707         dp_flags = be32_to_cpu(p->dp_flags);
1708         rw |= wire_flags_to_bio(mdev, dp_flags);
1709
1710         if (dp_flags & DP_MAY_SET_IN_SYNC)
1711                 e->flags |= EE_MAY_SET_IN_SYNC;
1712
1713         spin_lock(&mdev->epoch_lock);
1714         e->epoch = mdev->current_epoch;
1715         atomic_inc(&e->epoch->epoch_size);
1716         atomic_inc(&e->epoch->active);
1717         spin_unlock(&mdev->epoch_lock);
1718
1719         /* I'm the receiver, I do hold a net_cnt reference. */
1720         if (!mdev->tconn->net_conf->two_primaries) {
1721                 spin_lock_irq(&mdev->req_lock);
1722         } else {
1723                 /* don't get the req_lock yet,
1724                  * we may sleep in drbd_wait_peer_seq */
1725                 const int size = e->i.size;
1726                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1727                 DEFINE_WAIT(wait);
1728                 int first;
1729
1730                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
1731
1732                 /* conflict detection and handling:
1733                  * 1. wait on the sequence number,
1734                  *    in case this data packet overtook ACK packets.
1735                  * 2. check our interval trees for conflicting requests:
1736                  *    we only need to check the write_requests tree; the
1737                  *    epoch_entries tree cannot contain any overlaps because
1738                  *    they were already eliminated on the submitting node.
1739                  *
1740                  * Note: for two_primaries, we are protocol C,
1741                  * so there cannot be any request that is DONE
1742                  * but still on the transfer log.
1743                  *
1744                  * unconditionally add to the epoch_entries tree.
1745                  *
1746                  * if no conflicting request is found:
1747                  *    submit.
1748                  *
1749                  * if any conflicting request is found
1750                  * that has not yet been acked,
1751                  * AND I have the "discard concurrent writes" flag:
1752                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1753                  *
1754                  * if any conflicting request is found:
1755                  *       block the receiver, waiting on misc_wait
1756                  *       until no more conflicting requests are there,
1757                  *       or we get interrupted (disconnect).
1758                  *
1759                  *       we do not just write after local io completion of those
1760                  *       requests, but only after req is done completely, i.e.
1761                  *       we wait for the P_DISCARD_ACK to arrive!
1762                  *
1763                  *       then proceed normally, i.e. submit.
1764                  */
1765                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1766                         goto out_interrupted;
1767
1768                 spin_lock_irq(&mdev->req_lock);
1769
1770                 drbd_insert_interval(&mdev->epoch_entries, &e->i);
1771
1772                 first = 1;
1773                 for (;;) {
1774                         struct drbd_interval *i;
1775                         int have_unacked = 0;
1776                         int have_conflict = 0;
1777                         prepare_to_wait(&mdev->misc_wait, &wait,
1778                                 TASK_INTERRUPTIBLE);
1779
1780                         i = drbd_find_overlap(&mdev->write_requests, sector, size);
1781                         if (i) {
1782                                 struct drbd_request *req2 =
1783                                         container_of(i, struct drbd_request, i);
1784
1785                                 /* only ALERT on first iteration,
1786                                  * we may be woken up early... */
1787                                 if (first)
1788                                         dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1789                                               " new: %llus +%u; pending: %llus +%u\n",
1790                                               current->comm, current->pid,
1791                                               (unsigned long long)sector, size,
1792                                               (unsigned long long)req2->i.sector, req2->i.size);
1793                                 if (req2->rq_state & RQ_NET_PENDING)
1794                                         ++have_unacked;
1795                                 ++have_conflict;
1796                         }
1797                         if (!have_conflict)
1798                                 break;
1799
1800                         /* Discard Ack only for the _first_ iteration */
1801                         if (first && discard && have_unacked) {
1802                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1803                                      (unsigned long long)sector);
1804                                 inc_unacked(mdev);
1805                                 e->w.cb = e_send_discard_ack;
1806                                 list_add_tail(&e->w.list, &mdev->done_ee);
1807
1808                                 spin_unlock_irq(&mdev->req_lock);
1809
1810                                 /* we could probably send that P_DISCARD_ACK ourselves,
1811                                  * but I don't like the receiver using the msock */
1812
1813                                 put_ldev(mdev);
1814                                 wake_asender(mdev);
1815                                 finish_wait(&mdev->misc_wait, &wait);
1816                                 return true;
1817                         }
1818
1819                         if (signal_pending(current)) {
1820                                 drbd_remove_interval(&mdev->epoch_entries, &e->i);
1821                                 drbd_clear_interval(&e->i);
1822
1823                                 spin_unlock_irq(&mdev->req_lock);
1824
1825                                 finish_wait(&mdev->misc_wait, &wait);
1826                                 goto out_interrupted;
1827                         }
1828
1829                         spin_unlock_irq(&mdev->req_lock);
1830                         if (first) {
1831                                 first = 0;
1832                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1833                                      "sec=%llus\n", (unsigned long long)sector);
1834                         } else if (discard) {
1835                                 /* we had none on the first iteration.
1836                                  * there must be none now. */
1837                                 D_ASSERT(have_unacked == 0);
1838                         }
1839                         schedule();
1840                         spin_lock_irq(&mdev->req_lock);
1841                 }
1842                 finish_wait(&mdev->misc_wait, &wait);
1843         }
1844
1845         list_add(&e->w.list, &mdev->active_ee);
1846         spin_unlock_irq(&mdev->req_lock);
1847
1848         switch (mdev->tconn->net_conf->wire_protocol) {
1849         case DRBD_PROT_C:
1850                 inc_unacked(mdev);
1851                 /* corresponding dec_unacked() in e_end_block(),
1852                  * or in _drbd_clear_done_ee */
1853                 break;
1854         case DRBD_PROT_B:
1855                 /* I really don't like it that the receiver thread
1856                  * sends on the msock, but anyways */
1857                 drbd_send_ack(mdev, P_RECV_ACK, e);
1858                 break;
1859         case DRBD_PROT_A:
1860                 /* nothing to do */
1861                 break;
1862         }
1863
1864         if (mdev->state.pdsk < D_INCONSISTENT) {
1865                 /* In case we have the only disk of the cluster, mark the block out of sync. */
1866                 drbd_set_out_of_sync(mdev, e->i.sector, e->i.size);
1867                 e->flags |= EE_CALL_AL_COMPLETE_IO;
1868                 e->flags &= ~EE_MAY_SET_IN_SYNC;
1869                 drbd_al_begin_io(mdev, e->i.sector);
1870         }
1871
1872         if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1873                 return true;
1874
1875         /* don't care for the reason here */
1876         dev_err(DEV, "submit failed, triggering re-connect\n");
1877         spin_lock_irq(&mdev->req_lock);
1878         list_del(&e->w.list);
1879         drbd_remove_interval(&mdev->epoch_entries, &e->i);
1880         drbd_clear_interval(&e->i);
1881         spin_unlock_irq(&mdev->req_lock);
1882         if (e->flags & EE_CALL_AL_COMPLETE_IO)
1883                 drbd_al_complete_io(mdev, e->i.sector);
1884
1885 out_interrupted:
1886         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1887         put_ldev(mdev);
1888         drbd_free_ee(mdev, e);
1889         return false;
1890 }
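/*
 * The conflict detection above hinges on an overlap test between the incoming
 * [sector, sector+size) range and the locally pending writes.  The real
 * drbd_find_overlap() walks the write_requests interval tree; the sketch
 * below shows only the two-interval core of that test (sizes are in bytes,
 * sectors are 512 bytes, as for struct drbd_interval):
 */
static inline bool intervals_overlap_sketch(sector_t a_sector, unsigned int a_size,
					    sector_t b_sector, unsigned int b_size)
{
	sector_t a_end = a_sector + (a_size >> 9);
	sector_t b_end = b_sector + (b_size >> 9);

	/* half-open ranges intersect iff each starts before the other ends */
	return a_sector < b_end && b_sector < a_end;
}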
1891
1892 /* We may throttle resync, if the lower device seems to be busy,
1893  * and current sync rate is above c_min_rate.
1894  *
1895  * To decide whether or not the lower device is busy, we use a scheme similar
1896  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
1897  * amount (more than 64 sectors) of activity that we cannot account for with
1898  * our own resync activity, the device obviously is "busy".
1899  *
1900  * The current sync rate used here uses only the most recent two step marks,
1901  * to have a short time average so we can react faster.
1902  */
1903 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1904 {
1905         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1906         unsigned long db, dt, dbdt;
1907         struct lc_element *tmp;
1908         int curr_events;
1909         int throttle = 0;
1910
1911         /* feature disabled? */
1912         if (mdev->sync_conf.c_min_rate == 0)
1913                 return 0;
1914
1915         spin_lock_irq(&mdev->al_lock);
1916         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1917         if (tmp) {
1918                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1919                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1920                         spin_unlock_irq(&mdev->al_lock);
1921                         return 0;
1922                 }
1923                 /* Do not slow down if app IO is already waiting for this extent */
1924         }
1925         spin_unlock_irq(&mdev->al_lock);
1926
1927         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1928                       (int)part_stat_read(&disk->part0, sectors[1]) -
1929                         atomic_read(&mdev->rs_sect_ev);
1930
1931         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1932                 unsigned long rs_left;
1933                 int i;
1934
1935                 mdev->rs_last_events = curr_events;
1936
1937                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1938                  * approx. */
1939                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1940
1941                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1942                         rs_left = mdev->ov_left;
1943                 else
1944                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1945
1946                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1947                 if (!dt)
1948                         dt++;
1949                 db = mdev->rs_mark_left[i] - rs_left;
1950                 dbdt = Bit2KB(db/dt);
1951
1952                 if (dbdt > mdev->sync_conf.c_min_rate)
1953                         throttle = 1;
1954         }
1955         return throttle;
1956 }
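/*
 * Worked sketch of the short-time rate estimate used above (the helper is
 * hypothetical, the numbers illustrative): db bits of resync progress over dt
 * seconds, with each bitmap bit covering one 4KiB block, gives
 * dbdt = Bit2KB(db/dt) KiB/s, which is compared against the configured
 * c_min_rate floor.
 */
static inline unsigned long resync_rate_kb_sketch(unsigned long db_bits,
						  unsigned long dt_sec)
{
	if (!dt_sec)
		dt_sec = 1;			/* same guard as above */
	return Bit2KB(db_bits / dt_sec);	/* e.g. 30000 bits / 6 s -> 20000 KiB/s */
}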
1957
1958
1959 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1960 {
1961         sector_t sector;
1962         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1963         struct drbd_epoch_entry *e;
1964         struct digest_info *di = NULL;
1965         int size, verb;
1966         unsigned int fault_type;
1967         struct p_block_req *p = &mdev->data.rbuf.block_req;
1968
1969         sector = be64_to_cpu(p->sector);
1970         size   = be32_to_cpu(p->blksize);
1971
1972         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1973                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1974                                 (unsigned long long)sector, size);
1975                 return false;
1976         }
1977         if (sector + (size>>9) > capacity) {
1978                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1979                                 (unsigned long long)sector, size);
1980                 return false;
1981         }
1982
1983         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1984                 verb = 1;
1985                 switch (cmd) {
1986                 case P_DATA_REQUEST:
1987                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
1988                         break;
1989                 case P_RS_DATA_REQUEST:
1990                 case P_CSUM_RS_REQUEST:
1991                 case P_OV_REQUEST:
1992                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
1993                         break;
1994                 case P_OV_REPLY:
1995                         verb = 0;
1996                         dec_rs_pending(mdev);
1997                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
1998                         break;
1999                 default:
2000                         dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2001                                 cmdname(cmd));
2002                 }
2003                 if (verb && __ratelimit(&drbd_ratelimit_state))
2004                         dev_err(DEV, "Can not satisfy peer's read request, "
2005                             "no local data.\n");
2006
2007                 /* drain the possibly present payload */
2008                 return drbd_drain_block(mdev, digest_size);
2009         }
2010
2011         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2012          * "criss-cross" setup, that might cause write-out on some other DRBD,
2013          * which in turn might block on the other node at this very place.  */
2014         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2015         if (!e) {
2016                 put_ldev(mdev);
2017                 return false;
2018         }
2019
2020         switch (cmd) {
2021         case P_DATA_REQUEST:
2022                 e->w.cb = w_e_end_data_req;
2023                 fault_type = DRBD_FAULT_DT_RD;
2024                 /* application IO, don't drbd_rs_begin_io */
2025                 goto submit;
2026
2027         case P_RS_DATA_REQUEST:
2028                 e->w.cb = w_e_end_rsdata_req;
2029                 fault_type = DRBD_FAULT_RS_RD;
2030                 /* used in the sector offset progress display */
2031                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2032                 break;
2033
2034         case P_OV_REPLY:
2035         case P_CSUM_RS_REQUEST:
2036                 fault_type = DRBD_FAULT_RS_RD;
2037                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2038                 if (!di)
2039                         goto out_free_e;
2040
2041                 di->digest_size = digest_size;
2042                 di->digest = (((char *)di)+sizeof(struct digest_info));
2043
2044                 e->digest = di;
2045                 e->flags |= EE_HAS_DIGEST;
2046
2047                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2048                         goto out_free_e;
2049
2050                 if (cmd == P_CSUM_RS_REQUEST) {
2051                         D_ASSERT(mdev->agreed_pro_version >= 89);
2052                         e->w.cb = w_e_end_csum_rs_req;
2053                         /* used in the sector offset progress display */
2054                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2055                 } else if (cmd == P_OV_REPLY) {
2056                         /* track progress, we may need to throttle */
2057                         atomic_add(size >> 9, &mdev->rs_sect_in);
2058                         e->w.cb = w_e_end_ov_reply;
2059                         dec_rs_pending(mdev);
2060                         /* drbd_rs_begin_io done when we sent this request,
2061                          * but accounting still needs to be done. */
2062                         goto submit_for_resync;
2063                 }
2064                 break;
2065
2066         case P_OV_REQUEST:
2067                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2068                     mdev->agreed_pro_version >= 90) {
2069                         unsigned long now = jiffies;
2070                         int i;
2071                         mdev->ov_start_sector = sector;
2072                         mdev->ov_position = sector;
2073                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2074                         mdev->rs_total = mdev->ov_left;
2075                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2076                                 mdev->rs_mark_left[i] = mdev->ov_left;
2077                                 mdev->rs_mark_time[i] = now;
2078                         }
2079                         dev_info(DEV, "Online Verify start sector: %llu\n",
2080                                         (unsigned long long)sector);
2081                 }
2082                 e->w.cb = w_e_end_ov_req;
2083                 fault_type = DRBD_FAULT_RS_RD;
2084                 break;
2085
2086         default:
2087                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2088                     cmdname(cmd));
2089                 fault_type = DRBD_FAULT_MAX;
2090                 goto out_free_e;
2091         }
2092
2093         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2094          * wrt the receiver, but it is not as straightforward as it may seem.
2095          * Various places in the resync start and stop logic assume resync
2096          * requests are processed in order, requeuing this on the worker thread
2097          * introduces a bunch of new code for synchronization between threads.
2098          *
2099          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2100          * "forever", throttling after drbd_rs_begin_io will lock that extent
2101          * for application writes for the same time.  For now, just throttle
2102          * here, where the rest of the code expects the receiver to sleep for
2103          * a while, anyways.
2104          */
2105
2106         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2107          * this defers syncer requests for some time, before letting at least
2108          * one request through.  The resync controller on the receiving side
2109          * will adapt to the incoming rate accordingly.
2110          *
2111          * We cannot throttle here if remote is Primary/SyncTarget:
2112          * we would also throttle its application reads.
2113          * In that case, throttling is done on the SyncTarget only.
2114          */
2115         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2116                 schedule_timeout_uninterruptible(HZ/10);
2117         if (drbd_rs_begin_io(mdev, sector))
2118                 goto out_free_e;
2119
2120 submit_for_resync:
2121         atomic_add(size >> 9, &mdev->rs_sect_ev);
2122
2123 submit:
2124         inc_unacked(mdev);
2125         spin_lock_irq(&mdev->req_lock);
2126         list_add_tail(&e->w.list, &mdev->read_ee);
2127         spin_unlock_irq(&mdev->req_lock);
2128
2129         if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2130                 return true;
2131
2132         /* don't care for the reason here */
2133         dev_err(DEV, "submit failed, triggering re-connect\n");
2134         spin_lock_irq(&mdev->req_lock);
2135         list_del(&e->w.list);
2136         spin_unlock_irq(&mdev->req_lock);
2137         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2138
2139 out_free_e:
2140         put_ldev(mdev);
2141         drbd_free_ee(mdev, e);
2142         return false;
2143 }
2144
2145 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2146 {
2147         int self, peer, rv = -100;
2148         unsigned long ch_self, ch_peer;
2149
2150         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2151         peer = mdev->p_uuid[UI_BITMAP] & 1;
2152
2153         ch_peer = mdev->p_uuid[UI_SIZE];
2154         ch_self = mdev->comm_bm_set;
2155
2156         switch (mdev->tconn->net_conf->after_sb_0p) {
2157         case ASB_CONSENSUS:
2158         case ASB_DISCARD_SECONDARY:
2159         case ASB_CALL_HELPER:
2160                 dev_err(DEV, "Configuration error.\n");
2161                 break;
2162         case ASB_DISCONNECT:
2163                 break;
2164         case ASB_DISCARD_YOUNGER_PRI:
2165                 if (self == 0 && peer == 1) {
2166                         rv = -1;
2167                         break;
2168                 }
2169                 if (self == 1 && peer == 0) {
2170                         rv =  1;
2171                         break;
2172                 }
2173                 /* Else fall through to one of the other strategies... */
2174         case ASB_DISCARD_OLDER_PRI:
2175                 if (self == 0 && peer == 1) {
2176                         rv = 1;
2177                         break;
2178                 }
2179                 if (self == 1 && peer == 0) {
2180                         rv = -1;
2181                         break;
2182                 }
2183                 /* Else fall through to one of the other strategies... */
2184                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2185                      "Using discard-least-changes instead\n");
2186         case ASB_DISCARD_ZERO_CHG:
2187                 if (ch_peer == 0 && ch_self == 0) {
2188                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2189                                 ? -1 : 1;
2190                         break;
2191                 } else {
2192                         if (ch_peer == 0) { rv =  1; break; }
2193                         if (ch_self == 0) { rv = -1; break; }
2194                 }
2195                 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2196                         break;
2197         case ASB_DISCARD_LEAST_CHG:
2198                 if      (ch_self < ch_peer)
2199                         rv = -1;
2200                 else if (ch_self > ch_peer)
2201                         rv =  1;
2202                 else /* ( ch_self == ch_peer ) */
2203                      /* Well, then use something else. */
2204                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2205                                 ? -1 : 1;
2206                 break;
2207         case ASB_DISCARD_LOCAL:
2208                 rv = -1;
2209                 break;
2210         case ASB_DISCARD_REMOTE:
2211                 rv =  1;
2212         }
2213
2214         return rv;
2215 }
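/*
 * Illustrative scenario for the zero-primaries policies above (numbers made
 * up): with after_sb_0p == ASB_DISCARD_LEAST_CHG, ch_self = 1000 and
 * ch_peer = 40, the node with more changed blocks wins, so rv = 1; in
 * drbd_sync_handshake() below this node then becomes sync source and the
 * peer's 40 changed blocks are overwritten.  Only on an exact tie does the
 * DISCARD_CONCURRENT flag break the symmetry.
 */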
2216
2217 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2218 {
2219         int hg, rv = -100;
2220
2221         switch (mdev->tconn->net_conf->after_sb_1p) {
2222         case ASB_DISCARD_YOUNGER_PRI:
2223         case ASB_DISCARD_OLDER_PRI:
2224         case ASB_DISCARD_LEAST_CHG:
2225         case ASB_DISCARD_LOCAL:
2226         case ASB_DISCARD_REMOTE:
2227                 dev_err(DEV, "Configuration error.\n");
2228                 break;
2229         case ASB_DISCONNECT:
2230                 break;
2231         case ASB_CONSENSUS:
2232                 hg = drbd_asb_recover_0p(mdev);
2233                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2234                         rv = hg;
2235                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2236                         rv = hg;
2237                 break;
2238         case ASB_VIOLENTLY:
2239                 rv = drbd_asb_recover_0p(mdev);
2240                 break;
2241         case ASB_DISCARD_SECONDARY:
2242                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2243         case ASB_CALL_HELPER:
2244                 hg = drbd_asb_recover_0p(mdev);
2245                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2246                         enum drbd_state_rv rv2;
2247
2248                         drbd_set_role(mdev, R_SECONDARY, 0);
2249                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2250                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2251                           * we do not need to wait for the after state change work either. */
2252                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2253                         if (rv2 != SS_SUCCESS) {
2254                                 drbd_khelper(mdev, "pri-lost-after-sb");
2255                         } else {
2256                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2257                                 rv = hg;
2258                         }
2259                 } else
2260                         rv = hg;
2261         }
2262
2263         return rv;
2264 }
2265
2266 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2267 {
2268         int hg, rv = -100;
2269
2270         switch (mdev->tconn->net_conf->after_sb_2p) {
2271         case ASB_DISCARD_YOUNGER_PRI:
2272         case ASB_DISCARD_OLDER_PRI:
2273         case ASB_DISCARD_LEAST_CHG:
2274         case ASB_DISCARD_LOCAL:
2275         case ASB_DISCARD_REMOTE:
2276         case ASB_CONSENSUS:
2277         case ASB_DISCARD_SECONDARY:
2278                 dev_err(DEV, "Configuration error.\n");
2279                 break;
2280         case ASB_VIOLENTLY:
2281                 rv = drbd_asb_recover_0p(mdev);
2282                 break;
2283         case ASB_DISCONNECT:
2284                 break;
2285         case ASB_CALL_HELPER:
2286                 hg = drbd_asb_recover_0p(mdev);
2287                 if (hg == -1) {
2288                         enum drbd_state_rv rv2;
2289
2290                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2291                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2292                           * we do not need to wait for the after state change work either. */
2293                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2294                         if (rv2 != SS_SUCCESS) {
2295                                 drbd_khelper(mdev, "pri-lost-after-sb");
2296                         } else {
2297                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2298                                 rv = hg;
2299                         }
2300                 } else
2301                         rv = hg;
2302         }
2303
2304         return rv;
2305 }
2306
2307 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2308                            u64 bits, u64 flags)
2309 {
2310         if (!uuid) {
2311                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2312                 return;
2313         }
2314         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2315              text,
2316              (unsigned long long)uuid[UI_CURRENT],
2317              (unsigned long long)uuid[UI_BITMAP],
2318              (unsigned long long)uuid[UI_HISTORY_START],
2319              (unsigned long long)uuid[UI_HISTORY_END],
2320              (unsigned long long)bits,
2321              (unsigned long long)flags);
2322 }
2323
2324 /*
2325   100   after split brain try auto recover
2326     2   C_SYNC_SOURCE set BitMap
2327     1   C_SYNC_SOURCE use BitMap
2328     0   no Sync
2329    -1   C_SYNC_TARGET use BitMap
2330    -2   C_SYNC_TARGET set BitMap
2331  -100   after split brain, disconnect
2332 -1000   unrelated data
2333 -1091   requires proto 91
2334 -1096   requires proto 96
2335  */
2336 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2337 {
2338         u64 self, peer;
2339         int i, j;
2340
2341         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2342         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2343
2344         *rule_nr = 10;
2345         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2346                 return 0;
2347
2348         *rule_nr = 20;
2349         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2350              peer != UUID_JUST_CREATED)
2351                 return -2;
2352
2353         *rule_nr = 30;
2354         if (self != UUID_JUST_CREATED &&
2355             (peer == UUID_JUST_CREATED || peer == (u64)0))
2356                 return 2;
2357
2358         if (self == peer) {
2359                 int rct, dc; /* roles at crash time */
2360
2361                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2362
2363                         if (mdev->agreed_pro_version < 91)
2364                                 return -1091;
2365
2366                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2367                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2368                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2369                                 drbd_uuid_set_bm(mdev, 0UL);
2370
2371                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2372                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2373                                 *rule_nr = 34;
2374                         } else {
2375                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2376                                 *rule_nr = 36;
2377                         }
2378
2379                         return 1;
2380                 }
2381
2382                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2383
2384                         if (mdev->agreed_pro_version < 91)
2385                                 return -1091;
2386
2387                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2388                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2389                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2390
2391                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2392                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2393                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2394
2395                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2396                                 *rule_nr = 35;
2397                         } else {
2398                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2399                                 *rule_nr = 37;
2400                         }
2401
2402                         return -1;
2403                 }
2404
2405                 /* Common power [off|failure] */
2406                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2407                         (mdev->p_uuid[UI_FLAGS] & 2);
2408                 /* lowest bit is set when we were primary,
2409                  * next bit (weight 2) is set when peer was primary */
2410                 *rule_nr = 40;
2411
2412                 switch (rct) {
2413                 case 0: /* !self_pri && !peer_pri */ return 0;
2414                 case 1: /*  self_pri && !peer_pri */ return 1;
2415                 case 2: /* !self_pri &&  peer_pri */ return -1;
2416                 case 3: /*  self_pri &&  peer_pri */
2417                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2418                         return dc ? -1 : 1;
2419                 }
2420         }
2421
2422         *rule_nr = 50;
2423         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2424         if (self == peer)
2425                 return -1;
2426
2427         *rule_nr = 51;
2428         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2429         if (self == peer) {
2430                 if (mdev->agreed_pro_version < 96 ?
2431                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2432                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2433                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2434                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2435                            modifications from the last start of a resync as sync source. */
2436
2437                         if (mdev->agreed_pro_version < 91)
2438                                 return -1091;
2439
2440                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2441                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2442
2443                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2444                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2445
2446                         return -1;
2447                 }
2448         }
2449
2450         *rule_nr = 60;
2451         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2452         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2453                 peer = mdev->p_uuid[i] & ~((u64)1);
2454                 if (self == peer)
2455                         return -2;
2456         }
2457
2458         *rule_nr = 70;
2459         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2460         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2461         if (self == peer)
2462                 return 1;
2463
2464         *rule_nr = 71;
2465         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2466         if (self == peer) {
2467                 if (mdev->agreed_pro_version < 96 ?
2468                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2469                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2470                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2471                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2472                            modifications from the last start of a resync as sync source. */
2473
2474                         if (mdev->agreed_pro_version < 91)
2475                                 return -1091;
2476
2477                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2478                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2479
2480                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2481                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2482                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2483
2484                         return 1;
2485                 }
2486         }
2487
2488
2489         *rule_nr = 80;
2490         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2491         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2492                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2493                 if (self == peer)
2494                         return 2;
2495         }
2496
2497         *rule_nr = 90;
2498         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2499         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2500         if (self == peer && self != ((u64)0))
2501                 return 100;
2502
2503         *rule_nr = 100;
2504         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2505                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2506                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2507                         peer = mdev->p_uuid[j] & ~((u64)1);
2508                         if (self == peer)
2509                                 return -100;
2510                 }
2511         }
2512
2513         return -1000;
2514 }
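/*
 * Minimal sketch (helper hypothetical) of how the return value table at the
 * top of drbd_uuid_compare() is meant to be read; the real interpretation,
 * including the split-brain recovery policies, happens in
 * drbd_sync_handshake() below.
 */
static inline enum drbd_conns hg_to_conn_sketch(int hg)
{
	if (hg <= -1000)		/* unrelated data, or peer protocol too old */
		return C_MASK;
	if (hg == 100 || hg == -100)	/* split brain: the after-sb policies decide;
					 * if unresolved, the connection is dropped */
		return C_MASK;
	if (hg > 0)			/* +1: bitmap based, +2: full sync */
		return C_WF_BITMAP_S;	/* this node becomes sync source */
	if (hg < 0)			/* -1: bitmap based, -2: full sync */
		return C_WF_BITMAP_T;	/* this node becomes sync target */
	return C_CONNECTED;		/* 0: no resync necessary */
}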
2515
2516 /* drbd_sync_handshake() returns the new conn state on success, or
2517    C_MASK (-1) on failure.
2518  */
2519 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2520                                            enum drbd_disk_state peer_disk) __must_hold(local)
2521 {
2522         int hg, rule_nr;
2523         enum drbd_conns rv = C_MASK;
2524         enum drbd_disk_state mydisk;
2525
2526         mydisk = mdev->state.disk;
2527         if (mydisk == D_NEGOTIATING)
2528                 mydisk = mdev->new_state_tmp.disk;
2529
2530         dev_info(DEV, "drbd_sync_handshake:\n");
2531         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2532         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2533                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2534
2535         hg = drbd_uuid_compare(mdev, &rule_nr);
2536
2537         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2538
2539         if (hg == -1000) {
2540                 dev_alert(DEV, "Unrelated data, aborting!\n");
2541                 return C_MASK;
2542         }
2543         if (hg < -1000) {
2544                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2545                 return C_MASK;
2546         }
2547
2548         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2549             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2550                 int f = (hg == -100) || abs(hg) == 2;
2551                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2552                 if (f)
2553                         hg = hg*2;
2554                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2555                      hg > 0 ? "source" : "target");
2556         }
2557
2558         if (abs(hg) == 100)
2559                 drbd_khelper(mdev, "initial-split-brain");
2560
2561         if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2562                 int pcount = (mdev->state.role == R_PRIMARY)
2563                            + (peer_role == R_PRIMARY);
2564                 int forced = (hg == -100);
2565
2566                 switch (pcount) {
2567                 case 0:
2568                         hg = drbd_asb_recover_0p(mdev);
2569                         break;
2570                 case 1:
2571                         hg = drbd_asb_recover_1p(mdev);
2572                         break;
2573                 case 2:
2574                         hg = drbd_asb_recover_2p(mdev);
2575                         break;
2576                 }
2577                 if (abs(hg) < 100) {
2578                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2579                              "automatically solved. Sync from %s node\n",
2580                              pcount, (hg < 0) ? "peer" : "this");
2581                         if (forced) {
2582                                 dev_warn(DEV, "Doing a full sync, since"
2583                                      " UUIDs were ambiguous.\n");
2584                                 hg = hg*2;
2585                         }
2586                 }
2587         }
2588
2589         if (hg == -100) {
2590                 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2591                         hg = -1;
2592                 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2593                         hg = 1;
2594
2595                 if (abs(hg) < 100)
2596                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2597                              "Sync from %s node\n",
2598                              (hg < 0) ? "peer" : "this");
2599         }
2600
2601         if (hg == -100) {
2602                 /* FIXME this log message is not correct if we end up here
2603                  * after an attempted attach on a diskless node.
2604                  * We just refuse to attach -- well, we drop the "connection"
2605                  * to that disk, in a way... */
2606                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2607                 drbd_khelper(mdev, "split-brain");
2608                 return C_MASK;
2609         }
2610
2611         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2612                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2613                 return C_MASK;
2614         }
2615
2616         if (hg < 0 && /* by intention we do not use mydisk here. */
2617             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2618                 switch (mdev->tconn->net_conf->rr_conflict) {
2619                 case ASB_CALL_HELPER:
2620                         drbd_khelper(mdev, "pri-lost");
2621                         /* fall through */
2622                 case ASB_DISCONNECT:
2623                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2624                         return C_MASK;
2625                 case ASB_VIOLENTLY:
2626                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2627                              " assumption\n");
2628                 }
2629         }
2630
2631         if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2632                 if (hg == 0)
2633                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2634                 else
2635                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2636                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2637                                  abs(hg) >= 2 ? "full" : "bit-map based");
2638                 return C_MASK;
2639         }
2640
2641         if (abs(hg) >= 2) {
2642                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2643                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2644                                         BM_LOCKED_SET_ALLOWED))
2645                         return C_MASK;
2646         }
2647
2648         if (hg > 0) { /* become sync source. */
2649                 rv = C_WF_BITMAP_S;
2650         } else if (hg < 0) { /* become sync target */
2651                 rv = C_WF_BITMAP_T;
2652         } else {
2653                 rv = C_CONNECTED;
2654                 if (drbd_bm_total_weight(mdev)) {
2655                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2656                              drbd_bm_total_weight(mdev));
2657                 }
2658         }
2659
2660         return rv;
2661 }
2662
2663 /* returns 1 if invalid */
2664 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2665 {
2666         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2667         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2668             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2669                 return 0;
2670
2671         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2672         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2673             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2674                 return 1;
2675
2676         /* everything else is valid if they are equal on both sides. */
2677         if (peer == self)
2678                 return 0;
2679
2680         /* everything else is invalid. */
2681         return 1;
2682 }
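/*
 * Concrete examples for cmp_after_sb() above (purely illustrative):
 *
 *   cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_LOCAL) -> 0  (complementary, valid)
 *   cmp_after_sb(ASB_DISCARD_LOCAL,  ASB_DISCARD_LOCAL) -> 1  (the two sides would not
 *                                                              agree on whose data to discard)
 *   cmp_after_sb(ASB_DISCONNECT,     ASB_DISCONNECT)    -> 0  (identical, valid)
 *   cmp_after_sb(ASB_DISCONNECT,     ASB_CALL_HELPER)   -> 1  (mismatch)
 */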
2683
2684 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2685 {
2686         struct p_protocol *p = &mdev->data.rbuf.protocol;
2687         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2688         int p_want_lose, p_two_primaries, cf;
2689         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2690
2691         p_proto         = be32_to_cpu(p->protocol);
2692         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2693         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2694         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2695         p_two_primaries = be32_to_cpu(p->two_primaries);
2696         cf              = be32_to_cpu(p->conn_flags);
2697         p_want_lose = cf & CF_WANT_LOSE;
2698
2699         clear_bit(CONN_DRY_RUN, &mdev->flags);
2700
2701         if (cf & CF_DRY_RUN)
2702                 set_bit(CONN_DRY_RUN, &mdev->flags);
2703
2704         if (p_proto != mdev->tconn->net_conf->wire_protocol) {
2705                 dev_err(DEV, "incompatible communication protocols\n");
2706                 goto disconnect;
2707         }
2708
2709         if (cmp_after_sb(p_after_sb_0p, mdev->tconn->net_conf->after_sb_0p)) {
2710                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2711                 goto disconnect;
2712         }
2713
2714         if (cmp_after_sb(p_after_sb_1p, mdev->tconn->net_conf->after_sb_1p)) {
2715                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2716                 goto disconnect;
2717         }
2718
2719         if (cmp_after_sb(p_after_sb_2p, mdev->tconn->net_conf->after_sb_2p)) {
2720                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2721                 goto disconnect;
2722         }
2723
2724         if (p_want_lose && mdev->tconn->net_conf->want_lose) {
2725                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2726                 goto disconnect;
2727         }
2728
2729         if (p_two_primaries != mdev->tconn->net_conf->two_primaries) {
2730                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2731                 goto disconnect;
2732         }
2733
2734         if (mdev->agreed_pro_version >= 87) {
2735                 unsigned char *my_alg = mdev->tconn->net_conf->integrity_alg;
2736
2737                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2738                         return false;
2739
2740                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2741                 if (strcmp(p_integrity_alg, my_alg)) {
2742                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2743                         goto disconnect;
2744                 }
2745                 dev_info(DEV, "data-integrity-alg: %s\n",
2746                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2747         }
2748
2749         return true;
2750
2751 disconnect:
2752         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2753         return false;
2754 }
2755
2756 /* helper function
2757  * input: alg name, feature name
2758  * return: NULL (alg name was "")
2759  *         ERR_PTR(error) if something goes wrong
2760  *         or the crypto hash ptr, if it worked out ok. */
2761 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2762                 const char *alg, const char *name)
2763 {
2764         struct crypto_hash *tfm;
2765
2766         if (!alg[0])
2767                 return NULL;
2768
2769         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2770         if (IS_ERR(tfm)) {
2771                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2772                         alg, name, PTR_ERR(tfm));
2773                 return tfm;
2774         }
2775         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2776                 crypto_free_hash(tfm);
2777                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2778                 return ERR_PTR(-EINVAL);
2779         }
2780         return tfm;
2781 }
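
/*
 * Illustrative sketch (not part of the driver, not compiled): how a
 * caller is expected to handle the three return cases documented
 * above.  It mirrors the verify-alg handling in receive_SyncParam()
 * below; the function name is made up.
 */
#if 0
static int example_swap_verify_tfm(struct drbd_conf *mdev, const char *alg)
{
        struct crypto_hash *tfm;

        tfm = drbd_crypto_alloc_digest_safe(mdev, alg, "verify-alg");
        if (tfm == NULL)        /* alg was "", feature stays disabled */
                return 0;
        if (IS_ERR(tfm))        /* allocation failed, or not a digest */
                return PTR_ERR(tfm);

        /* valid transform: release the old one, take ownership of the new */
        crypto_free_hash(mdev->verify_tfm);
        mdev->verify_tfm = tfm;
        return 0;
}
#endif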
2782
2783 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2784 {
2785         int ok = true;
2786         struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2787         unsigned int header_size, data_size, exp_max_sz;
2788         struct crypto_hash *verify_tfm = NULL;
2789         struct crypto_hash *csums_tfm = NULL;
2790         const int apv = mdev->agreed_pro_version;
2791         int *rs_plan_s = NULL;
2792         int fifo_size = 0;
2793
2794         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2795                     : apv == 88 ? sizeof(struct p_rs_param)
2796                                         + SHARED_SECRET_MAX
2797                     : apv <= 94 ? sizeof(struct p_rs_param_89)
2798                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2799
2800         if (packet_size > exp_max_sz) {
2801                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2802                     packet_size, exp_max_sz);
2803                 return false;
2804         }
2805
2806         if (apv <= 88) {
2807                 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2808                 data_size   = packet_size  - header_size;
2809         } else if (apv <= 94) {
2810                 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2811                 data_size   = packet_size  - header_size;
2812                 D_ASSERT(data_size == 0);
2813         } else {
2814                 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2815                 data_size   = packet_size  - header_size;
2816                 D_ASSERT(data_size == 0);
2817         }
2818
2819         /* initialize verify_alg and csums_alg */
2820         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2821
2822         if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2823                 return false;
2824
2825         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2826
2827         if (apv >= 88) {
2828                 if (apv == 88) {
2829                         if (data_size > SHARED_SECRET_MAX) {
2830                                 dev_err(DEV, "verify-alg too long, "
2831                                     "peer wants %u, accepting only %u bytes\n",
2832                                                 data_size, SHARED_SECRET_MAX);
2833                                 return false;
2834                         }
2835
2836                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2837                                 return false;
2838
2839                         /* we expect NUL terminated string */
2840                         /* but just in case someone tries to be evil */
2841                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2842                         p->verify_alg[data_size-1] = 0;
2843
2844                 } else /* apv >= 89 */ {
2845                         /* we still expect NUL terminated strings */
2846                         /* but just in case someone tries to be evil */
2847                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2848                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2849                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2850                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2851                 }
2852
2853                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2854                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2855                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2856                                     mdev->sync_conf.verify_alg, p->verify_alg);
2857                                 goto disconnect;
2858                         }
2859                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2860                                         p->verify_alg, "verify-alg");
2861                         if (IS_ERR(verify_tfm)) {
2862                                 verify_tfm = NULL;
2863                                 goto disconnect;
2864                         }
2865                 }
2866
2867                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2868                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2869                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2870                                     mdev->sync_conf.csums_alg, p->csums_alg);
2871                                 goto disconnect;
2872                         }
2873                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2874                                         p->csums_alg, "csums-alg");
2875                         if (IS_ERR(csums_tfm)) {
2876                                 csums_tfm = NULL;
2877                                 goto disconnect;
2878                         }
2879                 }
2880
2881                 if (apv > 94) {
2882                         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2883                         mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2884                         mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2885                         mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2886                         mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2887
2888                         fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2889                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2890                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2891                                 if (!rs_plan_s) {
2892                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
2893                                         goto disconnect;
2894                                 }
2895                         }
2896                 }
2897
2898                 spin_lock(&mdev->peer_seq_lock);
2899                 /* lock against drbd_nl_syncer_conf() */
2900                 if (verify_tfm) {
2901                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2902                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2903                         crypto_free_hash(mdev->verify_tfm);
2904                         mdev->verify_tfm = verify_tfm;
2905                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2906                 }
2907                 if (csums_tfm) {
2908                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2909                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2910                         crypto_free_hash(mdev->csums_tfm);
2911                         mdev->csums_tfm = csums_tfm;
2912                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2913                 }
2914                 if (fifo_size != mdev->rs_plan_s.size) {
2915                         kfree(mdev->rs_plan_s.values);
2916                         mdev->rs_plan_s.values = rs_plan_s;
2917                         mdev->rs_plan_s.size   = fifo_size;
2918                         mdev->rs_planed = 0;
2919                 }
2920                 spin_unlock(&mdev->peer_seq_lock);
2921         }
2922
2923         return ok;
2924 disconnect:
2925         /* just for completeness: actually not needed,
2926          * as this is not reached if csums_tfm was ok. */
2927         crypto_free_hash(csums_tfm);
2928         /* but free the verify_tfm again, if csums_tfm did not work out */
2929         crypto_free_hash(verify_tfm);
2930         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2931         return false;
2932 }
2933
2934 /* warn if the arguments differ by more than 12.5% */
2935 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2936         const char *s, sector_t a, sector_t b)
2937 {
2938         sector_t d;
2939         if (a == 0 || b == 0)
2940                 return;
2941         d = (a > b) ? (a - b) : (b - a);
2942         if (d > (a>>3) || d > (b>>3))
2943                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2944                      (unsigned long long)a, (unsigned long long)b);
2945 }
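
/*
 * Worked example for the check above (numbers made up): with
 * a = 1000 and b = 860, d = 140 while a>>3 = 125 and b>>3 = 107,
 * so the difference exceeds one eighth (12.5%) and the warning is
 * printed; with b = 900, d = 100 stays below both thresholds and
 * nothing is logged.
 */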
2946
2947 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2948 {
2949         struct p_sizes *p = &mdev->data.rbuf.sizes;
2950         enum determine_dev_size dd = unchanged;
2951         sector_t p_size, p_usize, my_usize;
2952         int ldsc = 0; /* local disk size changed */
2953         enum dds_flags ddsf;
2954
2955         p_size = be64_to_cpu(p->d_size);
2956         p_usize = be64_to_cpu(p->u_size);
2957
2958         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2959                 dev_err(DEV, "some backing storage is needed\n");
2960                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2961                 return false;
2962         }
2963
2964         /* just store the peer's disk size for now.
2965          * we still need to figure out whether we accept that. */
2966         mdev->p_size = p_size;
2967
2968         if (get_ldev(mdev)) {
2969                 warn_if_differ_considerably(mdev, "lower level device sizes",
2970                            p_size, drbd_get_max_capacity(mdev->ldev));
2971                 warn_if_differ_considerably(mdev, "user requested size",
2972                                             p_usize, mdev->ldev->dc.disk_size);
2973
2974                 /* if this is the first connect, or an otherwise expected
2975                  * param exchange, choose the minimum */
2976                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2977                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2978                                              p_usize);
2979
2980                 my_usize = mdev->ldev->dc.disk_size;
2981
2982                 if (mdev->ldev->dc.disk_size != p_usize) {
2983                         mdev->ldev->dc.disk_size = p_usize;
2984                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2985                              (unsigned long)mdev->ldev->dc.disk_size);
2986                 }
2987
2988                 /* Never shrink a device with usable data during connect.
2989                    But allow online shrinking if we are connected. */
2990                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2991                    drbd_get_capacity(mdev->this_bdev) &&
2992                    mdev->state.disk >= D_OUTDATED &&
2993                    mdev->state.conn < C_CONNECTED) {
2994                         dev_err(DEV, "The peer's disk size is too small!\n");
2995                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2996                         mdev->ldev->dc.disk_size = my_usize;
2997                         put_ldev(mdev);
2998                         return false;
2999                 }
3000                 put_ldev(mdev);
3001         }
3002
3003         ddsf = be16_to_cpu(p->dds_flags);
3004         if (get_ldev(mdev)) {
3005                 dd = drbd_determine_dev_size(mdev, ddsf);
3006                 put_ldev(mdev);
3007                 if (dd == dev_size_error)
3008                         return false;
3009                 drbd_md_sync(mdev);
3010         } else {
3011                 /* I am diskless, need to accept the peer's size. */
3012                 drbd_set_my_capacity(mdev, p_size);
3013         }
3014
3015         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3016         drbd_reconsider_max_bio_size(mdev);
3017
3018         if (get_ldev(mdev)) {
3019                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3020                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3021                         ldsc = 1;
3022                 }
3023
3024                 put_ldev(mdev);
3025         }
3026
3027         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3028                 if (be64_to_cpu(p->c_size) !=
3029                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3030                         /* we have different sizes, probably peer
3031                          * needs to know my new size... */
3032                         drbd_send_sizes(mdev, 0, ddsf);
3033                 }
3034                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3035                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3036                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3037                             mdev->state.disk >= D_INCONSISTENT) {
3038                                 if (ddsf & DDSF_NO_RESYNC)
3039                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3040                                 else
3041                                         resync_after_online_grow(mdev);
3042                         } else
3043                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3044                 }
3045         }
3046
3047         return true;
3048 }
3049
3050 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3051 {
3052         struct p_uuids *p = &mdev->data.rbuf.uuids;
3053         u64 *p_uuid;
3054         int i, updated_uuids = 0;
3055
3056         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return false;
             }
3057
3058         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3059                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3060
3061         kfree(mdev->p_uuid);
3062         mdev->p_uuid = p_uuid;
3063
3064         if (mdev->state.conn < C_CONNECTED &&
3065             mdev->state.disk < D_INCONSISTENT &&
3066             mdev->state.role == R_PRIMARY &&
3067             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3068                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3069                     (unsigned long long)mdev->ed_uuid);
3070                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3071                 return false;
3072         }
3073
3074         if (get_ldev(mdev)) {
3075                 int skip_initial_sync =
3076                         mdev->state.conn == C_CONNECTED &&
3077                         mdev->agreed_pro_version >= 90 &&
3078                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3079                         (p_uuid[UI_FLAGS] & 8);
3080                 if (skip_initial_sync) {
3081                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3082                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3083                                         "clear_n_write from receive_uuids",
3084                                         BM_LOCKED_TEST_ALLOWED);
3085                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3086                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3087                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3088                                         CS_VERBOSE, NULL);
3089                         drbd_md_sync(mdev);
3090                         updated_uuids = 1;
3091                 }
3092                 put_ldev(mdev);
3093         } else if (mdev->state.disk < D_INCONSISTENT &&
3094                    mdev->state.role == R_PRIMARY) {
3095                 /* I am a diskless primary, the peer just created a new current UUID
3096                    for me. */
3097                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3098         }
3099
3100         /* Before we test for the disk state, we should wait until a possibly
3101            ongoing cluster wide state change is finished. That is important if
3102            we are primary and are detaching from our disk. We need to see the
3103            new disk state... */
3104         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3105         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3106                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3107
3108         if (updated_uuids)
3109                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3110
3111         return true;
3112 }
3113
3114 /**
3115  * convert_state() - Converts the peer's view of the cluster state to our point of view
3116  * @ps:         The state as seen by the peer.
3117  */
3118 static union drbd_state convert_state(union drbd_state ps)
3119 {
3120         union drbd_state ms;
3121
3122         static enum drbd_conns c_tab[] = {
3123                 [C_CONNECTED] = C_CONNECTED,
3124
3125                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3126                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3127                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3128                 [C_VERIFY_S]       = C_VERIFY_T,
3129                 [C_MASK]   = C_MASK,
3130         };
3131
3132         ms.i = ps.i;
3133
3134         ms.conn = c_tab[ps.conn];
3135         ms.peer = ps.role;
3136         ms.role = ps.peer;
3137         ms.pdsk = ps.disk;
3138         ms.disk = ps.pdsk;
3139         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3140
3141         return ms;
3142 }
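
/*
 * Illustrative example of the mirroring above: if the peer reports
 * conn=C_STARTING_SYNC_S role=R_PRIMARY peer=R_SECONDARY
 * disk=D_UP_TO_DATE pdsk=D_INCONSISTENT, then from our point of view
 * this reads conn=C_STARTING_SYNC_T role=R_SECONDARY peer=R_PRIMARY
 * disk=D_INCONSISTENT pdsk=D_UP_TO_DATE: role/peer and disk/pdsk are
 * swapped, and the connection state is mapped through c_tab[].
 */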
3143
3144 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3145 {
3146         struct p_req_state *p = &mdev->data.rbuf.req_state;
3147         union drbd_state mask, val;
3148         enum drbd_state_rv rv;
3149
3150         mask.i = be32_to_cpu(p->mask);
3151         val.i = be32_to_cpu(p->val);
3152
3153         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3154             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3155                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3156                 return true;
3157         }
3158
3159         mask = convert_state(mask);
3160         val = convert_state(val);
3161
3162         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3163
3164         drbd_send_sr_reply(mdev, rv);
3165         drbd_md_sync(mdev);
3166
3167         return true;
3168 }
3169
3170 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3171 {
3172         struct p_state *p = &mdev->data.rbuf.state;
3173         union drbd_state os, ns, peer_state;
3174         enum drbd_disk_state real_peer_disk;
3175         enum chg_state_flags cs_flags;
3176         int rv;
3177
3178         peer_state.i = be32_to_cpu(p->state);
3179
3180         real_peer_disk = peer_state.disk;
3181         if (peer_state.disk == D_NEGOTIATING) {
3182                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3183                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3184         }
3185
3186         spin_lock_irq(&mdev->req_lock);
3187  retry:
3188         os = ns = mdev->state;
3189         spin_unlock_irq(&mdev->req_lock);
3190
3191         /* peer says his disk is uptodate, while we think it is inconsistent,
3192          * and this happens while we think we have a sync going on. */
3193         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3194             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3195                 /* If we are (becoming) SyncSource, but peer is still in sync
3196                  * preparation, ignore its uptodate-ness to avoid flapping, it
3197                  * will change to inconsistent once the peer reaches active
3198                  * syncing states.
3199                  * It may have changed syncer-paused flags, however, so we
3200                  * cannot ignore this completely. */
3201                 if (peer_state.conn > C_CONNECTED &&
3202                     peer_state.conn < C_SYNC_SOURCE)
3203                         real_peer_disk = D_INCONSISTENT;
3204
3205                 /* if peer_state changes to connected at the same time,
3206                  * it explicitly notifies us that it finished resync.
3207                  * Maybe we should finish it up, too? */
3208                 else if (os.conn >= C_SYNC_SOURCE &&
3209                          peer_state.conn == C_CONNECTED) {
3210                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3211                                 drbd_resync_finished(mdev);
3212                         return true;
3213                 }
3214         }
3215
3216         /* peer says his disk is inconsistent, while we think it is uptodate,
3217          * and this happens while the peer still thinks we have a sync going on,
3218          * but we think we are already done with the sync.
3219          * We ignore this to avoid flapping pdsk.
3220          * This should not happen, if the peer is a recent version of drbd. */
3221         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3222             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3223                 real_peer_disk = D_UP_TO_DATE;
3224
3225         if (ns.conn == C_WF_REPORT_PARAMS)
3226                 ns.conn = C_CONNECTED;
3227
3228         if (peer_state.conn == C_AHEAD)
3229                 ns.conn = C_BEHIND;
3230
3231         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3232             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3233                 int cr; /* consider resync */
3234
3235                 /* if we established a new connection */
3236                 cr  = (os.conn < C_CONNECTED);
3237                 /* if we had an established connection
3238                  * and one of the nodes newly attaches a disk */
3239                 cr |= (os.conn == C_CONNECTED &&
3240                        (peer_state.disk == D_NEGOTIATING ||
3241                         os.disk == D_NEGOTIATING));
3242                 /* if we have both been inconsistent, and the peer has been
3243                  * forced to be UpToDate with --overwrite-data */
3244                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3245                 /* if we had been plain connected, and the admin requested to
3246                  * start a sync by "invalidate" or "invalidate-remote" */
3247                 cr |= (os.conn == C_CONNECTED &&
3248                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3249                                  peer_state.conn <= C_WF_BITMAP_T));
3250
3251                 if (cr)
3252                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3253
3254                 put_ldev(mdev);
3255                 if (ns.conn == C_MASK) {
3256                         ns.conn = C_CONNECTED;
3257                         if (mdev->state.disk == D_NEGOTIATING) {
3258                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3259                         } else if (peer_state.disk == D_NEGOTIATING) {
3260                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3261                                 peer_state.disk = D_DISKLESS;
3262                                 real_peer_disk = D_DISKLESS;
3263                         } else {
3264                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3265                                         return false;
3266                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3267                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3268                                 return false;
3269                         }
3270                 }
3271         }
3272
3273         spin_lock_irq(&mdev->req_lock);
3274         if (mdev->state.i != os.i)
3275                 goto retry;
3276         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3277         ns.peer = peer_state.role;
3278         ns.pdsk = real_peer_disk;
3279         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3280         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3281                 ns.disk = mdev->new_state_tmp.disk;
3282         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3283         if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3284             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3285                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3286                    for temporary network outages! */
3287                 spin_unlock_irq(&mdev->req_lock);
3288                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3289                 tl_clear(mdev);
3290                 drbd_uuid_new_current(mdev);
3291                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3292                 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3293                 return false;
3294         }
3295         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3296         ns = mdev->state;
3297         spin_unlock_irq(&mdev->req_lock);
3298
3299         if (rv < SS_SUCCESS) {
3300                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3301                 return false;
3302         }
3303
3304         if (os.conn > C_WF_REPORT_PARAMS) {
3305                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3306                     peer_state.disk != D_NEGOTIATING ) {
3307                         /* we want resync, peer has not yet decided to sync... */
3308                         /* Nowadays only used when forcing a node into primary role and
3309                            setting its disk to UpToDate with that */
3310                         drbd_send_uuids(mdev);
3311                         drbd_send_state(mdev);
3312                 }
3313         }
3314
3315         mdev->tconn->net_conf->want_lose = 0;
3316
3317         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3318
3319         return true;
3320 }
3321
3322 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3323 {
3324         struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3325
3326         wait_event(mdev->misc_wait,
3327                    mdev->state.conn == C_WF_SYNC_UUID ||
3328                    mdev->state.conn == C_BEHIND ||
3329                    mdev->state.conn < C_CONNECTED ||
3330                    mdev->state.disk < D_NEGOTIATING);
3331
3332         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3333
3334         /* Here the _drbd_uuid_ functions are right, current should
3335            _not_ be rotated into the history */
3336         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3337                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3338                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3339
3340                 drbd_print_uuids(mdev, "updated sync uuid");
3341                 drbd_start_resync(mdev, C_SYNC_TARGET);
3342
3343                 put_ldev(mdev);
3344         } else
3345                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3346
3347         return true;
3348 }
3349
3350 /**
3351  * receive_bitmap_plain
3352  *
3353  * Return 0 when done, 1 when another iteration is needed, and a negative error
3354  * code upon failure.
3355  */
3356 static int
3357 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3358                      unsigned long *buffer, struct bm_xfer_ctx *c)
3359 {
3360         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3361         unsigned want = num_words * sizeof(long);
3362         int err;
3363
3364         if (want != data_size) {
3365                 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3366                 return -EIO;
3367         }
3368         if (want == 0)
3369                 return 0;
3370         err = drbd_recv(mdev, buffer, want);
3371         if (err != want) {
3372                 if (err >= 0)
3373                         err = -EIO;
3374                 return err;
3375         }
3376
3377         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3378
3379         c->word_offset += num_words;
3380         c->bit_offset = c->word_offset * BITS_PER_LONG;
3381         if (c->bit_offset > c->bm_bits)
3382                 c->bit_offset = c->bm_bits;
3383
3384         return 1;
3385 }
3386
3387 /**
3388  * recv_bm_rle_bits
3389  *
3390  * Return 0 when done, 1 when another iteration is needed, and a negative error
3391  * code upon failure.
3392  */
3393 static int
3394 recv_bm_rle_bits(struct drbd_conf *mdev,
3395                 struct p_compressed_bm *p,
3396                 struct bm_xfer_ctx *c)
3397 {
3398         struct bitstream bs;
3399         u64 look_ahead;
3400         u64 rl;
3401         u64 tmp;
3402         unsigned long s = c->bit_offset;
3403         unsigned long e;
3404         int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3405         int toggle = DCBP_get_start(p);
3406         int have;
3407         int bits;
3408
3409         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3410
3411         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3412         if (bits < 0)
3413                 return -EIO;
3414
3415         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3416                 bits = vli_decode_bits(&rl, look_ahead);
3417                 if (bits <= 0)
3418                         return -EIO;
3419
3420                 if (toggle) {
3421                         e = s + rl -1;
3422                         if (e >= c->bm_bits) {
3423                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3424                                 return -EIO;
3425                         }
3426                         _drbd_bm_set_bits(mdev, s, e);
3427                 }
3428
3429                 if (have < bits) {
3430                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3431                                 have, bits, look_ahead,
3432                                 (unsigned int)(bs.cur.b - p->code),
3433                                 (unsigned int)bs.buf_len);
3434                         return -EIO;
3435                 }
3436                 look_ahead >>= bits;
3437                 have -= bits;
3438
3439                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3440                 if (bits < 0)
3441                         return -EIO;
3442                 look_ahead |= tmp << have;
3443                 have += bits;
3444         }
3445
3446         c->bit_offset = s;
3447         bm_xfer_ctx_bit_to_word_offset(c);
3448
3449         return (s != c->bm_bits);
3450 }
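
/*
 * Worked example for the decoder above (run lengths made up): with a
 * start toggle of 0 and decoded run lengths 5, 3, 7 the stream means
 * "5 bits clear, 3 bits set, 7 bits clear".  Starting at
 * c->bit_offset = 0, only bits 5..7 are passed to _drbd_bm_set_bits(),
 * and c->bit_offset ends up at 15.
 */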
3451
3452 /**
3453  * decode_bitmap_c
3454  *
3455  * Return 0 when done, 1 when another iteration is needed, and a negative error
3456  * code upon failure.
3457  */
3458 static int
3459 decode_bitmap_c(struct drbd_conf *mdev,
3460                 struct p_compressed_bm *p,
3461                 struct bm_xfer_ctx *c)
3462 {
3463         if (DCBP_get_code(p) == RLE_VLI_Bits)
3464                 return recv_bm_rle_bits(mdev, p, c);
3465
3466         /* other variants had been implemented for evaluation,
3467          * but have been dropped as this one turned out to be "best"
3468          * during all our tests. */
3469
3470         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3471         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3472         return -EIO;
3473 }
3474
3475 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3476                 const char *direction, struct bm_xfer_ctx *c)
3477 {
3478         /* what would it take to transfer it "plaintext" */
3479         unsigned plain = sizeof(struct p_header80) *
3480                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3481                 + c->bm_words * sizeof(long);
3482         unsigned total = c->bytes[0] + c->bytes[1];
3483         unsigned r;
3484
3485         /* total cannot be zero, but just in case: */
3486         if (total == 0)
3487                 return;
3488
3489         /* don't report if not compressed */
3490         if (total >= plain)
3491                 return;
3492
3493         /* total < plain. check for overflow, still */
3494         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3495                                     : (1000 * total / plain);
3496
3497         if (r > 1000)
3498                 r = 1000;
3499
3500         r = 1000 - r;
3501         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3502              "total %u; compression: %u.%u%%\n",
3503                         direction,
3504                         c->bytes[1], c->packets[1],
3505                         c->bytes[0], c->packets[0],
3506                         total, r/10, r % 10);
3507 }
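
/*
 * Worked example for the ratio above (numbers made up): with
 * plain = 40000 bytes and total = 1234 bytes,
 * r = 1000 * 1234 / 40000 = 30, then r = 1000 - 30 = 970,
 * which is printed as "compression: 97.0%".
 */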
3508
3509 /* Since we are processing the bitfield from lower addresses to higher,
3510    it does not matter whether we process it in 32 bit chunks or 64 bit
3511    chunks, as long as it is little endian. (Understand it as a byte stream,
3512    beginning with the lowest byte...) If we used big endian,
3513    we would need to process it from the highest address to the lowest,
3514    in order to be agnostic to the 32 vs 64 bit issue.
3515
3516    returns 0 on failure, 1 if we successfully received it. */
3517 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3518 {
3519         struct bm_xfer_ctx c;
3520         void *buffer;
3521         int err;
3522         int ok = false;
3523         struct p_header80 *h = &mdev->data.rbuf.header.h80;
3524
3525         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3526         /* you are supposed to send additional out-of-sync information
3527          * if you actually set bits during this phase */
3528
3529         /* maybe we should use some per thread scratch page,
3530          * and allocate that during initial device creation? */
3531         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3532         if (!buffer) {
3533                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3534                 goto out;
3535         }
3536
3537         c = (struct bm_xfer_ctx) {
3538                 .bm_bits = drbd_bm_bits(mdev),
3539                 .bm_words = drbd_bm_words(mdev),
3540         };
3541
3542         for(;;) {
3543                 if (cmd == P_BITMAP) {
3544                         err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3545                 } else if (cmd == P_COMPRESSED_BITMAP) {
3546                         /* MAYBE: sanity check that we speak proto >= 90,
3547                          * and the feature is enabled! */
3548                         struct p_compressed_bm *p;
3549
3550                         if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3551                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3552                                 goto out;
3553                         }
3554                         /* use the page buff */
3555                         p = buffer;
3556                         memcpy(p, h, sizeof(*h));
3557                         if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3558                                 goto out;
3559                         if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3560                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3561                                 goto out;
3562                         }
3563                         err = decode_bitmap_c(mdev, p, &c);
3564                 } else {
3565                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3566                         goto out;
3567                 }
3568
3569                 c.packets[cmd == P_BITMAP]++;
3570                 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3571
3572                 if (err <= 0) {
3573                         if (err < 0)
3574                                 goto out;
3575                         break;
3576                 }
3577                 if (!drbd_recv_header(mdev, &cmd, &data_size))
3578                         goto out;
3579         }
3580
3581         INFO_bm_xfer_stats(mdev, "receive", &c);
3582
3583         if (mdev->state.conn == C_WF_BITMAP_T) {
3584                 enum drbd_state_rv rv;
3585
3586                 ok = !drbd_send_bitmap(mdev);
3587                 if (!ok)
3588                         goto out;
3589                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3590                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3591                 D_ASSERT(rv == SS_SUCCESS);
3592         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3593                 /* admin may have requested C_DISCONNECTING,
3594                  * other threads may have noticed network errors */
3595                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3596                     drbd_conn_str(mdev->state.conn));
3597         }
3598
3599         ok = true;
3600  out:
3601         drbd_bm_unlock(mdev);
3602         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3603                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3604         free_page((unsigned long) buffer);
3605         return ok;
3606 }
3607
3608 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3609 {
3610         /* TODO zero copy sink :) */
3611         static char sink[128];
3612         int size, want, r;
3613
3614         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3615                  cmd, data_size);
3616
3617         size = data_size;
3618         while (size > 0) {
3619                 want = min_t(int, size, sizeof(sink));
3620                 r = drbd_recv(mdev, sink, want);
3621                 if (!expect(r > 0))
3622                         break;
3623                 size -= r;
3624         }
3625         return size == 0;
3626 }
3627
3628 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3629 {
3630         /* Make sure we've acked all the TCP data associated
3631          * with the data requests being unplugged */
3632         drbd_tcp_quickack(mdev->data.socket);
3633
3634         return true;
3635 }
3636
3637 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3638 {
3639         struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3640
3641         switch (mdev->state.conn) {
3642         case C_WF_SYNC_UUID:
3643         case C_WF_BITMAP_T:
3644         case C_BEHIND:
3645                 break;
3646         default:
3647                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3648                                 drbd_conn_str(mdev->state.conn));
3649         }
3650
3651         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3652
3653         return true;
3654 }
3655
3656 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3657
3658 struct data_cmd {
3659         int expect_payload;
3660         size_t pkt_size;
3661         drbd_cmd_handler_f function;
3662 };
3663
3664 static struct data_cmd drbd_cmd_handler[] = {
3665         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3666         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3667         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3668         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3669         [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
3670         [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3671         [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3672         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3673         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3674         [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
3675         [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3676         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3677         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3678         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3679         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3680         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3681         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3682         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3683         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3684         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3685         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3686         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3687         /* anything missing from this table is in
3688          * the asender_tbl, see get_asender_cmd */
3689         [P_MAX_CMD]         = { 0, 0, NULL },
3690 };
3691
3692 /* All handler functions that expect a sub-header get that sub-header in
3693    mdev->data.rbuf.header.head.payload.
3694
3695    Usually the callback can find the usual p_header in
3696    mdev->data.rbuf.header.head, but it may not rely on that, since there is also p_header95!
3697  */
3698
3699 static void drbdd(struct drbd_conf *mdev)
3700 {
3701         union p_header *header = &mdev->data.rbuf.header;
3702         unsigned int packet_size;
3703         enum drbd_packets cmd;
3704         size_t shs; /* sub header size */
3705         int rv;
3706
3707         while (get_t_state(&mdev->receiver) == RUNNING) {
3708                 drbd_thread_current_set_cpu(mdev);
3709                 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3710                         goto err_out;
3711
3712                 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3713                         dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3714                         goto err_out;
3715                 }
3716
3717                 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3718                 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3719                         dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3720                         goto err_out;
3721                 }
3722
3723                 if (shs) {
3724                         rv = drbd_recv(mdev, &header->h80.payload, shs);
3725                         if (unlikely(rv != shs)) {
3726                                 if (!signal_pending(current))
3727                                         dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3728                                 goto err_out;
3729                         }
3730                 }
3731
3732                 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3733
3734                 if (unlikely(!rv)) {
3735                         dev_err(DEV, "error receiving %s, l: %d!\n",
3736                             cmdname(cmd), packet_size);
3737                         goto err_out;
3738                 }
3739         }
3740
3741         if (0) {
3742         err_out:
3743                 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3744         }
3745         /* If we leave here, we probably want to update at least the
3746          * "Connected" indicator on stable storage. Do so explicitly here. */
3747         drbd_md_sync(mdev);
3748 }
3749
3750 void drbd_flush_workqueue(struct drbd_conf *mdev)
3751 {
3752         struct drbd_wq_barrier barr;
3753
3754         barr.w.cb = w_prev_work_done;
3755         init_completion(&barr.done);
3756         drbd_queue_work(&mdev->data.work, &barr.w);
3757         wait_for_completion(&barr.done);
3758 }
3759
3760 static void drbd_disconnect(struct drbd_conf *mdev)
3761 {
3762         enum drbd_fencing_p fp;
3763         union drbd_state os, ns;
3764         int rv = SS_UNKNOWN_ERROR;
3765         unsigned int i;
3766
3767         if (mdev->state.conn == C_STANDALONE)
3768                 return;
3769
3770         /* asender does not clean up anything. it must not interfere, either */
3771         drbd_thread_stop(&mdev->asender);
3772         drbd_free_sock(mdev);
3773
3774         /* wait for current activity to cease. */
3775         spin_lock_irq(&mdev->req_lock);
3776         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3777         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3778         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3779         spin_unlock_irq(&mdev->req_lock);
3780
3781         /* We do not have data structures that would allow us to
3782          * get the rs_pending_cnt down to 0 again.
3783          *  * On C_SYNC_TARGET we do not have any data structures describing
3784          *    the pending RSDataRequest's we have sent.
3785          *  * On C_SYNC_SOURCE there is no data structure that tracks
3786          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3787          *  And no, it is not the sum of the reference counts in the
3788          *  resync_LRU. The resync_LRU tracks the whole operation including
3789          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3790          *  on the fly. */
3791         drbd_rs_cancel_all(mdev);
3792         mdev->rs_total = 0;
3793         mdev->rs_failed = 0;
3794         atomic_set(&mdev->rs_pending_cnt, 0);
3795         wake_up(&mdev->misc_wait);
3796
3797         del_timer(&mdev->request_timer);
3798
3799         /* make sure syncer is stopped and w_resume_next_sg queued */
3800         del_timer_sync(&mdev->resync_timer);
3801         resync_timer_fn((unsigned long)mdev);
3802
3803         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3804          * w_make_resync_request etc. which may still be on the worker queue
3805          * to be "canceled" */
3806         drbd_flush_workqueue(mdev);
3807
3808         /* This also does reclaim_net_ee().  If we do this too early, we might
3809          * miss some resync ee and pages.*/
3810         drbd_process_done_ee(mdev);
3811
3812         kfree(mdev->p_uuid);
3813         mdev->p_uuid = NULL;
3814
3815         if (!is_susp(mdev->state))
3816                 tl_clear(mdev);
3817
3818         dev_info(DEV, "Connection closed\n");
3819
3820         drbd_md_sync(mdev);
3821
3822         fp = FP_DONT_CARE;
3823         if (get_ldev(mdev)) {
3824                 fp = mdev->ldev->dc.fencing;
3825                 put_ldev(mdev);
3826         }
3827
3828         if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3829                 drbd_try_outdate_peer_async(mdev);
3830
3831         spin_lock_irq(&mdev->req_lock);
3832         os = mdev->state;
3833         if (os.conn >= C_UNCONNECTED) {
3834                 /* Do not restart in case we are C_DISCONNECTING */
3835                 ns = os;
3836                 ns.conn = C_UNCONNECTED;
3837                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3838         }
3839         spin_unlock_irq(&mdev->req_lock);
3840
3841         if (os.conn == C_DISCONNECTING) {
3842                 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3843
3844                 crypto_free_hash(mdev->cram_hmac_tfm);
3845                 mdev->cram_hmac_tfm = NULL;
3846
3847                 kfree(mdev->tconn->net_conf);
3848                 mdev->tconn->net_conf = NULL;
3849                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3850         }
3851
3852         /* serialize with bitmap writeout triggered by the state change,
3853          * if any. */
3854         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3855
3856         /* tcp_close and release of sendpage pages can be deferred.  I don't
3857          * want to use SO_LINGER, because apparently it can be deferred for
3858          * more than 20 seconds (longest time I checked).
3859          *
3860          * Actually we don't care for exactly when the network stack does its
3861          * put_page(), but release our reference on these pages right here.
3862          */
3863         i = drbd_release_ee(mdev, &mdev->net_ee);
3864         if (i)
3865                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3866         i = atomic_read(&mdev->pp_in_use_by_net);
3867         if (i)
3868                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3869         i = atomic_read(&mdev->pp_in_use);
3870         if (i)
3871                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3872
3873         D_ASSERT(list_empty(&mdev->read_ee));
3874         D_ASSERT(list_empty(&mdev->active_ee));
3875         D_ASSERT(list_empty(&mdev->sync_ee));
3876         D_ASSERT(list_empty(&mdev->done_ee));
3877
3878         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3879         atomic_set(&mdev->current_epoch->epoch_size, 0);
3880         D_ASSERT(list_empty(&mdev->current_epoch->list));
3881 }
3882
3883 /*
3884  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3885  * we can agree on is stored in agreed_pro_version.
3886  *
3887  * feature flags and the reserved array should be enough room for future
3888  * enhancements of the handshake protocol, and possible plugins...
3889  *
3890  * for now, they are expected to be zero, but ignored.
3891  */
3892 static int drbd_send_handshake(struct drbd_conf *mdev)
3893 {
3894         /* ASSERT current == mdev->receiver ... */
3895         struct p_handshake *p = &mdev->data.sbuf.handshake;
3896         int ok;
3897
3898         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3899                 dev_err(DEV, "interrupted during initial handshake\n");
3900                 return 0; /* interrupted. not ok. */
3901         }
3902
3903         if (mdev->data.socket == NULL) {
3904                 mutex_unlock(&mdev->data.mutex);
3905                 return 0;
3906         }
3907
3908         memset(p, 0, sizeof(*p));
3909         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3910         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3911         ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3912                              (struct p_header80 *)p, sizeof(*p), 0 );
3913         mutex_unlock(&mdev->data.mutex);
3914         return ok;
3915 }
3916
3917 /*
3918  * return values:
3919  *   1 yes, we have a valid connection
3920  *   0 oops, did not work out, please try again
3921  *  -1 peer talks different language,
3922  *     no point in trying again, please go standalone.
3923  */
3924 static int drbd_do_handshake(struct drbd_conf *mdev)
3925 {
3926         /* ASSERT current == mdev->receiver ... */
3927         struct p_handshake *p = &mdev->data.rbuf.handshake;
3928         const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3929         unsigned int length;
3930         enum drbd_packets cmd;
3931         int rv;
3932
3933         rv = drbd_send_handshake(mdev);
3934         if (!rv)
3935                 return 0;
3936
3937         rv = drbd_recv_header(mdev, &cmd, &length);
3938         if (!rv)
3939                 return 0;
3940
3941         if (cmd != P_HAND_SHAKE) {
3942                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3943                      cmdname(cmd), cmd);
3944                 return -1;
3945         }
3946
3947         if (length != expect) {
3948                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3949                      expect, length);
3950                 return -1;
3951         }
3952
3953         rv = drbd_recv(mdev, &p->head.payload, expect);
3954
3955         if (rv != expect) {
3956                 if (!signal_pending(current))
3957                         dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
3958                 return 0;
3959         }
3960
3961         p->protocol_min = be32_to_cpu(p->protocol_min);
3962         p->protocol_max = be32_to_cpu(p->protocol_max);
3963         if (p->protocol_max == 0)
3964                 p->protocol_max = p->protocol_min;
3965
3966         if (PRO_VERSION_MAX < p->protocol_min ||
3967             PRO_VERSION_MIN > p->protocol_max)
3968                 goto incompat;
3969
3970         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3971
3972         dev_info(DEV, "Handshake successful: "
3973              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3974
3975         return 1;
3976
3977  incompat:
3978         dev_err(DEV, "incompatible DRBD dialects: "
3979             "I support %d-%d, peer supports %d-%d\n",
3980             PRO_VERSION_MIN, PRO_VERSION_MAX,
3981             p->protocol_min, p->protocol_max);
3982         return -1;
3983 }
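
/*
 * Illustrative example (version numbers made up): if we support
 * protocol versions 86..96 (PRO_VERSION_MIN..PRO_VERSION_MAX) and the
 * peer announces min=87 max=94, the ranges overlap and we agree on
 * min(96, 94) = 94 as agreed_pro_version.  A peer announcing only
 * 97..99 fails the overlap check above and we go standalone (return -1).
 */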
3984
3985 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3986 static int drbd_do_auth(struct drbd_conf *mdev)
3987 {
3988         dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
3989         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3990         return -1;
3991 }
3992 #else
3993 #define CHALLENGE_LEN 64
3994
3995 /* Return value:
3996         1 - auth succeeded,
3997         0 - failed, try again (network error),
3998         -1 - auth failed, don't try again.
3999 */
4000
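/*
 * Illustrative summary of the exchange implemented below; both sides
 * share net_conf->shared_secret:
 *
 *   A -> B : P_AUTH_CHALLENGE, CHALLENGE_LEN random bytes
 *   B -> A : P_AUTH_CHALLENGE, the peer's own random challenge
 *   A -> B : P_AUTH_RESPONSE,  HMAC(secret, peer's challenge)
 *   B -> A : P_AUTH_RESPONSE,  HMAC(secret, our challenge)
 *
 * Each side recomputes the HMAC over the challenge it sent and
 * compares it with the response it received; a mismatch fails the
 * authentication (-1), a short read or unexpected packet lets us
 * retry (0).
 */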
4001 static int drbd_do_auth(struct drbd_conf *mdev)
4002 {
4003         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4004         struct scatterlist sg;
4005         char *response = NULL;
4006         char *right_response = NULL;
4007         char *peers_ch = NULL;
4008         unsigned int key_len = strlen(mdev->tconn->net_conf->shared_secret);
4009         unsigned int resp_size;
4010         struct hash_desc desc;
4011         enum drbd_packets cmd;
4012         unsigned int length;
4013         int rv;
4014
4015         desc.tfm = mdev->cram_hmac_tfm;
4016         desc.flags = 0;
4017
4018         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4019                                 (u8 *)mdev->tconn->net_conf->shared_secret, key_len);
4020         if (rv) {
4021                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4022                 rv = -1;
4023                 goto fail;
4024         }
4025
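             /* Challenge-response: send our random challenge, receive the
              * peer's challenge, send back the HMAC of the peer's challenge
              * keyed with the shared secret, and finally verify the peer's
              * response against the HMAC of our own challenge. */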
4026         get_random_bytes(my_challenge, CHALLENGE_LEN);
4027
4028         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4029         if (!rv)
4030                 goto fail;
4031
4032         rv = drbd_recv_header(mdev, &cmd, &length);
4033         if (!rv)
4034                 goto fail;
4035
4036         if (cmd != P_AUTH_CHALLENGE) {
4037                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4038                     cmdname(cmd), cmd);
4039                 rv = 0;
4040                 goto fail;
4041         }
4042
4043         if (length > CHALLENGE_LEN * 2) {
4044                 dev_err(DEV, "AuthChallenge payload too big.\n");
4045                 rv = -1;
4046                 goto fail;
4047         }
4048
4049         peers_ch = kmalloc(length, GFP_NOIO);
4050         if (peers_ch == NULL) {
4051                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4052                 rv = -1;
4053                 goto fail;
4054         }
4055
4056         rv = drbd_recv(mdev, peers_ch, length);
4057
4058         if (rv != length) {
4059                 if (!signal_pending(current))
4060                         dev_warn(DEV, "short read receiving AuthChallenge: l=%u\n", rv);
4061                 rv = 0;
4062                 goto fail;
4063         }
4064
4065         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4066         response = kmalloc(resp_size, GFP_NOIO);
4067         if (response == NULL) {
4068                 dev_err(DEV, "kmalloc of response failed\n");
4069                 rv = -1;
4070                 goto fail;
4071         }
4072
4073         sg_init_table(&sg, 1);
4074         sg_set_buf(&sg, peers_ch, length);
4075
4076         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4077         if (rv) {
4078                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4079                 rv = -1;
4080                 goto fail;
4081         }
4082
4083         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4084         if (!rv)
4085                 goto fail;
4086
4087         rv = drbd_recv_header(mdev, &cmd, &length);
4088         if (!rv)
4089                 goto fail;
4090
4091         if (cmd != P_AUTH_RESPONSE) {
4092                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4093                         cmdname(cmd), cmd);
4094                 rv = 0;
4095                 goto fail;
4096         }
4097
4098         if (length != resp_size) {
4099                 dev_err(DEV, "AuthResponse payload has wrong size\n");
4100                 rv = 0;
4101                 goto fail;
4102         }
4103
4104         rv = drbd_recv(mdev, response, resp_size);
4105
4106         if (rv != resp_size) {
4107                 if (!signal_pending(current))
4108                         dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4109                 rv = 0;
4110                 goto fail;
4111         }
4112
4113         right_response = kmalloc(resp_size, GFP_NOIO);
4114         if (right_response == NULL) {
4115                 dev_err(DEV, "kmalloc of right_response failed\n");
4116                 rv = -1;
4117                 goto fail;
4118         }
4119
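             /* compute the response we expect for our own challenge,
              * then compare it with what the peer sent */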
4120         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4121
4122         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4123         if (rv) {
4124                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4125                 rv = -1;
4126                 goto fail;
4127         }
4128
4129         rv = !memcmp(response, right_response, resp_size);
4130
4131         if (rv)
4132                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4133                      resp_size, mdev->tconn->net_conf->cram_hmac_alg);
4134         else
4135                 rv = -1;
4136
4137  fail:
4138         kfree(peers_ch);
4139         kfree(response);
4140         kfree(right_response);
4141
4142         return rv;
4143 }
4144 #endif
4145
4146 int drbdd_init(struct drbd_thread *thi)
4147 {
4148         struct drbd_conf *mdev = thi->mdev;
4149         unsigned int minor = mdev_to_minor(mdev);
4150         int h;
4151
4152         sprintf(current->comm, "drbd%d_receiver", minor);
4153
4154         dev_info(DEV, "receiver (re)started\n");
4155
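             /* retry until connected: h == 0 is a temporary failure (try again
              * after a second), h == -1 means the peer is incompatible, so the
              * network configuration gets discarded */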
4156         do {
4157                 h = drbd_connect(mdev);
4158                 if (h == 0) {
4159                         drbd_disconnect(mdev);
4160                         schedule_timeout_interruptible(HZ);
4161                 }
4162                 if (h == -1) {
4163                         dev_warn(DEV, "Discarding network configuration.\n");
4164                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4165                 }
4166         } while (h == 0);
4167
4168         if (h > 0) {
4169                 if (get_net_conf(mdev)) {
4170                         drbdd(mdev);
4171                         put_net_conf(mdev);
4172                 }
4173         }
4174
4175         drbd_disconnect(mdev);
4176
4177         dev_info(DEV, "receiver terminated\n");
4178         return 0;
4179 }
4180
4181 /* ********* acknowledge sender ******** */
4182
4183 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4184 {
4185         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4186
4187         int retcode = be32_to_cpu(p->retcode);
4188
4189         if (retcode >= SS_SUCCESS) {
4190                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4191         } else {
4192                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4193                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4194                     drbd_set_st_err_str(retcode), retcode);
4195         }
4196         wake_up(&mdev->state_wait);
4197
4198         return true;
4199 }
4200
4201 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4202 {
4203         return drbd_send_ping_ack(mdev);
4204
4205 }
4206
4207 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4208 {
4209         /* restore idle timeout */
4210         mdev->meta.socket->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_int*HZ;
4211         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4212                 wake_up(&mdev->misc_wait);
4213
4214         return true;
4215 }
4216
4217 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4218 {
4219         struct p_block_ack *p = (struct p_block_ack *)h;
4220         sector_t sector = be64_to_cpu(p->sector);
4221         int blksize = be32_to_cpu(p->blksize);
4222
4223         D_ASSERT(mdev->agreed_pro_version >= 89);
4224
4225         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4226
4227         if (get_ldev(mdev)) {
4228                 drbd_rs_complete_io(mdev, sector);
4229                 drbd_set_in_sync(mdev, sector, blksize);
4230                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4231                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4232                 put_ldev(mdev);
4233         }
4234         dec_rs_pending(mdev);
4235         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4236
4237         return true;
4238 }
4239
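     /* Look up the request identified by (id, sector) in the given tree and
      * apply the request event 'what' to it.  Returns false only if the
      * request cannot be found and missing_ok is not set. */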
4240 static int
4241 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4242                               struct rb_root *root, const char *func,
4243                               enum drbd_req_event what, bool missing_ok)
4244 {
4245         struct drbd_request *req;
4246         struct bio_and_error m;
4247
4248         spin_lock_irq(&mdev->req_lock);
4249         req = find_request(mdev, root, id, sector, missing_ok, func);
4250         if (unlikely(!req)) {
4251                 spin_unlock_irq(&mdev->req_lock);
4252                 return false;
4253         }
4254         __req_mod(req, what, &m);
4255         spin_unlock_irq(&mdev->req_lock);
4256
4257         if (m.bio)
4258                 complete_master_bio(mdev, &m);
4259         return true;
4260 }
4261
4262 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4263 {
4264         struct p_block_ack *p = (struct p_block_ack *)h;
4265         sector_t sector = be64_to_cpu(p->sector);
4266         int blksize = be32_to_cpu(p->blksize);
4267         enum drbd_req_event what;
4268
4269         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4270
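             /* ID_SYNCER marks an ack for a resync request,
              * not for an application write */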
4271         if (p->block_id == ID_SYNCER) {
4272                 drbd_set_in_sync(mdev, sector, blksize);
4273                 dec_rs_pending(mdev);
4274                 return true;
4275         }
4276         switch (be16_to_cpu(h->command)) {
4277         case P_RS_WRITE_ACK:
4278                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4279                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4280                 break;
4281         case P_WRITE_ACK:
4282                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4283                 what = WRITE_ACKED_BY_PEER;
4284                 break;
4285         case P_RECV_ACK:
4286                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4287                 what = RECV_ACKED_BY_PEER;
4288                 break;
4289         case P_DISCARD_ACK:
4290                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4291                 what = CONFLICT_DISCARDED_BY_PEER;
4292                 break;
4293         default:
4294                 D_ASSERT(0);
4295                 return false;
4296         }
4297
4298         return validate_req_change_req_state(mdev, p->block_id, sector,
4299                                              &mdev->write_requests, __func__,
4300                                              what, false);
4301 }
4302
4303 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4304 {
4305         struct p_block_ack *p = (struct p_block_ack *)h;
4306         sector_t sector = be64_to_cpu(p->sector);
4307         int size = be32_to_cpu(p->blksize);
4308         bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4309                           mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
4310         bool found;
4311
4312         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4313
4314         if (p->block_id == ID_SYNCER) {
4315                 dec_rs_pending(mdev);
4316                 drbd_rs_failed_io(mdev, sector, size);
4317                 return true;
4318         }
4319
4320         found = validate_req_change_req_state(mdev, p->block_id, sector,
4321                                               &mdev->write_requests, __func__,
4322                                               NEG_ACKED, missing_ok);
4323         if (!found) {
4324                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4325                    The master bio might already be completed, therefore the
4326                    request is no longer in the collision hash. */
4327                 /* In Protocol B we might already have got a P_RECV_ACK
4328                    but then get a P_NEG_ACK afterwards. */
4329                 if (!missing_ok)
4330                         return false;
4331                 drbd_set_out_of_sync(mdev, sector, size);
4332         }
4333         return true;
4334 }
4335
4336 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4337 {
4338         struct p_block_ack *p = (struct p_block_ack *)h;
4339         sector_t sector = be64_to_cpu(p->sector);
4340
4341         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4342         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4343             (unsigned long long)sector, be32_to_cpu(p->blksize));
4344
4345         return validate_req_change_req_state(mdev, p->block_id, sector,
4346                                              &mdev->read_requests, __func__,
4347                                              NEG_ACKED, false);
4348 }
4349
4350 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4351 {
4352         sector_t sector;
4353         int size;
4354         struct p_block_ack *p = (struct p_block_ack *)h;
4355
4356         sector = be64_to_cpu(p->sector);
4357         size = be32_to_cpu(p->blksize);
4358
4359         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4360
4361         dec_rs_pending(mdev);
4362
4363         if (get_ldev_if_state(mdev, D_FAILED)) {
4364                 drbd_rs_complete_io(mdev, sector);
4365                 switch (be16_to_cpu(h->command)) {
4366                 case P_NEG_RS_DREPLY:
4367                         drbd_rs_failed_io(mdev, sector, size);
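                             /* fall through */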
4368                 case P_RS_CANCEL:
4369                         break;
4370                 default:
4371                         D_ASSERT(0);
4372                         put_ldev(mdev);
4373                         return false;
4374                 }
4375                 put_ldev(mdev);
4376         }
4377
4378         return true;
4379 }
4380
4381 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4382 {
4383         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4384
4385         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4386
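             /* if we were ahead of the peer (congestion mode) and all
              * application writes have drained, schedule the switch back to
              * SyncSource */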
4387         if (mdev->state.conn == C_AHEAD &&
4388             atomic_read(&mdev->ap_in_flight) == 0 &&
4389             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4390                 mdev->start_resync_timer.expires = jiffies + HZ;
4391                 add_timer(&mdev->start_resync_timer);
4392         }
4393
4394         return true;
4395 }
4396
4397 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4398 {
4399         struct p_block_ack *p = (struct p_block_ack *)h;
4400         struct drbd_work *w;
4401         sector_t sector;
4402         int size;
4403
4404         sector = be64_to_cpu(p->sector);
4405         size = be32_to_cpu(p->blksize);
4406
4407         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4408
4409         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4410                 drbd_ov_oos_found(mdev, sector, size);
4411         else
4412                 ov_oos_print(mdev);
4413
4414         if (!get_ldev(mdev))
4415                 return true;
4416
4417         drbd_rs_complete_io(mdev, sector);
4418         dec_rs_pending(mdev);
4419
4420         --mdev->ov_left;
4421
4422         /* let's advance progress step marks only for every other megabyte */
4423         if ((mdev->ov_left & 0x200) == 0x200)
4424                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4425
4426         if (mdev->ov_left == 0) {
4427                 w = kmalloc(sizeof(*w), GFP_NOIO);
4428                 if (w) {
4429                         w->cb = w_ov_finished;
4430                         drbd_queue_work_front(&mdev->data.work, w);
4431                 } else {
4432                         dev_err(DEV, "kmalloc(w) failed.\n");
4433                         ov_oos_print(mdev);
4434                         drbd_resync_finished(mdev);
4435                 }
4436         }
4437         put_ldev(mdev);
4438         return true;
4439 }
4440
4441 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4442 {
4443         return true;
4444 }
4445
4446 struct asender_cmd {
4447         size_t pkt_size;
4448         int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4449 };
4450
4451 static struct asender_cmd *get_asender_cmd(int cmd)
4452 {
4453         static struct asender_cmd asender_tbl[] = {
4454                 /* anything missing from this table is in
4455                  * the drbd_cmd_handler (drbd_default_handler) table,
4456                  * see the beginning of drbdd() */
4457         [P_PING]            = { sizeof(struct p_header80), got_Ping },
4458         [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
4459         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4460         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4461         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4462         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4463         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4464         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4465         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4466         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4467         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4468         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4469         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4470         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4471         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4472         [P_MAX_CMD]         = { 0, NULL },
4473         };
4474         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4475                 return NULL;
4476         return &asender_tbl[cmd];
4477 }
4478
4479 int drbd_asender(struct drbd_thread *thi)
4480 {
4481         struct drbd_conf *mdev = thi->mdev;
4482         struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4483         struct asender_cmd *cmd = NULL;
4484
4485         int rv, len;
4486         void *buf    = h;
4487         int received = 0;
4488         int expect   = sizeof(struct p_header80);
4489         int empty;
4490         int ping_timeout_active = 0;
4491
4492         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4493
4494         current->policy = SCHED_RR;  /* Make this a realtime task! */
4495         current->rt_priority = 2;    /* more important than all other tasks */
4496
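             /* main loop: send a ping when asked to, flush acks for completed
              * epoch entries, then receive and dispatch packets from the meta
              * socket */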
4497         while (get_t_state(thi) == RUNNING) {
4498                 drbd_thread_current_set_cpu(mdev);
4499                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4500                         if (!drbd_send_ping(mdev)) {
4501                                 dev_err(DEV, "drbd_send_ping has failed\n");
4502                                 goto reconnect;
4503                         }
4504                         mdev->meta.socket->sk->sk_rcvtimeo =
4505                                 mdev->tconn->net_conf->ping_timeo*HZ/10;
4506                         ping_timeout_active = 1;
4507                 }
4508
4509                 /* conditionally cork;
4510                  * it may hurt latency if we cork without much to send */
4511                 if (!mdev->tconn->net_conf->no_cork &&
4512                         3 < atomic_read(&mdev->unacked_cnt))
4513                         drbd_tcp_cork(mdev->meta.socket);
4514                 while (1) {
4515                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4516                         flush_signals(current);
4517                         if (!drbd_process_done_ee(mdev))
4518                                 goto reconnect;
4519                         /* to avoid race with newly queued ACKs */
4520                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4521                         spin_lock_irq(&mdev->req_lock);
4522                         empty = list_empty(&mdev->done_ee);
4523                         spin_unlock_irq(&mdev->req_lock);
4524                         /* new ack may have been queued right here,
4525                          * but then there is also a signal pending,
4526                          * and we start over... */
4527                         if (empty)
4528                                 break;
4529                 }
4530                 /* but unconditionally uncork unless disabled */
4531                 if (!mdev->tconn->net_conf->no_cork)
4532                         drbd_tcp_uncork(mdev->meta.socket);
4533
4534                 /* short circuit, recv_msg would return EINTR anyways. */
4535                 if (signal_pending(current))
4536                         continue;
4537
4538                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4539                                      buf, expect-received, 0);
4540                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4541
4542                 flush_signals(current);
4543
4544                 /* Note:
4545                  * -EINTR        (on meta) we got a signal
4546                  * -EAGAIN       (on meta) rcvtimeo expired
4547                  * -ECONNRESET   other side closed the connection
4548                  * -ERESTARTSYS  (on data) we got a signal
4549                  * rv <  0       other than above: unexpected error!
4550                  * rv == expected: full header or command
4551                  * rv <  expected: "woken" by signal during receive
4552                  * rv == 0       : "connection shut down by peer"
4553                  */
4554                 if (likely(rv > 0)) {
4555                         received += rv;
4556                         buf      += rv;
4557                 } else if (rv == 0) {
4558                         dev_err(DEV, "meta connection shut down by peer.\n");
4559                         goto reconnect;
4560                 } else if (rv == -EAGAIN) {
4561                         /* If the data socket received something meanwhile,
4562                          * that is good enough: peer is still alive. */
4563                         if (time_after(mdev->last_received,
4564                                 jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4565                                 continue;
4566                         if (ping_timeout_active) {
4567                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4568                                 goto reconnect;
4569                         }
4570                         set_bit(SEND_PING, &mdev->flags);
4571                         continue;
4572                 } else if (rv == -EINTR) {
4573                         continue;
4574                 } else {
4575                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4576                         goto reconnect;
4577                 }
4578
4579                 if (received == expect && cmd == NULL) {
4580                         if (unlikely(h->magic != cpu_to_be32(DRBD_MAGIC))) {
4581                                 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4582                                     be32_to_cpu(h->magic),
4583                                     be16_to_cpu(h->command),
4584                                     be16_to_cpu(h->length));
4585                                 goto reconnect;
4586                         }
4587                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4588                         len = be16_to_cpu(h->length);
4589                         if (unlikely(cmd == NULL)) {
4590                                 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4591                                     be32_to_cpu(h->magic),
4592                                     be16_to_cpu(h->command),
4593                                     be16_to_cpu(h->length));
4594                                 goto disconnect;
4595                         }
4596                         expect = cmd->pkt_size;
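                             /* note: expect(...) below is the assertion macro from
                              * drbd_int.h; the local variable 'expect' is the full
                              * packet size including the header */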
4597                         if (!expect(len == expect - sizeof(struct p_header80)))
4598                                 goto reconnect;
4599                 }
4600                 if (received == expect) {
4601                         mdev->last_received = jiffies;
4602                         D_ASSERT(cmd != NULL);
4603                         if (!cmd->process(mdev, h))
4604                                 goto reconnect;
4605
4606                         /* the idle_timeout (ping-int)
4607                          * has been restored in got_PingAck() */
4608                         if (cmd == get_asender_cmd(P_PING_ACK))
4609                                 ping_timeout_active = 0;
4610
4611                         buf      = h;
4612                         received = 0;
4613                         expect   = sizeof(struct p_header80);
4614                         cmd      = NULL;
4615                 }
4616         }
4617
4618         if (0) {
4619 reconnect:
4620                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4621                 drbd_md_sync(mdev);
4622         }
4623         if (0) {
4624 disconnect:
4625                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4626                 drbd_md_sync(mdev);
4627         }
4628         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4629
4630         D_ASSERT(mdev->state.conn < C_CONNECTED);
4631         dev_info(DEV, "asender terminated\n");
4632
4633         return 0;
4634 }