1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 enum finish_epoch {
52         FE_STILL_LIVE,
53         FE_DESTROYED,
54         FE_RECYCLED,
55 };
56
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62
63
64 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
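/*
 * For illustration: a chain p1 -> p2 -> p3 is kept entirely in
 * page->private: page_private(p1) holds (unsigned long)p2,
 * page_private(p2) holds (unsigned long)p3, and the private field of
 * the last page is set to 0 as the end-of-chain marker (see
 * set_page_private(page, 0) in page_chain_del() below).
 */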
70
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77         struct page *page;
78         struct page *tmp;
79
80         BUG_ON(!n);
81         BUG_ON(!head);
82
83         page = *head;
84
85         if (!page)
86                 return NULL;
87
88         while (page) {
89                 tmp = page_chain_next(page);
90                 if (--n == 0)
91                         break; /* found sufficient pages */
92                 if (tmp == NULL)
93                         /* insufficient pages, don't use any of them. */
94                         return NULL;
95                 page = tmp;
96         }
97
98         /* add end of list marker for the returned list */
99         set_page_private(page, 0);
100         /* actual return value, and adjustment of head */
101         page = *head;
102         *head = tmp;
103         return page;
104 }
105
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111         struct page *tmp;
112         int i = 1;
113         while ((tmp = page_chain_next(page)))
114                 ++i, page = tmp;
115         if (len)
116                 *len = i;
117         return page;
118 }
119
120 static int page_chain_free(struct page *page)
121 {
122         struct page *tmp;
123         int i = 0;
124         page_chain_for_each_safe(page, tmp) {
125                 put_page(page);
126                 ++i;
127         }
128         return i;
129 }
130
131 static void page_chain_add(struct page **head,
132                 struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135         struct page *tmp;
136         tmp = page_chain_tail(chain_first, NULL);
137         BUG_ON(tmp != chain_last);
138 #endif
139
140         /* add chain to head */
141         set_page_private(chain_last, (unsigned long)*head);
142         *head = chain_first;
143 }
144
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147         struct page *page = NULL;
148         struct page *tmp = NULL;
149         int i = 0;
150
151         /* Yes, testing drbd_pp_vacant outside the lock is racy.
152          * So what. It saves a spin_lock. */
153         if (drbd_pp_vacant >= number) {
154                 spin_lock(&drbd_pp_lock);
155                 page = page_chain_del(&drbd_pp_pool, number);
156                 if (page)
157                         drbd_pp_vacant -= number;
158                 spin_unlock(&drbd_pp_lock);
159                 if (page)
160                         return page;
161         }
162
163         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164          * "criss-cross" setup, that might cause write-out on some other DRBD,
165          * which in turn might block on the other node at this very place.  */
166         for (i = 0; i < number; i++) {
167                 tmp = alloc_page(GFP_TRY);
168                 if (!tmp)
169                         break;
170                 set_page_private(tmp, (unsigned long)page);
171                 page = tmp;
172         }
173
174         if (i == number)
175                 return page;
176
177         /* Not enough pages immediately available this time.
178          * No need to jump around here, drbd_pp_alloc will retry this
179          * function "soon". */
180         if (page) {
181                 tmp = page_chain_tail(page, NULL);
182                 spin_lock(&drbd_pp_lock);
183                 page_chain_add(&drbd_pp_pool, page, tmp);
184                 drbd_pp_vacant += i;
185                 spin_unlock(&drbd_pp_lock);
186         }
187         return NULL;
188 }
189
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192         struct drbd_epoch_entry *e;
193         struct list_head *le, *tle;
194
195         /* The EEs are always appended to the end of the list. Since
196            they are sent in order over the wire, they have to finish
197            in order. As soon as we see the first unfinished one, we can
198            stop examining the list... */
199
200         list_for_each_safe(le, tle, &mdev->net_ee) {
201                 e = list_entry(le, struct drbd_epoch_entry, w.list);
202                 if (drbd_ee_has_active_page(e))
203                         break;
204                 list_move(le, to_be_freed);
205         }
206 }
207
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210         LIST_HEAD(reclaimed);
211         struct drbd_epoch_entry *e, *t;
212
213         spin_lock_irq(&mdev->req_lock);
214         reclaim_net_ee(mdev, &reclaimed);
215         spin_unlock_irq(&mdev->req_lock);
216
217         list_for_each_entry_safe(e, t, &reclaimed, w.list)
218                 drbd_free_net_ee(mdev, e);
219 }
220
221 /**
222  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223  * @mdev:       DRBD device.
224  * @number:     number of pages requested
225  * @retry:      whether to retry, if not enough pages are available right now
226  *
227  * Tries to allocate number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235         struct page *page = NULL;
236         DEFINE_WAIT(wait);
237
238         /* Yes, we may run up to @number over max_buffers. If we
239          * follow it strictly, the admin will get it wrong anyway. */
240         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242
243         while (page == NULL) {
244                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246                 drbd_kick_lo_and_reclaim_net(mdev);
247
248                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250                         if (page)
251                                 break;
252                 }
253
254                 if (!retry)
255                         break;
256
257                 if (signal_pending(current)) {
258                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259                         break;
260                 }
261
262                 schedule();
263         }
264         finish_wait(&drbd_pp_wait, &wait);
265
266         if (page)
267                 atomic_add(number, &mdev->pp_in_use);
268         return page;
269 }
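/*
 * Typical pairing (descriptive note): a chain obtained from
 * drbd_pp_alloc() is attached to an epoch entry in drbd_alloc_ee()
 * below, and is eventually released through drbd_pp_free(), which
 * either re-links the pages to the global drbd_pp_pool or returns them
 * to the system, and then wakes drbd_pp_wait so blocked allocators can
 * retry.
 */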
270
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272  * It is also used from inside another spin_lock_irq(&mdev->req_lock);
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278         int i;
279
280         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
281                 i = page_chain_free(page);
282         else {
283                 struct page *tmp;
284                 tmp = page_chain_tail(page, &i);
285                 spin_lock(&drbd_pp_lock);
286                 page_chain_add(&drbd_pp_pool, page, tmp);
287                 drbd_pp_vacant += i;
288                 spin_unlock(&drbd_pp_lock);
289         }
290         i = atomic_sub_return(i, a);
291         if (i < 0)
292                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
294         wake_up(&drbd_pp_wait);
295 }
296
297 /*
298 You need to hold the req_lock:
299  _drbd_wait_ee_list_empty()
300
301 You must not have the req_lock:
302  drbd_free_ee()
303  drbd_alloc_ee()
304  drbd_init_ee()
305  drbd_release_ee()
306  drbd_ee_fix_bhs()
307  drbd_process_done_ee()
308  drbd_clear_done_ee()
309  drbd_wait_ee_list_empty()
310 */
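/*
 * Illustrative sketch only: the variants without the leading underscore
 * take req_lock themselves; e.g. drbd_wait_ee_list_empty() further down
 * is essentially
 *
 *      spin_lock_irq(&mdev->req_lock);
 *      _drbd_wait_ee_list_empty(mdev, head);
 *      spin_unlock_irq(&mdev->req_lock);
 *
 * so callers that already hold req_lock must use the underscore variant.
 */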
311
312 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
313                                      u64 id,
314                                      sector_t sector,
315                                      unsigned int data_size,
316                                      gfp_t gfp_mask) __must_hold(local)
317 {
318         struct drbd_epoch_entry *e;
319         struct page *page;
320         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
321
322         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
323                 return NULL;
324
325         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
326         if (!e) {
327                 if (!(gfp_mask & __GFP_NOWARN))
328                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
329                 return NULL;
330         }
331
332         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
333         if (!page)
334                 goto fail;
335
336         INIT_HLIST_NODE(&e->collision);
337         e->epoch = NULL;
338         e->mdev = mdev;
339         e->pages = page;
340         atomic_set(&e->pending_bios, 0);
341         e->size = data_size;
342         e->flags = 0;
343         e->sector = sector;
344         e->block_id = id;
345
346         return e;
347
348  fail:
349         mempool_free(e, drbd_ee_mempool);
350         return NULL;
351 }
352
353 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
354 {
355         if (e->flags & EE_HAS_DIGEST)
356                 kfree(e->digest);
357         drbd_pp_free(mdev, e->pages, is_net);
358         D_ASSERT(atomic_read(&e->pending_bios) == 0);
359         D_ASSERT(hlist_unhashed(&e->collision));
360         mempool_free(e, drbd_ee_mempool);
361 }
362
363 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
364 {
365         LIST_HEAD(work_list);
366         struct drbd_epoch_entry *e, *t;
367         int count = 0;
368         int is_net = list == &mdev->net_ee;
369
370         spin_lock_irq(&mdev->req_lock);
371         list_splice_init(list, &work_list);
372         spin_unlock_irq(&mdev->req_lock);
373
374         list_for_each_entry_safe(e, t, &work_list, w.list) {
375                 drbd_free_some_ee(mdev, e, is_net);
376                 count++;
377         }
378         return count;
379 }
380
381
382 /*
383  * This function is called from _asender only_
384  * but see also comments in _req_mod(,barrier_acked)
385  * and receive_Barrier.
386  *
387  * Move entries from net_ee to done_ee, if ready.
388  * Grab done_ee, call all callbacks, free the entries.
389  * The callbacks typically send out ACKs.
390  */
391 static int drbd_process_done_ee(struct drbd_conf *mdev)
392 {
393         LIST_HEAD(work_list);
394         LIST_HEAD(reclaimed);
395         struct drbd_epoch_entry *e, *t;
396         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
397
398         spin_lock_irq(&mdev->req_lock);
399         reclaim_net_ee(mdev, &reclaimed);
400         list_splice_init(&mdev->done_ee, &work_list);
401         spin_unlock_irq(&mdev->req_lock);
402
403         list_for_each_entry_safe(e, t, &reclaimed, w.list)
404                 drbd_free_net_ee(mdev, e);
405
406         /* possible callbacks here:
407          * e_end_block, e_end_resync_block, and e_send_discard_ack.
408          * all ignore the last argument.
409          */
410         list_for_each_entry_safe(e, t, &work_list, w.list) {
411                 /* list_del not necessary, next/prev members not touched */
412                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
413                 drbd_free_ee(mdev, e);
414         }
415         wake_up(&mdev->ee_wait);
416
417         return ok;
418 }
419
420 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
421 {
422         DEFINE_WAIT(wait);
423
424         /* avoids spin_lock/unlock
425          * and calling prepare_to_wait in the fast path */
426         while (!list_empty(head)) {
427                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
428                 spin_unlock_irq(&mdev->req_lock);
429                 io_schedule();
430                 finish_wait(&mdev->ee_wait, &wait);
431                 spin_lock_irq(&mdev->req_lock);
432         }
433 }
434
435 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436 {
437         spin_lock_irq(&mdev->req_lock);
438         _drbd_wait_ee_list_empty(mdev, head);
439         spin_unlock_irq(&mdev->req_lock);
440 }
441
442 /* see also kernel_accept(), which is only present since 2.6.18.
443  * We also want to log exactly which part of it failed. */
444 static int drbd_accept(struct drbd_conf *mdev, const char **what,
445                 struct socket *sock, struct socket **newsock)
446 {
447         struct sock *sk = sock->sk;
448         int err = 0;
449
450         *what = "listen";
451         err = sock->ops->listen(sock, 5);
452         if (err < 0)
453                 goto out;
454
455         *what = "sock_create_lite";
456         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
457                                newsock);
458         if (err < 0)
459                 goto out;
460
461         *what = "accept";
462         err = sock->ops->accept(sock, *newsock, 0);
463         if (err < 0) {
464                 sock_release(*newsock);
465                 *newsock = NULL;
466                 goto out;
467         }
468         (*newsock)->ops  = sock->ops;
469
470 out:
471         return err;
472 }
473
474 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
475                     void *buf, size_t size, int flags)
476 {
477         mm_segment_t oldfs;
478         struct kvec iov = {
479                 .iov_base = buf,
480                 .iov_len = size,
481         };
482         struct msghdr msg = {
483                 .msg_iovlen = 1,
484                 .msg_iov = (struct iovec *)&iov,
485                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
486         };
487         int rv;
488
489         oldfs = get_fs();
490         set_fs(KERNEL_DS);
491         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
492         set_fs(oldfs);
493
494         return rv;
495 }
496
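/* drbd_recv(): receive exactly @size bytes on the data socket
 * (MSG_WAITALL).  Any short result -- error, orderly shutdown by the
 * peer, or a signal arriving after a partial message -- aborts the
 * receive, and the connection is then forced into C_BROKEN_PIPE. */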
497 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
498 {
499         mm_segment_t oldfs;
500         struct kvec iov = {
501                 .iov_base = buf,
502                 .iov_len = size,
503         };
504         struct msghdr msg = {
505                 .msg_iovlen = 1,
506                 .msg_iov = (struct iovec *)&iov,
507                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
508         };
509         int rv;
510
511         oldfs = get_fs();
512         set_fs(KERNEL_DS);
513
514         for (;;) {
515                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
516                 if (rv == size)
517                         break;
518
519                 /* Note:
520                  * ECONNRESET   other side closed the connection
521                  * ERESTARTSYS  (on  sock) we got a signal
522                  */
523
524                 if (rv < 0) {
525                         if (rv == -ECONNRESET)
526                                 dev_info(DEV, "sock was reset by peer\n");
527                         else if (rv != -ERESTARTSYS)
528                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
529                         break;
530                 } else if (rv == 0) {
531                         dev_info(DEV, "sock was shut down by peer\n");
532                         break;
533                 } else  {
534                         /* signal came in, or peer/link went down,
535                          * after we read a partial message
536                          */
537                         /* D_ASSERT(signal_pending(current)); */
538                         break;
539                 }
540         }
541
542         set_fs(oldfs);
543
544         if (rv != size)
545                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
546
547         return rv;
548 }
549
550 /* quoting tcp(7):
551  *   On individual connections, the socket buffer size must be set prior to the
552  *   listen(2) or connect(2) calls in order to have it take effect.
553  * This is our wrapper to do so.
554  */
555 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
556                 unsigned int rcv)
557 {
558         /* open coded SO_SNDBUF, SO_RCVBUF */
559         if (snd) {
560                 sock->sk->sk_sndbuf = snd;
561                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
562         }
563         if (rcv) {
564                 sock->sk->sk_rcvbuf = rcv;
565                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
566         }
567 }
568
569 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
570 {
571         const char *what;
572         struct socket *sock;
573         struct sockaddr_in6 src_in6;
574         int err;
575         int disconnect_on_error = 1;
576
577         if (!get_net_conf(mdev))
578                 return NULL;
579
580         what = "sock_create_kern";
581         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
582                 SOCK_STREAM, IPPROTO_TCP, &sock);
583         if (err < 0) {
584                 sock = NULL;
585                 goto out;
586         }
587
588         sock->sk->sk_rcvtimeo =
589         sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
590         drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
591                         mdev->net_conf->rcvbuf_size);
592
593         /* explicitly bind to the configured IP as source IP
594          * for the outgoing connections.
595          * This is needed for multihomed hosts and to be
596          * able to use lo: interfaces for drbd.
597          * Make sure to use 0 as port number, so Linux selects
598          * a free one dynamically.
599          */
600         memcpy(&src_in6, mdev->net_conf->my_addr,
601                min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
602         if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
603                 src_in6.sin6_port = 0;
604         else
605                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
606
607         what = "bind before connect";
608         err = sock->ops->bind(sock,
609                               (struct sockaddr *) &src_in6,
610                               mdev->net_conf->my_addr_len);
611         if (err < 0)
612                 goto out;
613
614         /* connect may fail, peer not yet available.
615          * stay C_WF_CONNECTION, don't go Disconnecting! */
616         disconnect_on_error = 0;
617         what = "connect";
618         err = sock->ops->connect(sock,
619                                  (struct sockaddr *)mdev->net_conf->peer_addr,
620                                  mdev->net_conf->peer_addr_len, 0);
621
622 out:
623         if (err < 0) {
624                 if (sock) {
625                         sock_release(sock);
626                         sock = NULL;
627                 }
628                 switch (-err) {
629                         /* timeout, busy, signal pending */
630                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
631                 case EINTR: case ERESTARTSYS:
632                         /* peer not (yet) available, network problem */
633                 case ECONNREFUSED: case ENETUNREACH:
634                 case EHOSTDOWN:    case EHOSTUNREACH:
635                         disconnect_on_error = 0;
636                         break;
637                 default:
638                         dev_err(DEV, "%s failed, err = %d\n", what, err);
639                 }
640                 if (disconnect_on_error)
641                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
642         }
643         put_net_conf(mdev);
644         return sock;
645 }
646
647 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
648 {
649         int timeo, err;
650         struct socket *s_estab = NULL, *s_listen;
651         const char *what;
652
653         if (!get_net_conf(mdev))
654                 return NULL;
655
656         what = "sock_create_kern";
657         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
658                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
659         if (err) {
660                 s_listen = NULL;
661                 goto out;
662         }
663
664         timeo = mdev->net_conf->try_connect_int * HZ;
665         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
666
667         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
668         s_listen->sk->sk_rcvtimeo = timeo;
669         s_listen->sk->sk_sndtimeo = timeo;
670         drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
671                         mdev->net_conf->rcvbuf_size);
672
673         what = "bind before listen";
674         err = s_listen->ops->bind(s_listen,
675                               (struct sockaddr *) mdev->net_conf->my_addr,
676                               mdev->net_conf->my_addr_len);
677         if (err < 0)
678                 goto out;
679
680         err = drbd_accept(mdev, &what, s_listen, &s_estab);
681
682 out:
683         if (s_listen)
684                 sock_release(s_listen);
685         if (err < 0) {
686                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
687                         dev_err(DEV, "%s failed, err = %d\n", what, err);
688                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
689                 }
690         }
691         put_net_conf(mdev);
692
693         return s_estab;
694 }
695
696 static int drbd_send_fp(struct drbd_conf *mdev,
697         struct socket *sock, enum drbd_packets cmd)
698 {
699         struct p_header80 *h = &mdev->data.sbuf.header.h80;
700
701         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
702 }
703
704 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
705 {
706         struct p_header80 *h = &mdev->data.rbuf.header.h80;
707         int rr;
708
709         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
710
711         if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
712                 return be16_to_cpu(h->command);
713
714         return 0xffff;
715 }
716
717 /**
718  * drbd_socket_okay() - Free the socket if its connection is not okay
719  * @mdev:       DRBD device.
720  * @sock:       pointer to the pointer to the socket.
721  */
722 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
723 {
724         int rr;
725         char tb[4];
726
727         if (!*sock)
728                 return false;
729
730         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
731
732         if (rr > 0 || rr == -EAGAIN) {
733                 return true;
734         } else {
735                 sock_release(*sock);
736                 *sock = NULL;
737                 return false;
738         }
739 }
740
741 /*
742  * return values:
743  *   1 yes, we have a valid connection
744  *   0 oops, did not work out, please try again
745  *  -1 peer talks different language,
746  *     no point in trying again, please go standalone.
747  *  -2 We do not have a network config...
748  */
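/*
 * Descriptive note on the connect dance below: each side races an
 * outgoing connect (drbd_try_connect) against a passive accept
 * (drbd_wait_for_connect).  The first established socket is tagged
 * P_HAND_SHAKE_S and becomes the data socket, the second is tagged
 * P_HAND_SHAKE_M and becomes the meta-data socket; when a crossed
 * meta-data packet is detected, DISCARD_CONCURRENT is set.
 */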
749 static int drbd_connect(struct drbd_conf *mdev)
750 {
751         struct socket *s, *sock, *msock;
752         int try, h, ok;
753
754         D_ASSERT(!mdev->data.socket);
755
756         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
757                 return -2;
758
759         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
760
761         sock  = NULL;
762         msock = NULL;
763
764         do {
765                 for (try = 0;;) {
766                         /* 3 tries, this should take less than a second! */
767                         s = drbd_try_connect(mdev);
768                         if (s || ++try >= 3)
769                                 break;
770                         /* give the other side time to call bind() & listen() */
771                         schedule_timeout_interruptible(HZ / 10);
772                 }
773
774                 if (s) {
775                         if (!sock) {
776                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
777                                 sock = s;
778                                 s = NULL;
779                         } else if (!msock) {
780                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
781                                 msock = s;
782                                 s = NULL;
783                         } else {
784                                 dev_err(DEV, "Logic error in drbd_connect()\n");
785                                 goto out_release_sockets;
786                         }
787                 }
788
789                 if (sock && msock) {
790                         schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
791                         ok = drbd_socket_okay(mdev, &sock);
792                         ok = drbd_socket_okay(mdev, &msock) && ok;
793                         if (ok)
794                                 break;
795                 }
796
797 retry:
798                 s = drbd_wait_for_connect(mdev);
799                 if (s) {
800                         try = drbd_recv_fp(mdev, s);
801                         drbd_socket_okay(mdev, &sock);
802                         drbd_socket_okay(mdev, &msock);
803                         switch (try) {
804                         case P_HAND_SHAKE_S:
805                                 if (sock) {
806                                         dev_warn(DEV, "initial packet S crossed\n");
807                                         sock_release(sock);
808                                 }
809                                 sock = s;
810                                 break;
811                         case P_HAND_SHAKE_M:
812                                 if (msock) {
813                                         dev_warn(DEV, "initial packet M crossed\n");
814                                         sock_release(msock);
815                                 }
816                                 msock = s;
817                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
818                                 break;
819                         default:
820                                 dev_warn(DEV, "Error receiving initial packet\n");
821                                 sock_release(s);
822                                 if (random32() & 1)
823                                         goto retry;
824                         }
825                 }
826
827                 if (mdev->state.conn <= C_DISCONNECTING)
828                         goto out_release_sockets;
829                 if (signal_pending(current)) {
830                         flush_signals(current);
831                         smp_rmb();
832                         if (get_t_state(&mdev->receiver) == Exiting)
833                                 goto out_release_sockets;
834                 }
835
836                 if (sock && msock) {
837                         ok = drbd_socket_okay(mdev, &sock);
838                         ok = drbd_socket_okay(mdev, &msock) && ok;
839                         if (ok)
840                                 break;
841                 }
842         } while (1);
843
844         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
845         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
846
847         sock->sk->sk_allocation = GFP_NOIO;
848         msock->sk->sk_allocation = GFP_NOIO;
849
850         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
851         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
852
853         /* NOT YET ...
854          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856          * first set it to the P_HAND_SHAKE timeout,
857          * which we set to 4x the configured ping_timeout. */
858         sock->sk->sk_sndtimeo =
859         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860
861         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863
864         /* we don't want delays.
865          * we use TCP_CORK where appropriate, though */
866         drbd_tcp_nodelay(sock);
867         drbd_tcp_nodelay(msock);
868
869         mdev->data.socket = sock;
870         mdev->meta.socket = msock;
871         mdev->last_received = jiffies;
872
873         D_ASSERT(mdev->asender.task == NULL);
874
875         h = drbd_do_handshake(mdev);
876         if (h <= 0)
877                 return h;
878
879         if (mdev->cram_hmac_tfm) {
880                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
881                 switch (drbd_do_auth(mdev)) {
882                 case -1:
883                         dev_err(DEV, "Authentication of peer failed\n");
884                         return -1;
885                 case 0:
886                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
887                         return 0;
888                 }
889         }
890
891         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
892         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
893
894         atomic_set(&mdev->packet_seq, 0);
895         mdev->peer_seq = 0;
896
897         if (drbd_send_protocol(mdev) == -1)
898                 return -1;
899         drbd_send_sync_param(mdev, &mdev->sync_conf);
900         drbd_send_sizes(mdev, 0, 0);
901         drbd_send_uuids(mdev);
902         drbd_send_state(mdev);
903         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
904         clear_bit(RESIZE_PENDING, &mdev->flags);
905
906         if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
907                 return 0;
908
909         drbd_thread_start(&mdev->asender);
910         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
911
912         return 1;
913
914 out_release_sockets:
915         if (sock)
916                 sock_release(sock);
917         if (msock)
918                 sock_release(msock);
919         return -1;
920 }
921
922 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
923 {
924         union p_header *h = &mdev->data.rbuf.header;
925         int r;
926
927         r = drbd_recv(mdev, h, sizeof(*h));
928         if (unlikely(r != sizeof(*h))) {
929                 if (!signal_pending(current))
930                         dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
931                 return false;
932         }
933
934         if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
935                 *cmd = be16_to_cpu(h->h80.command);
936                 *packet_size = be16_to_cpu(h->h80.length);
937         } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
938                 *cmd = be16_to_cpu(h->h95.command);
939                 *packet_size = be32_to_cpu(h->h95.length);
940         } else {
941                 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
942                     be32_to_cpu(h->h80.magic),
943                     be16_to_cpu(h->h80.command),
944                     be16_to_cpu(h->h80.length));
945                 return false;
946         }
947         mdev->last_received = jiffies;
948
949         return true;
950 }
951
952 static void drbd_flush(struct drbd_conf *mdev)
953 {
954         int rv;
955
956         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
957                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
958                                         NULL);
959                 if (rv) {
960                         dev_info(DEV, "local disk flush failed with status %d\n", rv);
961                         /* would rather check on EOPNOTSUPP, but that is not reliable.
962                          * don't try again for ANY return value != 0
963                          * if (rv == -EOPNOTSUPP) */
964                         drbd_bump_write_ordering(mdev, WO_drain_io);
965                 }
966                 put_ldev(mdev);
967         }
968 }
969
970 /**
971  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
972  * @mdev:       DRBD device.
973  * @epoch:      Epoch object.
974  * @ev:         Epoch event.
975  */
976 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
977                                                struct drbd_epoch *epoch,
978                                                enum epoch_event ev)
979 {
980         int epoch_size;
981         struct drbd_epoch *next_epoch;
982         enum finish_epoch rv = FE_STILL_LIVE;
983
984         spin_lock(&mdev->epoch_lock);
985         do {
986                 next_epoch = NULL;
987
988                 epoch_size = atomic_read(&epoch->epoch_size);
989
990                 switch (ev & ~EV_CLEANUP) {
991                 case EV_PUT:
992                         atomic_dec(&epoch->active);
993                         break;
994                 case EV_GOT_BARRIER_NR:
995                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
996                         break;
997                 case EV_BECAME_LAST:
998                         /* nothing to do */
999                         break;
1000                 }
1001
1002                 if (epoch_size != 0 &&
1003                     atomic_read(&epoch->active) == 0 &&
1004                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1005                         if (!(ev & EV_CLEANUP)) {
1006                                 spin_unlock(&mdev->epoch_lock);
1007                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1008                                 spin_lock(&mdev->epoch_lock);
1009                         }
1010                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1011                                 dec_unacked(mdev);
1012
1013                         if (mdev->current_epoch != epoch) {
1014                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1015                                 list_del(&epoch->list);
1016                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1017                                 mdev->epochs--;
1018                                 kfree(epoch);
1019
1020                                 if (rv == FE_STILL_LIVE)
1021                                         rv = FE_DESTROYED;
1022                         } else {
1023                                 epoch->flags = 0;
1024                                 atomic_set(&epoch->epoch_size, 0);
1025                                 /* atomic_set(&epoch->active, 0); is already zero */
1026                                 if (rv == FE_STILL_LIVE)
1027                                         rv = FE_RECYCLED;
1028                                 wake_up(&mdev->ee_wait);
1029                         }
1030                 }
1031
1032                 if (!next_epoch)
1033                         break;
1034
1035                 epoch = next_epoch;
1036         } while (1);
1037
1038         spin_unlock(&mdev->epoch_lock);
1039
1040         return rv;
1041 }
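/*
 * Descriptive note: an epoch is finished above only once it is
 * non-empty (epoch_size != 0), has no remaining active requests
 * (epoch->active == 0), and either its barrier number has arrived
 * (DE_HAVE_BARRIER_NUMBER) or this is a cleanup pass (EV_CLEANUP).
 * drbd_send_b_ack() then sends P_BARRIER_ACK (skipped during cleanup),
 * and the epoch is either freed (FE_DESTROYED) or recycled as the
 * current epoch (FE_RECYCLED).
 */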
1042
1043 /**
1044  * drbd_bump_write_ordering() - Fall back to another write ordering method
1045  * @mdev:       DRBD device.
1046  * @wo:         Write ordering method to try.
1047  */
1048 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1049 {
1050         enum write_ordering_e pwo;
1051         static char *write_ordering_str[] = {
1052                 [WO_none] = "none",
1053                 [WO_drain_io] = "drain",
1054                 [WO_bdev_flush] = "flush",
1055         };
1056
1057         pwo = mdev->write_ordering;
1058         wo = min(pwo, wo);
1059         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1060                 wo = WO_drain_io;
1061         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1062                 wo = WO_none;
1063         mdev->write_ordering = wo;
1064         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1065                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1066 }
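/*
 * Note: in the function above the ordering method only ever degrades
 * (wo = min(pwo, wo)), from WO_bdev_flush over WO_drain_io down to
 * WO_none, additionally constrained by the no_disk_flush /
 * no_disk_drain disk options.
 */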
1067
1068 /**
1069  * drbd_submit_ee() - Submit the pages of an epoch entry as one or more bios
1070  * @mdev:       DRBD device.
1071  * @e:          epoch entry
1072  * @rw:         flag field, see bio->bi_rw
1073  *
1074  * May spread the pages to multiple bios,
1075  * depending on bio_add_page restrictions.
1076  *
1077  * Returns 0 if all bios have been submitted,
1078  * -ENOMEM if we could not allocate enough bios,
1079  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1080  *  single page to an empty bio (which should never happen and likely indicates
1081  *  that the lower level IO stack is in some way broken). This has been observed
1082  *  on certain Xen deployments.
1083  */
1084 /* TODO allocate from our own bio_set. */
1085 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1086                 const unsigned rw, const int fault_type)
1087 {
1088         struct bio *bios = NULL;
1089         struct bio *bio;
1090         struct page *page = e->pages;
1091         sector_t sector = e->sector;
1092         unsigned ds = e->size;
1093         unsigned n_bios = 0;
1094         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1095         int err = -ENOMEM;
1096
1097         /* In most cases, we will only need one bio.  But in case the lower
1098          * level restrictions happen to be different at this offset on this
1099          * side than those of the sending peer, we may need to submit the
1100          * request in more than one bio. */
1101 next_bio:
1102         bio = bio_alloc(GFP_NOIO, nr_pages);
1103         if (!bio) {
1104                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1105                 goto fail;
1106         }
1107         /* > e->sector, unless this is the first bio */
1108         bio->bi_sector = sector;
1109         bio->bi_bdev = mdev->ldev->backing_bdev;
1110         bio->bi_rw = rw;
1111         bio->bi_private = e;
1112         bio->bi_end_io = drbd_endio_sec;
1113
1114         bio->bi_next = bios;
1115         bios = bio;
1116         ++n_bios;
1117
1118         page_chain_for_each(page) {
1119                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1120                 if (!bio_add_page(bio, page, len, 0)) {
1121                         /* A single page must always be possible!
1122                          * But in case it fails anyway,
1123                          * we deal with it, and complain (below). */
1124                         if (bio->bi_vcnt == 0) {
1125                                 dev_err(DEV,
1126                                         "bio_add_page failed for len=%u, "
1127                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1128                                         len, (unsigned long long)bio->bi_sector);
1129                                 err = -ENOSPC;
1130                                 goto fail;
1131                         }
1132                         goto next_bio;
1133                 }
1134                 ds -= len;
1135                 sector += len >> 9;
1136                 --nr_pages;
1137         }
1138         D_ASSERT(page == NULL);
1139         D_ASSERT(ds == 0);
1140
1141         atomic_set(&e->pending_bios, n_bios);
1142         do {
1143                 bio = bios;
1144                 bios = bios->bi_next;
1145                 bio->bi_next = NULL;
1146
1147                 drbd_generic_make_request(mdev, fault_type, bio);
1148         } while (bios);
1149         return 0;
1150
1151 fail:
1152         while (bios) {
1153                 bio = bios;
1154                 bios = bios->bi_next;
1155                 bio_put(bio);
1156         }
1157         return err;
1158 }
1159
1160 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1161 {
1162         int rv;
1163         struct p_barrier *p = &mdev->data.rbuf.barrier;
1164         struct drbd_epoch *epoch;
1165
1166         inc_unacked(mdev);
1167
1168         mdev->current_epoch->barrier_nr = p->barrier;
1169         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1170
1171         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1172          * the activity log, which means it would not be resynced in case the
1173          * R_PRIMARY crashes now.
1174          * Therefore we must send the barrier_ack after the barrier request was
1175          * completed. */
1176         switch (mdev->write_ordering) {
1177         case WO_none:
1178                 if (rv == FE_RECYCLED)
1179                         return true;
1180
1181                 /* receiver context, in the writeout path of the other node.
1182                  * avoid potential distributed deadlock */
1183                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1184                 if (epoch)
1185                         break;
1186                 else
1187                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1188                         /* Fall through */
1189
1190         case WO_bdev_flush:
1191         case WO_drain_io:
1192                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1193                 drbd_flush(mdev);
1194
1195                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1196                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1197                         if (epoch)
1198                                 break;
1199                 }
1200
1201                 epoch = mdev->current_epoch;
1202                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1203
1204                 D_ASSERT(atomic_read(&epoch->active) == 0);
1205                 D_ASSERT(epoch->flags == 0);
1206
1207                 return true;
1208         default:
1209                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1210                 return false;
1211         }
1212
1213         epoch->flags = 0;
1214         atomic_set(&epoch->epoch_size, 0);
1215         atomic_set(&epoch->active, 0);
1216
1217         spin_lock(&mdev->epoch_lock);
1218         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1219                 list_add(&epoch->list, &mdev->current_epoch->list);
1220                 mdev->current_epoch = epoch;
1221                 mdev->epochs++;
1222         } else {
1223                 /* The current_epoch got recycled while we allocated this one... */
1224                 kfree(epoch);
1225         }
1226         spin_unlock(&mdev->epoch_lock);
1227
1228         return true;
1229 }
1230
1231 /* used from receive_RSDataReply (recv_resync_read)
1232  * and from receive_Data */
1233 static struct drbd_epoch_entry *
1234 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1235 {
1236         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1237         struct drbd_epoch_entry *e;
1238         struct page *page;
1239         int dgs, ds, rr;
1240         void *dig_in = mdev->int_dig_in;
1241         void *dig_vv = mdev->int_dig_vv;
1242         unsigned long *data;
1243
1244         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1245                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1246
1247         if (dgs) {
1248                 rr = drbd_recv(mdev, dig_in, dgs);
1249                 if (rr != dgs) {
1250                         if (!signal_pending(current))
1251                                 dev_warn(DEV,
1252                                         "short read receiving data digest: read %d expected %d\n",
1253                                         rr, dgs);
1254                         return NULL;
1255                 }
1256         }
1257
1258         data_size -= dgs;
1259
1260         ERR_IF(data_size == 0) return NULL;
1261         ERR_IF(data_size &  0x1ff) return NULL;
1262         ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1263
1264         /* even though we trust our peer,
1265          * we sometimes have to double check. */
1266         if (sector + (data_size>>9) > capacity) {
1267                 dev_err(DEV, "request from peer beyond end of local disk: "
1268                         "capacity: %llus < sector: %llus + size: %u\n",
1269                         (unsigned long long)capacity,
1270                         (unsigned long long)sector, data_size);
1271                 return NULL;
1272         }
1273
1274         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1275          * "criss-cross" setup, that might cause write-out on some other DRBD,
1276          * which in turn might block on the other node at this very place.  */
1277         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1278         if (!e)
1279                 return NULL;
1280
1281         ds = data_size;
1282         page = e->pages;
1283         page_chain_for_each(page) {
1284                 unsigned len = min_t(int, ds, PAGE_SIZE);
1285                 data = kmap(page);
1286                 rr = drbd_recv(mdev, data, len);
1287                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1288                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1289                         data[0] = data[0] ^ (unsigned long)-1;
1290                 }
1291                 kunmap(page);
1292                 if (rr != len) {
1293                         drbd_free_ee(mdev, e);
1294                         if (!signal_pending(current))
1295                                 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1296                                 rr, len);
1297                         return NULL;
1298                 }
1299                 ds -= rr;
1300         }
1301
1302         if (dgs) {
1303                 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1304                 if (memcmp(dig_in, dig_vv, dgs)) {
1305                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1306                                 (unsigned long long)sector, data_size);
1307                         drbd_bcast_ee(mdev, "digest failed",
1308                                         dgs, dig_in, dig_vv, e);
1309                         drbd_free_ee(mdev, e);
1310                         return NULL;
1311                 }
1312         }
1313         mdev->recv_cnt += data_size>>9;
1314         return e;
1315 }
1316
1317 /* drbd_drain_block() just takes a data block
1318  * out of the socket input buffer, and discards it.
1319  */
1320 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1321 {
1322         struct page *page;
1323         int rr, rv = 1;
1324         void *data;
1325
1326         if (!data_size)
1327                 return true;
1328
1329         page = drbd_pp_alloc(mdev, 1, 1);
1330
1331         data = kmap(page);
1332         while (data_size) {
1333                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1334                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1335                         rv = 0;
1336                         if (!signal_pending(current))
1337                                 dev_warn(DEV,
1338                                         "short read receiving data: read %d expected %d\n",
1339                                         rr, min_t(int, data_size, PAGE_SIZE));
1340                         break;
1341                 }
1342                 data_size -= rr;
1343         }
1344         kunmap(page);
1345         drbd_pp_free(mdev, page, 0);
1346         return rv;
1347 }
1348
1349 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1350                            sector_t sector, int data_size)
1351 {
1352         struct bio_vec *bvec;
1353         struct bio *bio;
1354         int dgs, rr, i, expect;
1355         void *dig_in = mdev->int_dig_in;
1356         void *dig_vv = mdev->int_dig_vv;
1357
1358         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1359                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1360
1361         if (dgs) {
1362                 rr = drbd_recv(mdev, dig_in, dgs);
1363                 if (rr != dgs) {
1364                         if (!signal_pending(current))
1365                                 dev_warn(DEV,
1366                                         "short read receiving data reply digest: read %d expected %d\n",
1367                                         rr, dgs);
1368                         return 0;
1369                 }
1370         }
1371
1372         data_size -= dgs;
1373
1374         /* optimistically update recv_cnt.  if receiving fails below,
1375          * we disconnect anyway, and counters will be reset. */
1376         mdev->recv_cnt += data_size>>9;
1377
1378         bio = req->master_bio;
1379         D_ASSERT(sector == bio->bi_sector);
1380
1381         bio_for_each_segment(bvec, bio, i) {
1382                 expect = min_t(int, data_size, bvec->bv_len);
1383                 rr = drbd_recv(mdev,
1384                              kmap(bvec->bv_page)+bvec->bv_offset,
1385                              expect);
1386                 kunmap(bvec->bv_page);
1387                 if (rr != expect) {
1388                         if (!signal_pending(current))
1389                                 dev_warn(DEV, "short read receiving data reply: "
1390                                         "read %d expected %d\n",
1391                                         rr, expect);
1392                         return 0;
1393                 }
1394                 data_size -= rr;
1395         }
1396
1397         if (dgs) {
1398                 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1399                 if (memcmp(dig_in, dig_vv, dgs)) {
1400                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1401                         return 0;
1402                 }
1403         }
1404
1405         D_ASSERT(data_size == 0);
1406         return 1;
1407 }
1408
1409 /* e_end_resync_block() is called via
1410  * drbd_process_done_ee() by asender only */
1411 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1412 {
1413         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1414         sector_t sector = e->sector;
1415         int ok;
1416
1417         D_ASSERT(hlist_unhashed(&e->collision));
1418
1419         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1420                 drbd_set_in_sync(mdev, sector, e->size);
1421                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1422         } else {
1423                 /* Record failure to sync */
1424                 drbd_rs_failed_io(mdev, sector, e->size);
1425
1426                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1427         }
1428         dec_unacked(mdev);
1429
1430         return ok;
1431 }
1432
1433 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1434 {
1435         struct drbd_epoch_entry *e;
1436
1437         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1438         if (!e)
1439                 goto fail;
1440
1441         dec_rs_pending(mdev);
1442
1443         inc_unacked(mdev);
1444         /* corresponding dec_unacked() in e_end_resync_block()
1445          * respective _drbd_clear_done_ee */
1446
1447         e->w.cb = e_end_resync_block;
1448
1449         spin_lock_irq(&mdev->req_lock);
1450         list_add(&e->w.list, &mdev->sync_ee);
1451         spin_unlock_irq(&mdev->req_lock);
1452
1453         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1454         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1455                 return true;
1456
1457         /* don't care for the reason here */
1458         dev_err(DEV, "submit failed, triggering re-connect\n");
1459         spin_lock_irq(&mdev->req_lock);
1460         list_del(&e->w.list);
1461         spin_unlock_irq(&mdev->req_lock);
1462
1463         drbd_free_ee(mdev, e);
1464 fail:
1465         put_ldev(mdev);
1466         return false;
1467 }
1468
1469 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1470 {
1471         struct drbd_request *req;
1472         sector_t sector;
1473         int ok;
1474         struct p_data *p = &mdev->data.rbuf.data;
1475
1476         sector = be64_to_cpu(p->sector);
1477
1478         spin_lock_irq(&mdev->req_lock);
1479         req = _ar_id_to_req(mdev, p->block_id, sector);
1480         spin_unlock_irq(&mdev->req_lock);
1481         if (unlikely(!req)) {
1482                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1483                 return false;
1484         }
1485
1486         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1487          * special casing it there for the various failure cases.
1488          * still no race with drbd_fail_pending_reads */
1489         ok = recv_dless_read(mdev, req, sector, data_size);
1490
1491         if (ok)
1492                 req_mod(req, data_received);
1493         /* else: nothing. handled from drbd_disconnect...
1494          * I don't think we may complete this just yet
1495          * in case we are "on-disconnect: freeze" */
1496
1497         return ok;
1498 }
1499
1500 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1501 {
1502         sector_t sector;
1503         int ok;
1504         struct p_data *p = &mdev->data.rbuf.data;
1505
1506         sector = be64_to_cpu(p->sector);
1507         D_ASSERT(p->block_id == ID_SYNCER);
1508
1509         if (get_ldev(mdev)) {
1510                 /* data is submitted to disk within recv_resync_read.
1511                  * corresponding put_ldev done below on error,
1512                  * or in drbd_endio_write_sec. */
1513                 ok = recv_resync_read(mdev, sector, data_size);
1514         } else {
1515                 if (__ratelimit(&drbd_ratelimit_state))
1516                         dev_err(DEV, "Can not write resync data to local disk.\n");
1517
1518                 ok = drbd_drain_block(mdev, data_size);
1519
1520                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1521         }
1522
1523         atomic_add(data_size >> 9, &mdev->rs_sect_in);
1524
1525         return ok;
1526 }
1527
1528 /* e_end_block() is called via drbd_process_done_ee().
1529  * this means this function only runs in the asender thread
1530  */
1531 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1532 {
1533         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1534         sector_t sector = e->sector;
1535         int ok = 1, pcmd;
1536
1537         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1538                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1539                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1540                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1541                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1542                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1543                         ok &= drbd_send_ack(mdev, pcmd, e);
1544                         if (pcmd == P_RS_WRITE_ACK)
1545                                 drbd_set_in_sync(mdev, sector, e->size);
1546                 } else {
1547                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1548                         /* we expect it to be marked out of sync anyways...
1549                          * maybe assert this?  */
1550                 }
1551                 dec_unacked(mdev);
1552         }
1553         /* we delete from the conflict detection hash _after_ we sent out the
1554          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1555         if (mdev->net_conf->two_primaries) {
1556                 spin_lock_irq(&mdev->req_lock);
1557                 D_ASSERT(!hlist_unhashed(&e->collision));
1558                 hlist_del_init(&e->collision);
1559                 spin_unlock_irq(&mdev->req_lock);
1560         } else {
1561                 D_ASSERT(hlist_unhashed(&e->collision));
1562         }
1563
1564         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1565
1566         return ok;
1567 }
1568
1569 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1570 {
1571         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1572         int ok = 1;
1573
1574         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1575         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1576
1577         spin_lock_irq(&mdev->req_lock);
1578         D_ASSERT(!hlist_unhashed(&e->collision));
1579         hlist_del_init(&e->collision);
1580         spin_unlock_irq(&mdev->req_lock);
1581
1582         dec_unacked(mdev);
1583
1584         return ok;
1585 }
1586
1587 /* Called from receive_Data.
1588  * Synchronize packets on sock with packets on msock.
1589  *
1590  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1591  * packet traveling on msock, they are still processed in the order they have
1592  * been sent.
1593  *
1594  * Note: we don't care for Ack packets overtaking P_DATA packets.
1595  *
1596  * In case packet_seq is larger than mdev->peer_seq number, there are
1597  * outstanding packets on the msock. We wait for them to arrive.
1598  * In case we are the logically next packet, we update mdev->peer_seq
1599  * ourselves. Correctly handles 32bit wrap around.
1600  *
1601  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1602  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1603  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1604  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1605  *
1606  * returns 0 if we may process the packet,
1607  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1608 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1609 {
1610         DEFINE_WAIT(wait);
1611         unsigned int p_seq;
1612         long timeout;
1613         int ret = 0;
1614         spin_lock(&mdev->peer_seq_lock);
1615         for (;;) {
1616                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1617                 if (seq_le(packet_seq, mdev->peer_seq+1))
1618                         break;
1619                 if (signal_pending(current)) {
1620                         ret = -ERESTARTSYS;
1621                         break;
1622                 }
1623                 p_seq = mdev->peer_seq;
1624                 spin_unlock(&mdev->peer_seq_lock);
1625                 timeout = schedule_timeout(30*HZ);
1626                 spin_lock(&mdev->peer_seq_lock);
1627                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1628                         ret = -ETIMEDOUT;
1629                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1630                         break;
1631                 }
1632         }
1633         finish_wait(&mdev->seq_wait, &wait);
1634         if (mdev->peer_seq+1 == packet_seq)
1635                 mdev->peer_seq++;
1636         spin_unlock(&mdev->peer_seq_lock);
1637         return ret;
1638 }
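
/* Editorial sketch (not part of the original file): the seq_le() test above
 * is assumed to be a wrap-safe "less or equal" on 32bit sequence numbers,
 * conventionally implemented with signed subtraction, roughly:
 *
 *      static inline int seq_le_sketch(u32 a, u32 b)
 *      {
 *              return (s32)(a - b) <= 0;
 *      }
 *
 * With that, packet_seq == peer_seq + 1 is accepted immediately even across
 * the 32bit wrap, while a packet that is further ahead waits in the loop.
 */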
1639
1640 /* see also bio_flags_to_wire()
1641  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1642  * flags and back. We may replicate to other kernel versions. */
1643 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1644 {
1645         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1646                 (dpf & DP_FUA ? REQ_FUA : 0) |
1647                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1648                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1649 }
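
/* Editorial note (not from the original file): this is meant to be the
 * inverse of bio_flags_to_wire() on the sending side, so that, conceptually,
 *
 *      wire_flags_to_bio(mdev, bio_flags_to_wire(mdev, rw))
 *              == (rw & (REQ_SYNC | REQ_FUA | REQ_FLUSH | REQ_DISCARD))
 *
 * holds for every replicated write, whatever the peer's kernel version.
 */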
1650
1651 /* mirrored write */
1652 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1653 {
1654         sector_t sector;
1655         struct drbd_epoch_entry *e;
1656         struct p_data *p = &mdev->data.rbuf.data;
1657         int rw = WRITE;
1658         u32 dp_flags;
1659
1660         if (!get_ldev(mdev)) {
1661                 spin_lock(&mdev->peer_seq_lock);
1662                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1663                         mdev->peer_seq++;
1664                 spin_unlock(&mdev->peer_seq_lock);
1665
1666                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1667                 atomic_inc(&mdev->current_epoch->epoch_size);
1668                 return drbd_drain_block(mdev, data_size);
1669         }
1670
1671         /* get_ldev(mdev) successful.
1672          * Corresponding put_ldev done either below (on various errors),
1673          * or in drbd_endio_write_sec, if we successfully submit the data at
1674          * the end of this function. */
1675
1676         sector = be64_to_cpu(p->sector);
1677         e = read_in_block(mdev, p->block_id, sector, data_size);
1678         if (!e) {
1679                 put_ldev(mdev);
1680                 return false;
1681         }
1682
1683         e->w.cb = e_end_block;
1684
1685         dp_flags = be32_to_cpu(p->dp_flags);
1686         rw |= wire_flags_to_bio(mdev, dp_flags);
1687
1688         if (dp_flags & DP_MAY_SET_IN_SYNC)
1689                 e->flags |= EE_MAY_SET_IN_SYNC;
1690
1691         spin_lock(&mdev->epoch_lock);
1692         e->epoch = mdev->current_epoch;
1693         atomic_inc(&e->epoch->epoch_size);
1694         atomic_inc(&e->epoch->active);
1695         spin_unlock(&mdev->epoch_lock);
1696
1697         /* I'm the receiver, I do hold a net_cnt reference. */
1698         if (!mdev->net_conf->two_primaries) {
1699                 spin_lock_irq(&mdev->req_lock);
1700         } else {
1701                 /* don't get the req_lock yet,
1702                  * we may sleep in drbd_wait_peer_seq */
1703                 const int size = e->size;
1704                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1705                 DEFINE_WAIT(wait);
1706                 struct drbd_request *i;
1707                 struct hlist_node *n;
1708                 struct hlist_head *slot;
1709                 int first;
1710
1711                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1712                 BUG_ON(mdev->ee_hash == NULL);
1713                 BUG_ON(mdev->tl_hash == NULL);
1714
1715                 /* conflict detection and handling:
1716                  * 1. wait on the sequence number,
1717                  *    in case this data packet overtook ACK packets.
1718                  * 2. check our hash tables for conflicting requests.
1719                  *    we only need to walk the tl_hash, since an ee can not
1720                  *    have a conflict with an other ee: on the submitting
1721                  *    node, the corresponding req had already been conflicting,
1722                  *    and a conflicting req is never sent.
1723                  *
1724                  * Note: for two_primaries, we are protocol C,
1725                  * so there cannot be any request that is DONE
1726                  * but still on the transfer log.
1727                  *
1728                  * unconditionally add to the ee_hash.
1729                  *
1730                  * if no conflicting request is found:
1731                  *    submit.
1732                  *
1733                  * if any conflicting request is found
1734                  * that has not yet been acked,
1735                  * AND I have the "discard concurrent writes" flag:
1736                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1737                  *
1738                  * if any conflicting request is found:
1739                  *       block the receiver, waiting on misc_wait
1740                  *       until no more conflicting requests are there,
1741                  *       or we get interrupted (disconnect).
1742                  *
1743                  *       we do not just write after local io completion of those
1744                  *       requests, but only after req is done completely, i.e.
1745                  *       we wait for the P_DISCARD_ACK to arrive!
1746                  *
1747                  *       then proceed normally, i.e. submit.
1748                  */
1749                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1750                         goto out_interrupted;
1751
1752                 spin_lock_irq(&mdev->req_lock);
1753
1754                 hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1755
1756 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1757                 slot = tl_hash_slot(mdev, sector);
1758                 first = 1;
1759                 for (;;) {
1760                         int have_unacked = 0;
1761                         int have_conflict = 0;
1762                         prepare_to_wait(&mdev->misc_wait, &wait,
1763                                 TASK_INTERRUPTIBLE);
1764                         hlist_for_each_entry(i, n, slot, collision) {
1765                                 if (OVERLAPS) {
1766                                         /* only ALERT on first iteration,
1767                                          * we may be woken up early... */
1768                                         if (first)
1769                                                 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1770                                                       " new: %llus +%u; pending: %llus +%u\n",
1771                                                       current->comm, current->pid,
1772                                                       (unsigned long long)sector, size,
1773                                                       (unsigned long long)i->sector, i->size);
1774                                         if (i->rq_state & RQ_NET_PENDING)
1775                                                 ++have_unacked;
1776                                         ++have_conflict;
1777                                 }
1778                         }
1779 #undef OVERLAPS
1780                         if (!have_conflict)
1781                                 break;
1782
1783                         /* Discard Ack only for the _first_ iteration */
1784                         if (first && discard && have_unacked) {
1785                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1786                                      (unsigned long long)sector);
1787                                 inc_unacked(mdev);
1788                                 e->w.cb = e_send_discard_ack;
1789                                 list_add_tail(&e->w.list, &mdev->done_ee);
1790
1791                                 spin_unlock_irq(&mdev->req_lock);
1792
1793                                 /* we could probably send that P_DISCARD_ACK ourselves,
1794                                  * but I don't like the receiver using the msock */
1795
1796                                 put_ldev(mdev);
1797                                 wake_asender(mdev);
1798                                 finish_wait(&mdev->misc_wait, &wait);
1799                                 return true;
1800                         }
1801
1802                         if (signal_pending(current)) {
1803                                 hlist_del_init(&e->collision);
1804
1805                                 spin_unlock_irq(&mdev->req_lock);
1806
1807                                 finish_wait(&mdev->misc_wait, &wait);
1808                                 goto out_interrupted;
1809                         }
1810
1811                         spin_unlock_irq(&mdev->req_lock);
1812                         if (first) {
1813                                 first = 0;
1814                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1815                                      "sec=%llus\n", (unsigned long long)sector);
1816                         } else if (discard) {
1817                                 /* we had none on the first iteration.
1818                                  * there must be none now. */
1819                                 D_ASSERT(have_unacked == 0);
1820                         }
1821                         schedule();
1822                         spin_lock_irq(&mdev->req_lock);
1823                 }
1824                 finish_wait(&mdev->misc_wait, &wait);
1825         }
1826
1827         list_add(&e->w.list, &mdev->active_ee);
1828         spin_unlock_irq(&mdev->req_lock);
1829
1830         switch (mdev->net_conf->wire_protocol) {
1831         case DRBD_PROT_C:
1832                 inc_unacked(mdev);
1833                 /* corresponding dec_unacked() in e_end_block()
1834                  * or in _drbd_clear_done_ee, respectively */
1835                 break;
1836         case DRBD_PROT_B:
1837                 /* I really don't like it that the receiver thread
1838                  * sends on the msock, but anyways */
1839                 drbd_send_ack(mdev, P_RECV_ACK, e);
1840                 break;
1841         case DRBD_PROT_A:
1842                 /* nothing to do */
1843                 break;
1844         }
1845
1846         if (mdev->state.pdsk < D_INCONSISTENT) {
1847                 /* In case we have the only disk of the cluster, */
1848                 drbd_set_out_of_sync(mdev, e->sector, e->size);
1849                 e->flags |= EE_CALL_AL_COMPLETE_IO;
1850                 e->flags &= ~EE_MAY_SET_IN_SYNC;
1851                 drbd_al_begin_io(mdev, e->sector);
1852         }
1853
1854         if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1855                 return true;
1856
1857         /* don't care for the reason here */
1858         dev_err(DEV, "submit failed, triggering re-connect\n");
1859         spin_lock_irq(&mdev->req_lock);
1860         list_del(&e->w.list);
1861         hlist_del_init(&e->collision);
1862         spin_unlock_irq(&mdev->req_lock);
1863         if (e->flags & EE_CALL_AL_COMPLETE_IO)
1864                 drbd_al_complete_io(mdev, e->sector);
1865
1866 out_interrupted:
1867         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1868         put_ldev(mdev);
1869         drbd_free_ee(mdev, e);
1870         return false;
1871 }
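
/* Editorial sketch (not from the original file): the OVERLAPS test in the
 * conflict detection loop above is assumed to be the usual interval
 * intersection on 512-byte sectors, with sizes given in bytes, i.e.
 * something equivalent to:
 *
 *      static inline int overlaps_sketch(sector_t s1, int l1,
 *                                        sector_t s2, int l2)
 *      {
 *              return s1 + (l1 >> 9) > s2 && s2 + (l2 >> 9) > s1;
 *      }
 *
 * Two requests conflict iff neither ends at or before the start of the other.
 */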
1872
1873 /* We may throttle resync, if the lower device seems to be busy,
1874  * and current sync rate is above c_min_rate.
1875  *
1876  * To decide whether or not the lower device is busy, we use a scheme similar
1877  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1878  * activity (more than 64 sectors) that we cannot account for with our own
1879  * resync activity, the device obviously is "busy".
1880  *
1881  * The current sync rate used here is computed from only the two most recent
1882  * step marks, giving a short-time average so we can react faster.
1883  */
1884 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1885 {
1886         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1887         unsigned long db, dt, dbdt;
1888         struct lc_element *tmp;
1889         int curr_events;
1890         int throttle = 0;
1891
1892         /* feature disabled? */
1893         if (mdev->sync_conf.c_min_rate == 0)
1894                 return 0;
1895
1896         spin_lock_irq(&mdev->al_lock);
1897         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1898         if (tmp) {
1899                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1900                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1901                         spin_unlock_irq(&mdev->al_lock);
1902                         return 0;
1903                 }
1904                 /* Do not slow down if app IO is already waiting for this extent */
1905         }
1906         spin_unlock_irq(&mdev->al_lock);
1907
1908         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1909                       (int)part_stat_read(&disk->part0, sectors[1]) -
1910                         atomic_read(&mdev->rs_sect_ev);
1911
1912         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1913                 unsigned long rs_left;
1914                 int i;
1915
1916                 mdev->rs_last_events = curr_events;
1917
1918                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1919                  * approx. */
1920                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1921
1922                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1923                         rs_left = mdev->ov_left;
1924                 else
1925                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1926
1927                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1928                 if (!dt)
1929                         dt++;
1930                 db = mdev->rs_mark_left[i] - rs_left;
1931                 dbdt = Bit2KB(db/dt);
1932
1933                 if (dbdt > mdev->sync_conf.c_min_rate)
1934                         throttle = 1;
1935         }
1936         return throttle;
1937 }
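
/* Editorial worked example (not from the original file), assuming the usual
 * 4KiB of backing storage per bitmap bit: if the last two sync marks span
 * dt = 6 seconds and db = 1536 bits were cleared in that time, then
 *
 *      dbdt = Bit2KB(1536 / 6) = Bit2KB(256) = 1024 KiB/s
 *
 * so with a configured c_min_rate of, say, 250 KiB/s this resync is fast
 * enough to be throttled whenever the disk also shows significant activity
 * not accounted to resync (curr_events - rs_last_events > 64 sectors).
 */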
1938
1939
1940 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1941 {
1942         sector_t sector;
1943         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1944         struct drbd_epoch_entry *e;
1945         struct digest_info *di = NULL;
1946         int size, verb;
1947         unsigned int fault_type;
1948         struct p_block_req *p = &mdev->data.rbuf.block_req;
1949
1950         sector = be64_to_cpu(p->sector);
1951         size   = be32_to_cpu(p->blksize);
1952
1953         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1954                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1955                                 (unsigned long long)sector, size);
1956                 return false;
1957         }
1958         if (sector + (size>>9) > capacity) {
1959                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1960                                 (unsigned long long)sector, size);
1961                 return false;
1962         }
1963
1964         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1965                 verb = 1;
1966                 switch (cmd) {
1967                 case P_DATA_REQUEST:
1968                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
1969                         break;
1970                 case P_RS_DATA_REQUEST:
1971                 case P_CSUM_RS_REQUEST:
1972                 case P_OV_REQUEST:
1973                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
1974                         break;
1975                 case P_OV_REPLY:
1976                         verb = 0;
1977                         dec_rs_pending(mdev);
1978                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
1979                         break;
1980                 default:
1981                         dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
1982                                 cmdname(cmd));
1983                 }
1984                 if (verb && __ratelimit(&drbd_ratelimit_state))
1985                         dev_err(DEV, "Can not satisfy peer's read request, "
1986                             "no local data.\n");
1987
1988                 /* drain possible payload */
1989                 return drbd_drain_block(mdev, digest_size);
1990         }
1991
1992         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1993          * "criss-cross" setup, that might cause write-out on some other DRBD,
1994          * which in turn might block on the other node at this very place.  */
1995         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1996         if (!e) {
1997                 put_ldev(mdev);
1998                 return false;
1999         }
2000
2001         switch (cmd) {
2002         case P_DATA_REQUEST:
2003                 e->w.cb = w_e_end_data_req;
2004                 fault_type = DRBD_FAULT_DT_RD;
2005                 /* application IO, don't drbd_rs_begin_io */
2006                 goto submit;
2007
2008         case P_RS_DATA_REQUEST:
2009                 e->w.cb = w_e_end_rsdata_req;
2010                 fault_type = DRBD_FAULT_RS_RD;
2011                 /* used in the sector offset progress display */
2012                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2013                 break;
2014
2015         case P_OV_REPLY:
2016         case P_CSUM_RS_REQUEST:
2017                 fault_type = DRBD_FAULT_RS_RD;
2018                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2019                 if (!di)
2020                         goto out_free_e;
2021
2022                 di->digest_size = digest_size;
2023                 di->digest = (((char *)di)+sizeof(struct digest_info));
2024
2025                 e->digest = di;
2026                 e->flags |= EE_HAS_DIGEST;
2027
2028                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2029                         goto out_free_e;
2030
2031                 if (cmd == P_CSUM_RS_REQUEST) {
2032                         D_ASSERT(mdev->agreed_pro_version >= 89);
2033                         e->w.cb = w_e_end_csum_rs_req;
2034                         /* used in the sector offset progress display */
2035                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2036                 } else if (cmd == P_OV_REPLY) {
2037                         /* track progress, we may need to throttle */
2038                         atomic_add(size >> 9, &mdev->rs_sect_in);
2039                         e->w.cb = w_e_end_ov_reply;
2040                         dec_rs_pending(mdev);
2041                         /* drbd_rs_begin_io done when we sent this request,
2042                          * but accounting still needs to be done. */
2043                         goto submit_for_resync;
2044                 }
2045                 break;
2046
2047         case P_OV_REQUEST:
2048                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2049                     mdev->agreed_pro_version >= 90) {
2050                         unsigned long now = jiffies;
2051                         int i;
2052                         mdev->ov_start_sector = sector;
2053                         mdev->ov_position = sector;
2054                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2055                         mdev->rs_total = mdev->ov_left;
2056                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2057                                 mdev->rs_mark_left[i] = mdev->ov_left;
2058                                 mdev->rs_mark_time[i] = now;
2059                         }
2060                         dev_info(DEV, "Online Verify start sector: %llu\n",
2061                                         (unsigned long long)sector);
2062                 }
2063                 e->w.cb = w_e_end_ov_req;
2064                 fault_type = DRBD_FAULT_RS_RD;
2065                 break;
2066
2067         default:
2068                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2069                     cmdname(cmd));
2070                 fault_type = DRBD_FAULT_MAX;
2071                 goto out_free_e;
2072         }
2073
2074         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2075          * wrt the receiver, but it is not as straightforward as it may seem.
2076          * Various places in the resync start and stop logic assume resync
2077          * requests are processed in order, requeuing this on the worker thread
2078          * introduces a bunch of new code for synchronization between threads.
2079          *
2080          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2081          * "forever", throttling after drbd_rs_begin_io will lock that extent
2082          * for application writes for the same time.  For now, just throttle
2083          * here, where the rest of the code expects the receiver to sleep for
2084          * a while, anyways.
2085          */
2086
2087         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2088          * this defers syncer requests for some time, before letting at least
2089          * one request through.  The resync controller on the receiving side
2090          * will adapt to the incoming rate accordingly.
2091          *
2092          * We cannot throttle here if remote is Primary/SyncTarget:
2093          * we would also throttle its application reads.
2094          * In that case, throttling is done on the SyncTarget only.
2095          */
2096         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2097                 schedule_timeout_uninterruptible(HZ/10);
2098         if (drbd_rs_begin_io(mdev, sector))
2099                 goto out_free_e;
2100
2101 submit_for_resync:
2102         atomic_add(size >> 9, &mdev->rs_sect_ev);
2103
2104 submit:
2105         inc_unacked(mdev);
2106         spin_lock_irq(&mdev->req_lock);
2107         list_add_tail(&e->w.list, &mdev->read_ee);
2108         spin_unlock_irq(&mdev->req_lock);
2109
2110         if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2111                 return true;
2112
2113         /* don't care for the reason here */
2114         dev_err(DEV, "submit failed, triggering re-connect\n");
2115         spin_lock_irq(&mdev->req_lock);
2116         list_del(&e->w.list);
2117         spin_unlock_irq(&mdev->req_lock);
2118         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2119
2120 out_free_e:
2121         put_ldev(mdev);
2122         drbd_free_ee(mdev, e);
2123         return false;
2124 }
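
/* Editorial summary of the dispatch in receive_DataRequest() above:
 *
 *      P_DATA_REQUEST     -> w_e_end_data_req      application read, no drbd_rs_begin_io
 *      P_RS_DATA_REQUEST  -> w_e_end_rsdata_req    resync read
 *      P_CSUM_RS_REQUEST  -> w_e_end_csum_rs_req   checksum-based resync request
 *      P_OV_REPLY         -> w_e_end_ov_reply      online verify reply, skips drbd_rs_begin_io
 *      P_OV_REQUEST       -> w_e_end_ov_req        online verify request
 */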
2125
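/* Editorial note (not from the original file): the after-split-brain helpers
 * below appear to follow the convention of drbd_uuid_compare() and
 * drbd_sync_handshake(): a positive return means this node's data wins
 * (we become sync source), a negative return means the peer's data wins
 * (we become sync target), and -100 means no automatic decision was reached.
 */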
2126 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2127 {
2128         int self, peer, rv = -100;
2129         unsigned long ch_self, ch_peer;
2130
2131         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2132         peer = mdev->p_uuid[UI_BITMAP] & 1;
2133
2134         ch_peer = mdev->p_uuid[UI_SIZE];
2135         ch_self = mdev->comm_bm_set;
2136
2137         switch (mdev->net_conf->after_sb_0p) {
2138         case ASB_CONSENSUS:
2139         case ASB_DISCARD_SECONDARY:
2140         case ASB_CALL_HELPER:
2141                 dev_err(DEV, "Configuration error.\n");
2142                 break;
2143         case ASB_DISCONNECT:
2144                 break;
2145         case ASB_DISCARD_YOUNGER_PRI:
2146                 if (self == 0 && peer == 1) {
2147                         rv = -1;
2148                         break;
2149                 }
2150                 if (self == 1 && peer == 0) {
2151                         rv =  1;
2152                         break;
2153                 }
2154                 /* Else fall through to one of the other strategies... */
2155         case ASB_DISCARD_OLDER_PRI:
2156                 if (self == 0 && peer == 1) {
2157                         rv = 1;
2158                         break;
2159                 }
2160                 if (self == 1 && peer == 0) {
2161                         rv = -1;
2162                         break;
2163                 }
2164                 /* Else fall through to one of the other strategies... */
2165                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2166                      "Using discard-least-changes instead\n");
2167         case ASB_DISCARD_ZERO_CHG:
2168                 if (ch_peer == 0 && ch_self == 0) {
2169                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2170                                 ? -1 : 1;
2171                         break;
2172                 } else {
2173                         if (ch_peer == 0) { rv =  1; break; }
2174                         if (ch_self == 0) { rv = -1; break; }
2175                 }
2176                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2177                         break;
2178         case ASB_DISCARD_LEAST_CHG:
2179                 if      (ch_self < ch_peer)
2180                         rv = -1;
2181                 else if (ch_self > ch_peer)
2182                         rv =  1;
2183                 else /* ( ch_self == ch_peer ) */
2184                      /* Well, then use something else. */
2185                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2186                                 ? -1 : 1;
2187                 break;
2188         case ASB_DISCARD_LOCAL:
2189                 rv = -1;
2190                 break;
2191         case ASB_DISCARD_REMOTE:
2192                 rv =  1;
2193         }
2194
2195         return rv;
2196 }
2197
2198 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2199 {
2200         int hg, rv = -100;
2201
2202         switch (mdev->net_conf->after_sb_1p) {
2203         case ASB_DISCARD_YOUNGER_PRI:
2204         case ASB_DISCARD_OLDER_PRI:
2205         case ASB_DISCARD_LEAST_CHG:
2206         case ASB_DISCARD_LOCAL:
2207         case ASB_DISCARD_REMOTE:
2208                 dev_err(DEV, "Configuration error.\n");
2209                 break;
2210         case ASB_DISCONNECT:
2211                 break;
2212         case ASB_CONSENSUS:
2213                 hg = drbd_asb_recover_0p(mdev);
2214                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2215                         rv = hg;
2216                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2217                         rv = hg;
2218                 break;
2219         case ASB_VIOLENTLY:
2220                 rv = drbd_asb_recover_0p(mdev);
2221                 break;
2222         case ASB_DISCARD_SECONDARY:
2223                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2224         case ASB_CALL_HELPER:
2225                 hg = drbd_asb_recover_0p(mdev);
2226                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2227                         enum drbd_state_rv rv2;
2228
2229                         drbd_set_role(mdev, R_SECONDARY, 0);
2230                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2231                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2232                           * we do not need to wait for the after state change work either. */
2233                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2234                         if (rv2 != SS_SUCCESS) {
2235                                 drbd_khelper(mdev, "pri-lost-after-sb");
2236                         } else {
2237                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2238                                 rv = hg;
2239                         }
2240                 } else
2241                         rv = hg;
2242         }
2243
2244         return rv;
2245 }
2246
2247 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2248 {
2249         int hg, rv = -100;
2250
2251         switch (mdev->net_conf->after_sb_2p) {
2252         case ASB_DISCARD_YOUNGER_PRI:
2253         case ASB_DISCARD_OLDER_PRI:
2254         case ASB_DISCARD_LEAST_CHG:
2255         case ASB_DISCARD_LOCAL:
2256         case ASB_DISCARD_REMOTE:
2257         case ASB_CONSENSUS:
2258         case ASB_DISCARD_SECONDARY:
2259                 dev_err(DEV, "Configuration error.\n");
2260                 break;
2261         case ASB_VIOLENTLY:
2262                 rv = drbd_asb_recover_0p(mdev);
2263                 break;
2264         case ASB_DISCONNECT:
2265                 break;
2266         case ASB_CALL_HELPER:
2267                 hg = drbd_asb_recover_0p(mdev);
2268                 if (hg == -1) {
2269                         enum drbd_state_rv rv2;
2270
2271                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2272                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2273                           * we do not need to wait for the after state change work either. */
2274                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2275                         if (rv2 != SS_SUCCESS) {
2276                                 drbd_khelper(mdev, "pri-lost-after-sb");
2277                         } else {
2278                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2279                                 rv = hg;
2280                         }
2281                 } else
2282                         rv = hg;
2283         }
2284
2285         return rv;
2286 }
2287
2288 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2289                            u64 bits, u64 flags)
2290 {
2291         if (!uuid) {
2292                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2293                 return;
2294         }
2295         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2296              text,
2297              (unsigned long long)uuid[UI_CURRENT],
2298              (unsigned long long)uuid[UI_BITMAP],
2299              (unsigned long long)uuid[UI_HISTORY_START],
2300              (unsigned long long)uuid[UI_HISTORY_END],
2301              (unsigned long long)bits,
2302              (unsigned long long)flags);
2303 }
2304
2305 /*
2306   100   after split brain try auto recover
2307     2   C_SYNC_SOURCE set BitMap
2308     1   C_SYNC_SOURCE use BitMap
2309     0   no Sync
2310    -1   C_SYNC_TARGET use BitMap
2311    -2   C_SYNC_TARGET set BitMap
2312  -100   after split brain, disconnect
2313 -1000   unrelated data
2314 -1091   requires proto 91
2315 -1096   requires proto 96
2316  */
2317 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2318 {
2319         u64 self, peer;
2320         int i, j;
2321
2322         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2323         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2324
2325         *rule_nr = 10;
2326         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2327                 return 0;
2328
2329         *rule_nr = 20;
2330         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2331              peer != UUID_JUST_CREATED)
2332                 return -2;
2333
2334         *rule_nr = 30;
2335         if (self != UUID_JUST_CREATED &&
2336             (peer == UUID_JUST_CREATED || peer == (u64)0))
2337                 return 2;
2338
2339         if (self == peer) {
2340                 int rct, dc; /* roles at crash time */
2341
2342                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2343
2344                         if (mdev->agreed_pro_version < 91)
2345                                 return -1091;
2346
2347                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2348                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2349                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2350                                 drbd_uuid_set_bm(mdev, 0UL);
2351
2352                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2353                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2354                                 *rule_nr = 34;
2355                         } else {
2356                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2357                                 *rule_nr = 36;
2358                         }
2359
2360                         return 1;
2361                 }
2362
2363                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2364
2365                         if (mdev->agreed_pro_version < 91)
2366                                 return -1091;
2367
2368                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2369                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2370                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2371
2372                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2373                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2374                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2375
2376                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2377                                 *rule_nr = 35;
2378                         } else {
2379                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2380                                 *rule_nr = 37;
2381                         }
2382
2383                         return -1;
2384                 }
2385
2386                 /* Common power [off|failure] */
2387                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2388                         (mdev->p_uuid[UI_FLAGS] & 2);
2389                 /* lowest bit is set when we were primary,
2390                  * next bit (weight 2) is set when peer was primary */
2391                 *rule_nr = 40;
2392
2393                 switch (rct) {
2394                 case 0: /* !self_pri && !peer_pri */ return 0;
2395                 case 1: /*  self_pri && !peer_pri */ return 1;
2396                 case 2: /* !self_pri &&  peer_pri */ return -1;
2397                 case 3: /*  self_pri &&  peer_pri */
2398                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2399                         return dc ? -1 : 1;
2400                 }
2401         }
2402
2403         *rule_nr = 50;
2404         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2405         if (self == peer)
2406                 return -1;
2407
2408         *rule_nr = 51;
2409         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2410         if (self == peer) {
2411                 if (mdev->agreed_pro_version < 96 ?
2412                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2413                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2414                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2415                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2416                            that the last start of resync as sync source made to the peer's UUIDs. */
2417
2418                         if (mdev->agreed_pro_version < 91)
2419                                 return -1091;
2420
2421                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2422                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2423
2424                         dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2425                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2426
2427                         return -1;
2428                 }
2429         }
2430
2431         *rule_nr = 60;
2432         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2433         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2434                 peer = mdev->p_uuid[i] & ~((u64)1);
2435                 if (self == peer)
2436                         return -2;
2437         }
2438
2439         *rule_nr = 70;
2440         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2441         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2442         if (self == peer)
2443                 return 1;
2444
2445         *rule_nr = 71;
2446         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2447         if (self == peer) {
2448                 if (mdev->agreed_pro_version < 96 ?
2449                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2450                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2451                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2452                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2453                            that the last start of resync as sync source made to our UUIDs. */
2454
2455                         if (mdev->agreed_pro_version < 91)
2456                                 return -1091;
2457
2458                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2459                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2460
2461                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2462                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2463                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2464
2465                         return 1;
2466                 }
2467         }
2468
2469
2470         *rule_nr = 80;
2471         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2472         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2473                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2474                 if (self == peer)
2475                         return 2;
2476         }
2477
2478         *rule_nr = 90;
2479         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2480         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2481         if (self == peer && self != ((u64)0))
2482                 return 100;
2483
2484         *rule_nr = 100;
2485         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2486                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2487                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2488                         peer = mdev->p_uuid[j] & ~((u64)1);
2489                         if (self == peer)
2490                                 return -100;
2491                 }
2492         }
2493
2494         return -1000;
2495 }
2496
2497 /* drbd_sync_handshake() returns the new conn state on success, or
2498    CONN_MASK (-1) on failure.
2499  */
2500 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2501                                            enum drbd_disk_state peer_disk) __must_hold(local)
2502 {
2503         int hg, rule_nr;
2504         enum drbd_conns rv = C_MASK;
2505         enum drbd_disk_state mydisk;
2506
2507         mydisk = mdev->state.disk;
2508         if (mydisk == D_NEGOTIATING)
2509                 mydisk = mdev->new_state_tmp.disk;
2510
2511         dev_info(DEV, "drbd_sync_handshake:\n");
2512         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2513         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2514                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2515
2516         hg = drbd_uuid_compare(mdev, &rule_nr);
2517
2518         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2519
2520         if (hg == -1000) {
2521                 dev_alert(DEV, "Unrelated data, aborting!\n");
2522                 return C_MASK;
2523         }
2524         if (hg < -1000) {
2525                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2526                 return C_MASK;
2527         }
2528
2529         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2530             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2531                 int f = (hg == -100) || abs(hg) == 2;
2532                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2533                 if (f)
2534                         hg = hg*2;
2535                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2536                      hg > 0 ? "source" : "target");
2537         }
2538
2539         if (abs(hg) == 100)
2540                 drbd_khelper(mdev, "initial-split-brain");
2541
2542         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2543                 int pcount = (mdev->state.role == R_PRIMARY)
2544                            + (peer_role == R_PRIMARY);
2545                 int forced = (hg == -100);
2546
2547                 switch (pcount) {
2548                 case 0:
2549                         hg = drbd_asb_recover_0p(mdev);
2550                         break;
2551                 case 1:
2552                         hg = drbd_asb_recover_1p(mdev);
2553                         break;
2554                 case 2:
2555                         hg = drbd_asb_recover_2p(mdev);
2556                         break;
2557                 }
2558                 if (abs(hg) < 100) {
2559                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2560                              "automatically solved. Sync from %s node\n",
2561                              pcount, (hg < 0) ? "peer" : "this");
2562                         if (forced) {
2563                                 dev_warn(DEV, "Doing a full sync, since"
2564                                      " UUIDs were ambiguous.\n");
2565                                 hg = hg*2;
2566                         }
2567                 }
2568         }
2569
2570         if (hg == -100) {
2571                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2572                         hg = -1;
2573                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2574                         hg = 1;
2575
2576                 if (abs(hg) < 100)
2577                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2578                              "Sync from %s node\n",
2579                              (hg < 0) ? "peer" : "this");
2580         }
2581
2582         if (hg == -100) {
2583                 /* FIXME this log message is not correct if we end up here
2584                  * after an attempted attach on a diskless node.
2585                  * We just refuse to attach -- well, we drop the "connection"
2586                  * to that disk, in a way... */
2587                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2588                 drbd_khelper(mdev, "split-brain");
2589                 return C_MASK;
2590         }
2591
2592         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2593                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2594                 return C_MASK;
2595         }
2596
2597         if (hg < 0 && /* by intention we do not use mydisk here. */
2598             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2599                 switch (mdev->net_conf->rr_conflict) {
2600                 case ASB_CALL_HELPER:
2601                         drbd_khelper(mdev, "pri-lost");
2602                         /* fall through */
2603                 case ASB_DISCONNECT:
2604                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2605                         return C_MASK;
2606                 case ASB_VIOLENTLY:
2607                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2608                              "assumption\n");
2609                 }
2610         }
2611
2612         if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2613                 if (hg == 0)
2614                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2615                 else
2616                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2617                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2618                                  abs(hg) >= 2 ? "full" : "bit-map based");
2619                 return C_MASK;
2620         }
2621
2622         if (abs(hg) >= 2) {
2623                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2624                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2625                                         BM_LOCKED_SET_ALLOWED))
2626                         return C_MASK;
2627         }
2628
2629         if (hg > 0) { /* become sync source. */
2630                 rv = C_WF_BITMAP_S;
2631         } else if (hg < 0) { /* become sync target */
2632                 rv = C_WF_BITMAP_T;
2633         } else {
2634                 rv = C_CONNECTED;
2635                 if (drbd_bm_total_weight(mdev)) {
2636                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2637                              drbd_bm_total_weight(mdev));
2638                 }
2639         }
2640
2641         return rv;
2642 }
2643
2644 /* returns 1 if invalid */
2645 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2646 {
2647         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2648         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2649             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2650                 return 0;
2651
2652         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2653         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2654             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2655                 return 1;
2656
2657         /* everything else is valid if they are equal on both sides. */
2658         if (peer == self)
2659                 return 0;
2660
2661         /* everything else is invalid. */
2662         return 1;
2663 }
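
/* Editorial examples for cmp_after_sb() above:
 *
 *      peer = ASB_DISCARD_REMOTE, self = ASB_DISCARD_LOCAL   -> 0 (valid, symmetric pair)
 *      peer = ASB_DISCARD_REMOTE, self = ASB_DISCARD_REMOTE  -> 1 (both would keep their own data)
 *      peer = ASB_DISCONNECT,     self = ASB_DISCONNECT      -> 0 (equal settings)
 *      peer = ASB_DISCONNECT,     self = ASB_CONSENSUS       -> 1 (mismatch)
 */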
2664
2665 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2666 {
2667         struct p_protocol *p = &mdev->data.rbuf.protocol;
2668         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2669         int p_want_lose, p_two_primaries, cf;
2670         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2671
2672         p_proto         = be32_to_cpu(p->protocol);
2673         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2674         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2675         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2676         p_two_primaries = be32_to_cpu(p->two_primaries);
2677         cf              = be32_to_cpu(p->conn_flags);
2678         p_want_lose = cf & CF_WANT_LOSE;
2679
2680         clear_bit(CONN_DRY_RUN, &mdev->flags);
2681
2682         if (cf & CF_DRY_RUN)
2683                 set_bit(CONN_DRY_RUN, &mdev->flags);
2684
2685         if (p_proto != mdev->net_conf->wire_protocol) {
2686                 dev_err(DEV, "incompatible communication protocols\n");
2687                 goto disconnect;
2688         }
2689
2690         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2691                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2692                 goto disconnect;
2693         }
2694
2695         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2696                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2697                 goto disconnect;
2698         }
2699
2700         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2701                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2702                 goto disconnect;
2703         }
2704
2705         if (p_want_lose && mdev->net_conf->want_lose) {
2706                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2707                 goto disconnect;
2708         }
2709
2710         if (p_two_primaries != mdev->net_conf->two_primaries) {
2711                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2712                 goto disconnect;
2713         }
2714
2715         if (mdev->agreed_pro_version >= 87) {
2716                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2717
2718                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2719                         return false;
2720
2721                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2722                 if (strcmp(p_integrity_alg, my_alg)) {
2723                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2724                         goto disconnect;
2725                 }
2726                 dev_info(DEV, "data-integrity-alg: %s\n",
2727                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2728         }
2729
2730         return true;
2731
2732 disconnect:
2733         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2734         return false;
2735 }
2736
2737 /* helper function
2738  * input: alg name, feature name
2739  * return: NULL (alg name was "")
2740  *         ERR_PTR(error) if something goes wrong
2741  *         or the crypto hash ptr, if it worked out ok. */
2742 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2743                 const char *alg, const char *name)
2744 {
2745         struct crypto_hash *tfm;
2746
2747         if (!alg[0])
2748                 return NULL;
2749
2750         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2751         if (IS_ERR(tfm)) {
2752                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2753                         alg, name, PTR_ERR(tfm));
2754                 return tfm;
2755         }
2756         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2757                 crypto_free_hash(tfm);
2758                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2759                 return ERR_PTR(-EINVAL);
2760         }
2761         return tfm;
2762 }
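/* The usual caller pattern for the tri-state return value above: NULL means
 * "no algorithm configured", an ERR_PTR() encodes the failure, anything else
 * is a usable transform.  The helper below is a hypothetical sketch of the
 * idiom that receive_SyncParam() follows. */
static inline int example_check_digest(struct drbd_conf *mdev, const char *alg)
{
        struct crypto_hash *tfm;

        tfm = drbd_crypto_alloc_digest_safe(mdev, alg, "example-alg");
        if (tfm == NULL)
                return 0;               /* feature simply not in use */
        if (IS_ERR(tfm))
                return PTR_ERR(tfm);    /* allocation or validation failed */
        /* ... would install tfm here; it must be crypto_free_hash()ed later */
        crypto_free_hash(tfm);
        return 0;
}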
2763
2764 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2765 {
2766         int ok = true;
2767         struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2768         unsigned int header_size, data_size, exp_max_sz;
2769         struct crypto_hash *verify_tfm = NULL;
2770         struct crypto_hash *csums_tfm = NULL;
2771         const int apv = mdev->agreed_pro_version;
2772         int *rs_plan_s = NULL;
2773         int fifo_size = 0;
2774
2775         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2776                     : apv == 88 ? sizeof(struct p_rs_param)
2777                                         + SHARED_SECRET_MAX
2778                     : apv <= 94 ? sizeof(struct p_rs_param_89)
2779                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2780
2781         if (packet_size > exp_max_sz) {
2782                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2783                     packet_size, exp_max_sz);
2784                 return false;
2785         }
2786
2787         if (apv <= 88) {
2788                 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2789                 data_size   = packet_size  - header_size;
2790         } else if (apv <= 94) {
2791                 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2792                 data_size   = packet_size  - header_size;
2793                 D_ASSERT(data_size == 0);
2794         } else {
2795                 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2796                 data_size   = packet_size  - header_size;
2797                 D_ASSERT(data_size == 0);
2798         }
2799
2800         /* initialize verify_alg and csums_alg */
2801         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2802
2803         if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2804                 return false;
2805
2806         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2807
2808         if (apv >= 88) {
2809                 if (apv == 88) {
2810                         if (data_size > SHARED_SECRET_MAX) {
2811                                 dev_err(DEV, "verify-alg too long, "
2812                                     "peer wants %u, accepting only %u bytes\n",
2813                                                 data_size, SHARED_SECRET_MAX);
2814                                 return false;
2815                         }
2816
2817                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2818                                 return false;
2819
2820                         /* we expect NUL terminated string */
2821                         /* but just in case someone tries to be evil */
2822                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2823                         p->verify_alg[data_size-1] = 0;
2824
2825                 } else /* apv >= 89 */ {
2826                         /* we still expect NUL terminated strings */
2827                         /* but just in case someone tries to be evil */
2828                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2829                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2830                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2831                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2832                 }
2833
2834                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2835                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2836                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2837                                     mdev->sync_conf.verify_alg, p->verify_alg);
2838                                 goto disconnect;
2839                         }
2840                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2841                                         p->verify_alg, "verify-alg");
2842                         if (IS_ERR(verify_tfm)) {
2843                                 verify_tfm = NULL;
2844                                 goto disconnect;
2845                         }
2846                 }
2847
2848                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2849                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2850                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2851                                     mdev->sync_conf.csums_alg, p->csums_alg);
2852                                 goto disconnect;
2853                         }
2854                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2855                                         p->csums_alg, "csums-alg");
2856                         if (IS_ERR(csums_tfm)) {
2857                                 csums_tfm = NULL;
2858                                 goto disconnect;
2859                         }
2860                 }
2861
2862                 if (apv > 94) {
2863                         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2864                         mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2865                         mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2866                         mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2867                         mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2868
2869                         fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2870                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2871                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2872                                 if (!rs_plan_s) {
2873                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
2874                                         goto disconnect;
2875                                 }
2876                         }
2877                 }
2878
2879                 spin_lock(&mdev->peer_seq_lock);
2880                 /* lock against drbd_nl_syncer_conf() */
2881                 if (verify_tfm) {
2882                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2883                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2884                         crypto_free_hash(mdev->verify_tfm);
2885                         mdev->verify_tfm = verify_tfm;
2886                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2887                 }
2888                 if (csums_tfm) {
2889                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2890                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2891                         crypto_free_hash(mdev->csums_tfm);
2892                         mdev->csums_tfm = csums_tfm;
2893                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2894                 }
2895                 if (fifo_size != mdev->rs_plan_s.size) {
2896                         kfree(mdev->rs_plan_s.values);
2897                         mdev->rs_plan_s.values = rs_plan_s;
2898                         mdev->rs_plan_s.size   = fifo_size;
2899                         mdev->rs_planed = 0;
2900                 }
2901                 spin_unlock(&mdev->peer_seq_lock);
2902         }
2903
2904         return ok;
2905 disconnect:
2906         /* just for completeness: actually not needed,
2907          * as this is not reached if csums_tfm was ok. */
2908         crypto_free_hash(csums_tfm);
2909         /* but free the verify_tfm again, if csums_tfm did not work out */
2910         crypto_free_hash(verify_tfm);
2911         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2912         return false;
2913 }
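/* Note on the fifo_size arithmetic above: assuming SLEEP_TIME is HZ/10 (one
 * resync-controller tick every 100ms, as used elsewhere in drbd),
 * fifo_size = (c_plan_ahead * 10 * SLEEP_TIME) / HZ collapses to c_plan_ahead,
 * i.e. one plan slot per controller tick over the configured plan-ahead
 * window.  E.g. c_plan_ahead = 20 (2 seconds) yields a 20-entry fifo. */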
2914
2915 /* warn if the arguments differ by more than 12.5% */
2916 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2917         const char *s, sector_t a, sector_t b)
2918 {
2919         sector_t d;
2920         if (a == 0 || b == 0)
2921                 return;
2922         d = (a > b) ? (a - b) : (b - a);
2923         if (d > (a>>3) || d > (b>>3))
2924                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2925                      (unsigned long long)a, (unsigned long long)b);
2926 }
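/* Worked example for the shift by 3 above: it compares the difference against
 * 1/8 (12.5%) of each value, so the warning effectively triggers once the
 * difference exceeds 12.5% of the smaller one.  E.g. a = 1000, b = 880:
 * d = 120, a>>3 = 125, b>>3 = 110, and 120 > 110, so we warn; with b = 900
 * the difference of 100 stays below both thresholds and nothing is logged. */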
2927
2928 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2929 {
2930         struct p_sizes *p = &mdev->data.rbuf.sizes;
2931         enum determine_dev_size dd = unchanged;
2932         sector_t p_size, p_usize, my_usize;
2933         int ldsc = 0; /* local disk size changed */
2934         enum dds_flags ddsf;
2935
2936         p_size = be64_to_cpu(p->d_size);
2937         p_usize = be64_to_cpu(p->u_size);
2938
2939         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2940                 dev_err(DEV, "some backing storage is needed\n");
2941                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2942                 return false;
2943         }
2944
2945         /* just store the peer's disk size for now.
2946          * we still need to figure out whether we accept that. */
2947         mdev->p_size = p_size;
2948
2949         if (get_ldev(mdev)) {
2950                 warn_if_differ_considerably(mdev, "lower level device sizes",
2951                            p_size, drbd_get_max_capacity(mdev->ldev));
2952                 warn_if_differ_considerably(mdev, "user requested size",
2953                                             p_usize, mdev->ldev->dc.disk_size);
2954
2955                 /* if this is the first connect, or an otherwise expected
2956                  * param exchange, choose the minimum */
2957                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2958                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2959                                              p_usize);
2960
2961                 my_usize = mdev->ldev->dc.disk_size;
2962
2963                 if (mdev->ldev->dc.disk_size != p_usize) {
2964                         mdev->ldev->dc.disk_size = p_usize;
2965                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2966                              (unsigned long)mdev->ldev->dc.disk_size);
2967                 }
2968
2969                 /* Never shrink a device with usable data during connect.
2970                    But allow online shrinking if we are connected. */
2971                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2972                    drbd_get_capacity(mdev->this_bdev) &&
2973                    mdev->state.disk >= D_OUTDATED &&
2974                    mdev->state.conn < C_CONNECTED) {
2975                         dev_err(DEV, "The peer's disk size is too small!\n");
2976                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2977                         mdev->ldev->dc.disk_size = my_usize;
2978                         put_ldev(mdev);
2979                         return false;
2980                 }
2981                 put_ldev(mdev);
2982         }
2983
2984         ddsf = be16_to_cpu(p->dds_flags);
2985         if (get_ldev(mdev)) {
2986                 dd = drbd_determine_dev_size(mdev, ddsf);
2987                 put_ldev(mdev);
2988                 if (dd == dev_size_error)
2989                         return false;
2990                 drbd_md_sync(mdev);
2991         } else {
2992                 /* I am diskless, need to accept the peer's size. */
2993                 drbd_set_my_capacity(mdev, p_size);
2994         }
2995
2996         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
2997         drbd_reconsider_max_bio_size(mdev);
2998
2999         if (get_ldev(mdev)) {
3000                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3001                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3002                         ldsc = 1;
3003                 }
3004
3005                 put_ldev(mdev);
3006         }
3007
3008         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3009                 if (be64_to_cpu(p->c_size) !=
3010                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3011                         /* we have different sizes, probably peer
3012                          * needs to know my new size... */
3013                         drbd_send_sizes(mdev, 0, ddsf);
3014                 }
3015                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3016                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3017                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3018                             mdev->state.disk >= D_INCONSISTENT) {
3019                                 if (ddsf & DDSF_NO_RESYNC)
3020                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3021                                 else
3022                                         resync_after_online_grow(mdev);
3023                         } else
3024                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3025                 }
3026         }
3027
3028         return true;
3029 }
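/* Example for the size negotiation above: min_not_zero() treats 0 as "no
 * explicit limit".  If our configured disk_size is 0 and the peer requests a
 * u_size of 1000000 sectors, the peer's value wins; if both sides configured
 * a limit, the smaller of the two is used for the first parameter exchange. */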
3030
3031 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3032 {
3033         struct p_uuids *p = &mdev->data.rbuf.uuids;
3034         u64 *p_uuid;
3035         int i, updated_uuids = 0;
3036
3037         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return false;
             }
3038
3039         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3040                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3041
3042         kfree(mdev->p_uuid);
3043         mdev->p_uuid = p_uuid;
3044
3045         if (mdev->state.conn < C_CONNECTED &&
3046             mdev->state.disk < D_INCONSISTENT &&
3047             mdev->state.role == R_PRIMARY &&
3048             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3049                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3050                     (unsigned long long)mdev->ed_uuid);
3051                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3052                 return false;
3053         }
3054
3055         if (get_ldev(mdev)) {
3056                 int skip_initial_sync =
3057                         mdev->state.conn == C_CONNECTED &&
3058                         mdev->agreed_pro_version >= 90 &&
3059                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3060                         (p_uuid[UI_FLAGS] & 8);
3061                 if (skip_initial_sync) {
3062                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3063                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3064                                         "clear_n_write from receive_uuids",
3065                                         BM_LOCKED_TEST_ALLOWED);
3066                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3067                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3068                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3069                                         CS_VERBOSE, NULL);
3070                         drbd_md_sync(mdev);
3071                         updated_uuids = 1;
3072                 }
3073                 put_ldev(mdev);
3074         } else if (mdev->state.disk < D_INCONSISTENT &&
3075                    mdev->state.role == R_PRIMARY) {
3076                 /* I am a diskless primary, the peer just created a new current UUID
3077                    for me. */
3078                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3079         }
3080
3081         /* Before we test for the disk state, we should wait until a possibly
3082            ongoing cluster-wide state change has finished. That is important if
3083            we are primary and are detaching from our disk. We need to see the
3084            new disk state... */
3085         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3086         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3087                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3088
3089         if (updated_uuids)
3090                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3091
3092         return true;
3093 }
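/* Note on the ~((u64)1) masks in receive_uuids() above: current UUIDs are
 * compared with their lowest bit cleared, so two UUIDs that differ only in
 * bit 0 are treated as the same data generation.  E.g. 0x...1234 and
 * 0x...1235 compare equal after masking, while 0x...1236 does not. */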
3094
3095 /**
3096  * convert_state() - Converts the peer's view of the cluster state to our point of view
3097  * @ps:         The state as seen by the peer.
3098  */
3099 static union drbd_state convert_state(union drbd_state ps)
3100 {
3101         union drbd_state ms;
3102
3103         static enum drbd_conns c_tab[] = {
3104                 [C_CONNECTED] = C_CONNECTED,
3105
3106                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3107                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3108                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3109                 [C_VERIFY_S]       = C_VERIFY_T,
3110                 [C_MASK]   = C_MASK,
3111         };
3112
3113         ms.i = ps.i;
3114
3115         ms.conn = c_tab[ps.conn];
3116         ms.peer = ps.role;
3117         ms.role = ps.peer;
3118         ms.pdsk = ps.disk;
3119         ms.disk = ps.pdsk;
3120         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3121
3122         return ms;
3123 }
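/* Example of the mirroring above: if the peer reports "role=Primary,
 * peer=Secondary, disk=UpToDate, pdsk=Inconsistent, conn=StartingSyncS",
 * our view of the very same situation is "role=Secondary, peer=Primary,
 * disk=Inconsistent, pdsk=UpToDate, conn=StartingSyncT": roles and disk
 * states swap sides, and the sync source/target notion is flipped by
 * c_tab[]. */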
3124
3125 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3126 {
3127         struct p_req_state *p = &mdev->data.rbuf.req_state;
3128         union drbd_state mask, val;
3129         enum drbd_state_rv rv;
3130
3131         mask.i = be32_to_cpu(p->mask);
3132         val.i = be32_to_cpu(p->val);
3133
3134         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3135             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3136                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3137                 return true;
3138         }
3139
3140         mask = convert_state(mask);
3141         val = convert_state(val);
3142
3143         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3144
3145         drbd_send_sr_reply(mdev, rv);
3146         drbd_md_sync(mdev);
3147
3148         return true;
3149 }
3150
3151 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3152 {
3153         struct p_state *p = &mdev->data.rbuf.state;
3154         union drbd_state os, ns, peer_state;
3155         enum drbd_disk_state real_peer_disk;
3156         enum chg_state_flags cs_flags;
3157         int rv;
3158
3159         peer_state.i = be32_to_cpu(p->state);
3160
3161         real_peer_disk = peer_state.disk;
3162         if (peer_state.disk == D_NEGOTIATING) {
3163                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3164                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3165         }
3166
3167         spin_lock_irq(&mdev->req_lock);
3168  retry:
3169         os = ns = mdev->state;
3170         spin_unlock_irq(&mdev->req_lock);
3171
3172         /* peer says his disk is uptodate, while we think it is inconsistent,
3173          * and this happens while we think we have a sync going on. */
3174         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3175             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3176                 /* If we are (becoming) SyncSource, but peer is still in sync
3177                  * preparation, ignore its uptodate-ness to avoid flapping, it
3178                  * will change to inconsistent once the peer reaches active
3179                  * syncing states.
3180                  * It may have changed syncer-paused flags, however, so we
3181                  * cannot ignore this completely. */
3182                 if (peer_state.conn > C_CONNECTED &&
3183                     peer_state.conn < C_SYNC_SOURCE)
3184                         real_peer_disk = D_INCONSISTENT;
3185
3186                 /* if peer_state changes to connected at the same time,
3187                  * it explicitly notifies us that it finished resync.
3188                  * Maybe we should finish it up, too? */
3189                 else if (os.conn >= C_SYNC_SOURCE &&
3190                          peer_state.conn == C_CONNECTED) {
3191                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3192                                 drbd_resync_finished(mdev);
3193                         return true;
3194                 }
3195         }
3196
3197         /* peer says his disk is inconsistent, while we think it is uptodate,
3198          * and this happens while the peer still thinks we have a sync going on,
3199          * but we think we are already done with the sync.
3200          * We ignore this to avoid flapping pdsk.
3201          * This should not happen, if the peer is a recent version of drbd. */
3202         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3203             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3204                 real_peer_disk = D_UP_TO_DATE;
3205
3206         if (ns.conn == C_WF_REPORT_PARAMS)
3207                 ns.conn = C_CONNECTED;
3208
3209         if (peer_state.conn == C_AHEAD)
3210                 ns.conn = C_BEHIND;
3211
3212         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3213             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3214                 int cr; /* consider resync */
3215
3216                 /* if we established a new connection */
3217                 cr  = (os.conn < C_CONNECTED);
3218                 /* if we had an established connection
3219                  * and one of the nodes newly attaches a disk */
3220                 cr |= (os.conn == C_CONNECTED &&
3221                        (peer_state.disk == D_NEGOTIATING ||
3222                         os.disk == D_NEGOTIATING));
3223                 /* if we have both been inconsistent, and the peer has been
3224                  * forced to be UpToDate with --overwrite-data */
3225                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3226                 /* if we had been plain connected, and the admin requested to
3227                  * start a sync by "invalidate" or "invalidate-remote" */
3228                 cr |= (os.conn == C_CONNECTED &&
3229                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3230                                  peer_state.conn <= C_WF_BITMAP_T));
3231
3232                 if (cr)
3233                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3234
3235                 put_ldev(mdev);
3236                 if (ns.conn == C_MASK) {
3237                         ns.conn = C_CONNECTED;
3238                         if (mdev->state.disk == D_NEGOTIATING) {
3239                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3240                         } else if (peer_state.disk == D_NEGOTIATING) {
3241                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3242                                 peer_state.disk = D_DISKLESS;
3243                                 real_peer_disk = D_DISKLESS;
3244                         } else {
3245                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3246                                         return false;
3247                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3248                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3249                                 return false;
3250                         }
3251                 }
3252         }
3253
3254         spin_lock_irq(&mdev->req_lock);
3255         if (mdev->state.i != os.i)
3256                 goto retry;
3257         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3258         ns.peer = peer_state.role;
3259         ns.pdsk = real_peer_disk;
3260         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3261         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3262                 ns.disk = mdev->new_state_tmp.disk;
3263         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3264         if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3265             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3266                 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3267            for temporary network outages! */
3268                 spin_unlock_irq(&mdev->req_lock);
3269                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3270                 tl_clear(mdev);
3271                 drbd_uuid_new_current(mdev);
3272                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3273                 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3274                 return false;
3275         }
3276         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3277         ns = mdev->state;
3278         spin_unlock_irq(&mdev->req_lock);
3279
3280         if (rv < SS_SUCCESS) {
3281                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3282                 return false;
3283         }
3284
3285         if (os.conn > C_WF_REPORT_PARAMS) {
3286                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3287                     peer_state.disk != D_NEGOTIATING ) {
3288                         /* we want resync, peer has not yet decided to sync... */
3289                         /* Nowadays only used when forcing a node into primary role and
3290                            setting its disk to UpToDate with that */
3291                         drbd_send_uuids(mdev);
3292                         drbd_send_state(mdev);
3293                 }
3294         }
3295
3296         mdev->net_conf->want_lose = 0;
3297
3298         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3299
3300         return true;
3301 }
3302
3303 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3304 {
3305         struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3306
3307         wait_event(mdev->misc_wait,
3308                    mdev->state.conn == C_WF_SYNC_UUID ||
3309                    mdev->state.conn == C_BEHIND ||
3310                    mdev->state.conn < C_CONNECTED ||
3311                    mdev->state.disk < D_NEGOTIATING);
3312
3313         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3314
3315         /* Here the _drbd_uuid_ functions are right, current should
3316            _not_ be rotated into the history */
3317         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3318                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3319                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3320
3321                 drbd_print_uuids(mdev, "updated sync uuid");
3322                 drbd_start_resync(mdev, C_SYNC_TARGET);
3323
3324                 put_ldev(mdev);
3325         } else
3326                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3327
3328         return true;
3329 }
3330
3331 /**
3332  * receive_bitmap_plain
3333  *
3334  * Return 0 when done, 1 when another iteration is needed, and a negative error
3335  * code upon failure.
3336  */
3337 static int
3338 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3339                      unsigned long *buffer, struct bm_xfer_ctx *c)
3340 {
3341         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3342         unsigned want = num_words * sizeof(long);
3343         int err;
3344
3345         if (want != data_size) {
3346                 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3347                 return -EIO;
3348         }
3349         if (want == 0)
3350                 return 0;
3351         err = drbd_recv(mdev, buffer, want);
3352         if (err != want) {
3353                 if (err >= 0)
3354                         err = -EIO;
3355                 return err;
3356         }
3357
3358         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3359
3360         c->word_offset += num_words;
3361         c->bit_offset = c->word_offset * BITS_PER_LONG;
3362         if (c->bit_offset > c->bm_bits)
3363                 c->bit_offset = c->bm_bits;
3364
3365         return 1;
3366 }
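/* Example of the bookkeeping above: want is the payload size implied by the
 * word window.  On a 64-bit host a chunk of 512 words must arrive with
 * data_size == 512 * 8 == 4096 bytes, and after merging it the bit offset
 * advances by 512 * BITS_PER_LONG == 32768 bits (clamped to bm_bits for the
 * final, possibly partial, packet). */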
3367
3368 /**
3369  * recv_bm_rle_bits
3370  *
3371  * Return 0 when done, 1 when another iteration is needed, and a negative error
3372  * code upon failure.
3373  */
3374 static int
3375 recv_bm_rle_bits(struct drbd_conf *mdev,
3376                 struct p_compressed_bm *p,
3377                 struct bm_xfer_ctx *c)
3378 {
3379         struct bitstream bs;
3380         u64 look_ahead;
3381         u64 rl;
3382         u64 tmp;
3383         unsigned long s = c->bit_offset;
3384         unsigned long e;
3385         int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3386         int toggle = DCBP_get_start(p);
3387         int have;
3388         int bits;
3389
3390         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3391
3392         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3393         if (bits < 0)
3394                 return -EIO;
3395
3396         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3397                 bits = vli_decode_bits(&rl, look_ahead);
3398                 if (bits <= 0)
3399                         return -EIO;
3400
3401                 if (toggle) {
3402                         e = s + rl -1;
3403                         if (e >= c->bm_bits) {
3404                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3405                                 return -EIO;
3406                         }
3407                         _drbd_bm_set_bits(mdev, s, e);
3408                 }
3409
3410                 if (have < bits) {
3411                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3412                                 have, bits, look_ahead,
3413                                 (unsigned int)(bs.cur.b - p->code),
3414                                 (unsigned int)bs.buf_len);
3415                         return -EIO;
3416                 }
3417                 look_ahead >>= bits;
3418                 have -= bits;
3419
3420                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3421                 if (bits < 0)
3422                         return -EIO;
3423                 look_ahead |= tmp << have;
3424                 have += bits;
3425         }
3426
3427         c->bit_offset = s;
3428         bm_xfer_ctx_bit_to_word_offset(c);
3429
3430         return (s != c->bm_bits);
3431 }
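/* Worked example of the run-length decoding above: the VLI stream encodes
 * alternating runs of clear and set bits, and DCBP_get_start() says whether
 * the first run is "set".  E.g. runs of 5, 3, 7 with toggle starting at 0
 * mean: leave bits s..s+4 alone, set bits s+5..s+7, then leave the next 7
 * bits alone, with s advancing by each run length and toggle flipping every
 * iteration. */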
3432
3433 /**
3434  * decode_bitmap_c
3435  *
3436  * Return 0 when done, 1 when another iteration is needed, and a negative error
3437  * code upon failure.
3438  */
3439 static int
3440 decode_bitmap_c(struct drbd_conf *mdev,
3441                 struct p_compressed_bm *p,
3442                 struct bm_xfer_ctx *c)
3443 {
3444         if (DCBP_get_code(p) == RLE_VLI_Bits)
3445                 return recv_bm_rle_bits(mdev, p, c);
3446
3447         /* other variants had been implemented for evaluation,
3448          * but have been dropped as this one turned out to be "best"
3449          * during all our tests. */
3450
3451         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3452         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3453         return -EIO;
3454 }
3455
3456 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3457                 const char *direction, struct bm_xfer_ctx *c)
3458 {
3459         /* what would it take to transfer it "plaintext" */
3460         unsigned plain = sizeof(struct p_header80) *
3461                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3462                 + c->bm_words * sizeof(long);
3463         unsigned total = c->bytes[0] + c->bytes[1];
3464         unsigned r;
3465
3466         /* total can not be zero. but just in case: */
3467         if (total == 0)
3468                 return;
3469
3470         /* don't report if not compressed */
3471         if (total >= plain)
3472                 return;
3473
3474         /* total < plain. check for overflow, still */
3475         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3476                                     : (1000 * total / plain);
3477
3478         if (r > 1000)
3479                 r = 1000;
3480
3481         r = 1000 - r;
3482         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3483              "total %u; compression: %u.%u%%\n",
3484                         direction,
3485                         c->bytes[1], c->packets[1],
3486                         c->bytes[0], c->packets[0],
3487                         total, r/10, r % 10);
3488 }
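/* Worked example for the ratio above (numbers made up): with plain = 131200
 * bytes and total = 6400 bytes, r = 1000 * 6400 / 131200 = 48, so
 * 1000 - r = 952 and the line reports "compression: 95.2%". */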
3489
3490 /* Since we are processing the bitfield from lower addresses to higher,
3491    it does not matter whether we process it in 32 bit chunks or 64 bit
3492    chunks, as long as it is little endian. (Understand it as a byte stream,
3493    beginning with the lowest byte...) If we used big endian,
3494    we would need to process it from the highest address to the lowest
3495    in order to be agnostic to the 32 vs 64 bit issue.
3496
3497    returns 0 on failure, 1 if we successfully received it. */
3498 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3499 {
3500         struct bm_xfer_ctx c;
3501         void *buffer;
3502         int err;
3503         int ok = false;
3504         struct p_header80 *h = &mdev->data.rbuf.header.h80;
3505
3506         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3507         /* you are supposed to send additional out-of-sync information
3508          * if you actually set bits during this phase */
3509
3510         /* maybe we should use some per thread scratch page,
3511          * and allocate that during initial device creation? */
3512         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3513         if (!buffer) {
3514                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3515                 goto out;
3516         }
3517
3518         c = (struct bm_xfer_ctx) {
3519                 .bm_bits = drbd_bm_bits(mdev),
3520                 .bm_words = drbd_bm_words(mdev),
3521         };
3522
3523         for(;;) {
3524                 if (cmd == P_BITMAP) {
3525                         err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3526                 } else if (cmd == P_COMPRESSED_BITMAP) {
3527                         /* MAYBE: sanity check that we speak proto >= 90,
3528                          * and the feature is enabled! */
3529                         struct p_compressed_bm *p;
3530
3531                         if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3532                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3533                                 goto out;
3534                         }
3535                         /* use the page buffer */
3536                         p = buffer;
3537                         memcpy(p, h, sizeof(*h));
3538                         if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3539                                 goto out;
3540                         if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3541                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3542                                 goto out;
3543                         }
3544                         err = decode_bitmap_c(mdev, p, &c);
3545                 } else {
3546                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", cmd);
3547                         goto out;
3548                 }
3549
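                /* index 1 counts plain P_BITMAP packets, index 0 the
                 * compressed (RLE) ones; INFO_bm_xfer_stats() prints them
                 * under those labels */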
3550                 c.packets[cmd == P_BITMAP]++;
3551                 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3552
3553                 if (err <= 0) {
3554                         if (err < 0)
3555                                 goto out;
3556                         break;
3557                 }
3558                 if (!drbd_recv_header(mdev, &cmd, &data_size))
3559                         goto out;
3560         }
3561
3562         INFO_bm_xfer_stats(mdev, "receive", &c);
3563
3564         if (mdev->state.conn == C_WF_BITMAP_T) {
3565                 enum drbd_state_rv rv;
3566
3567                 ok = !drbd_send_bitmap(mdev);
3568                 if (!ok)
3569                         goto out;
3570                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3571                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3572                 D_ASSERT(rv == SS_SUCCESS);
3573         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3574                 /* admin may have requested C_DISCONNECTING,
3575                  * other threads may have noticed network errors */
3576                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3577                     drbd_conn_str(mdev->state.conn));
3578         }
3579
3580         ok = true;
3581  out:
3582         drbd_bm_unlock(mdev);
3583         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3584                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3585         free_page((unsigned long) buffer);
3586         return ok;
3587 }
3588
3589 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3590 {
3591         /* TODO zero copy sink :) */
3592         static char sink[128];
3593         int size, want, r;
3594
3595         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3596                  cmd, data_size);
3597
3598         size = data_size;
3599         while (size > 0) {
3600                 want = min_t(int, size, sizeof(sink));
3601                 r = drbd_recv(mdev, sink, want);
3602                 ERR_IF(r <= 0) break;
3603                 size -= r;
3604         }
3605         return size == 0;
3606 }
3607
3608 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3609 {
3610         /* Make sure we've acked all the TCP data associated
3611          * with the data requests being unplugged */
3612         drbd_tcp_quickack(mdev->data.socket);
3613
3614         return true;
3615 }
3616
3617 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3618 {
3619         struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3620
3621         switch (mdev->state.conn) {
3622         case C_WF_SYNC_UUID:
3623         case C_WF_BITMAP_T:
3624         case C_BEHIND:
3625                         break;
3626         default:
3627                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3628                                 drbd_conn_str(mdev->state.conn));
3629         }
3630
3631         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3632
3633         return true;
3634 }
3635
3636 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3637
3638 struct data_cmd {
3639         int expect_payload;
3640         size_t pkt_size;
3641         drbd_cmd_handler_f function;
3642 };
3643
3644 static struct data_cmd drbd_cmd_handler[] = {
3645         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3646         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3647         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3648         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3649         [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
3650         [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3651         [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3652         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3653         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3654         [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
3655         [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3656         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3657         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3658         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3659         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3660         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3661         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3662         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3663         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3664         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3665         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3666         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3667         /* anything missing from this table is in
3668          * the asender_tbl, see get_asender_cmd */
3669         [P_MAX_CMD]         = { 0, 0, NULL },
3670 };
3671
3672 /* All handler functions that expect a sub-header get that sub-header in
3673    mdev->data.rbuf.header.head.payload.
3674
3675    Usually the callback can find the usual p_header in mdev->data.rbuf.header.head,
3676    but it may not rely on that, since there is also p_header95.
3677  */
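/* Example of the dispatch arithmetic in drbdd() below: for P_BARRIER the
 * table entry says pkt_size = sizeof(struct p_barrier), so
 * shs = sizeof(struct p_barrier) - sizeof(union p_header) sub-header bytes
 * are read into the receive buffer first, and the handler is then called
 * with data_size = packet_size - shs, which for a barrier packet is expected
 * to be zero since its expect_payload flag is 0. */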
3678
3679 static void drbdd(struct drbd_conf *mdev)
3680 {
3681         union p_header *header = &mdev->data.rbuf.header;
3682         unsigned int packet_size;
3683         enum drbd_packets cmd;
3684         size_t shs; /* sub header size */
3685         int rv;
3686
3687         while (get_t_state(&mdev->receiver) == Running) {
3688                 drbd_thread_current_set_cpu(mdev);
3689                 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3690                         goto err_out;
3691
3692                 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3693                         dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3694                         goto err_out;
3695                 }
3696
3697                 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3698                 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3699                         dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3700                         goto err_out;
3701                 }
3702
3703                 if (shs) {
3704                         rv = drbd_recv(mdev, &header->h80.payload, shs);
3705                         if (unlikely(rv != shs)) {
3706                                 if (!signal_pending(current))
3707                                         dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3708                                 goto err_out;
3709                         }
3710                 }
3711
3712                 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3713
3714                 if (unlikely(!rv)) {
3715                         dev_err(DEV, "error receiving %s, l: %d!\n",
3716                             cmdname(cmd), packet_size);
3717                         goto err_out;
3718                 }
3719         }
3720
3721         if (0) {
3722         err_out:
3723                 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3724         }
3725         /* If we leave here, we probably want to update at least the
3726          * "Connected" indicator on stable storage. Do so explicitly here. */
3727         drbd_md_sync(mdev);
3728 }
3729
3730 void drbd_flush_workqueue(struct drbd_conf *mdev)
3731 {
3732         struct drbd_wq_barrier barr;
3733
3734         barr.w.cb = w_prev_work_done;
3735         init_completion(&barr.done);
3736         drbd_queue_work(&mdev->data.work, &barr.w);
3737         wait_for_completion(&barr.done);
3738 }
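/* drbd_flush_workqueue() above is the classic "flush by barrier" idiom:
 * queue a sentinel work item behind everything already on the worker queue
 * and block until the worker has processed it.  A minimal sketch of the
 * callback such a barrier is expected to use (hypothetical name, assuming
 * the usual drbd_work callback signature and that drbd_wq_barrier embeds the
 * work item as .w next to the completion .done, as the code above implies):
 *
 *	static int example_barrier_done(struct drbd_conf *mdev,
 *					struct drbd_work *w, int cancel)
 *	{
 *		struct drbd_wq_barrier *b =
 *			container_of(w, struct drbd_wq_barrier, w);
 *		complete(&b->done);
 *		return 1;
 *	}
 */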
3739
3740 void drbd_free_tl_hash(struct drbd_conf *mdev)
3741 {
3742         struct hlist_head *h;
3743
3744         spin_lock_irq(&mdev->req_lock);
3745
3746         if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3747                 spin_unlock_irq(&mdev->req_lock);
3748                 return;
3749         }
3750         /* paranoia code */
3751         for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3752                 if (h->first)
3753                         dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3754                                 (int)(h - mdev->ee_hash), h->first);
3755         kfree(mdev->ee_hash);
3756         mdev->ee_hash = NULL;
3757         mdev->ee_hash_s = 0;
3758
3759         /* paranoia code */
3760         for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3761                 if (h->first)
3762                         dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3763                                 (int)(h - mdev->tl_hash), h->first);
3764         kfree(mdev->tl_hash);
3765         mdev->tl_hash = NULL;
3766         mdev->tl_hash_s = 0;
3767         spin_unlock_irq(&mdev->req_lock);
3768 }
3769
3770 static void drbd_disconnect(struct drbd_conf *mdev)
3771 {
3772         enum drbd_fencing_p fp;
3773         union drbd_state os, ns;
3774         int rv = SS_UNKNOWN_ERROR;
3775         unsigned int i;
3776
3777         if (mdev->state.conn == C_STANDALONE)
3778                 return;
3779
3780         /* asender does not clean up anything. it must not interfere, either */
3781         drbd_thread_stop(&mdev->asender);
3782         drbd_free_sock(mdev);
3783
3784         /* wait for current activity to cease. */
3785         spin_lock_irq(&mdev->req_lock);
3786         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3787         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3788         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3789         spin_unlock_irq(&mdev->req_lock);
3790
3791         /* We do not have data structures that would allow us to
3792          * get the rs_pending_cnt down to 0 again.
3793          *  * On C_SYNC_TARGET we do not have any data structures describing
3794          *    the pending RSDataRequest's we have sent.
3795          *  * On C_SYNC_SOURCE there is no data structure that tracks
3796          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3797          *  And no, it is not the sum of the reference counts in the
3798          *  resync_LRU. The resync_LRU tracks the whole operation including
3799          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3800          *  on the fly. */
3801         drbd_rs_cancel_all(mdev);
3802         mdev->rs_total = 0;
3803         mdev->rs_failed = 0;
3804         atomic_set(&mdev->rs_pending_cnt, 0);
3805         wake_up(&mdev->misc_wait);
3806
3807         /* make sure syncer is stopped and w_resume_next_sg queued */
3808         del_timer_sync(&mdev->resync_timer);
3809         resync_timer_fn((unsigned long)mdev);
3810
3811         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3812          * w_make_resync_request etc. which may still be on the worker queue
3813          * to be "canceled" */
3814         drbd_flush_workqueue(mdev);
3815
3816         /* This also does reclaim_net_ee().  If we do this too early, we might
3817          * miss some resync ee and pages.*/
3818         drbd_process_done_ee(mdev);
3819
3820         kfree(mdev->p_uuid);
3821         mdev->p_uuid = NULL;
3822
3823         if (!is_susp(mdev->state))
3824                 tl_clear(mdev);
3825
3826         dev_info(DEV, "Connection closed\n");
3827
3828         drbd_md_sync(mdev);
3829
3830         fp = FP_DONT_CARE;
3831         if (get_ldev(mdev)) {
3832                 fp = mdev->ldev->dc.fencing;
3833                 put_ldev(mdev);
3834         }
3835
3836         if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3837                 drbd_try_outdate_peer_async(mdev);
3838
3839         spin_lock_irq(&mdev->req_lock);
3840         os = mdev->state;
3841         if (os.conn >= C_UNCONNECTED) {
3842                 /* Do not restart in case we are C_DISCONNECTING */
3843                 ns = os;
3844                 ns.conn = C_UNCONNECTED;
3845                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3846         }
3847         spin_unlock_irq(&mdev->req_lock);
3848
3849         if (os.conn == C_DISCONNECTING) {
3850                 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3851
3852                 crypto_free_hash(mdev->cram_hmac_tfm);
3853                 mdev->cram_hmac_tfm = NULL;
3854
3855                 kfree(mdev->net_conf);
3856                 mdev->net_conf = NULL;
3857                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3858         }
3859
3860         /* serialize with bitmap writeout triggered by the state change,
3861          * if any. */
3862         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3863
3864         /* tcp_close and release of sendpage pages can be deferred.  I don't
3865          * want to use SO_LINGER, because apparently it can be deferred for
3866          * more than 20 seconds (longest time I checked).
3867          *
3868          * Actually we don't care for exactly when the network stack does its
3869          * put_page(), but release our reference on these pages right here.
3870          */
3871         i = drbd_release_ee(mdev, &mdev->net_ee);
3872         if (i)
3873                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3874         i = atomic_read(&mdev->pp_in_use_by_net);
3875         if (i)
3876                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3877         i = atomic_read(&mdev->pp_in_use);
3878         if (i)
3879                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3880
3881         D_ASSERT(list_empty(&mdev->read_ee));
3882         D_ASSERT(list_empty(&mdev->active_ee));
3883         D_ASSERT(list_empty(&mdev->sync_ee));
3884         D_ASSERT(list_empty(&mdev->done_ee));
3885
3886         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3887         atomic_set(&mdev->current_epoch->epoch_size, 0);
3888         D_ASSERT(list_empty(&mdev->current_epoch->list));
3889 }
3890
3891 /*
3892  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3893  * we can agree on is stored in agreed_pro_version.
3894  *
3895  * feature flags and the reserved array should be enough room for future
3896  * enhancements of the handshake protocol, and possible plugins...
3897  *
3898  * for now, they are expected to be zero, but ignored.
3899  */
3900 static int drbd_send_handshake(struct drbd_conf *mdev)
3901 {
3902         /* ASSERT current == mdev->receiver ... */
3903         struct p_handshake *p = &mdev->data.sbuf.handshake;
3904         int ok;
3905
3906         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3907                 dev_err(DEV, "interrupted during initial handshake\n");
3908                 return 0; /* interrupted. not ok. */
3909         }
3910
3911         if (mdev->data.socket == NULL) {
3912                 mutex_unlock(&mdev->data.mutex);
3913                 return 0;
3914         }
3915
3916         memset(p, 0, sizeof(*p));
3917         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3918         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3919         ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3920                              (struct p_header80 *)p, sizeof(*p), 0 );
3921         mutex_unlock(&mdev->data.mutex);
3922         return ok;
3923 }
3924
3925 /*
3926  * return values:
3927  *   1 yes, we have a valid connection
3928  *   0 oops, did not work out, please try again
3929  *  -1 peer talks different language,
3930  *     no point in trying again, please go standalone.
3931  */
3932 static int drbd_do_handshake(struct drbd_conf *mdev)
3933 {
3934         /* ASSERT current == mdev->receiver ... */
3935         struct p_handshake *p = &mdev->data.rbuf.handshake;
3936         const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3937         unsigned int length;
3938         enum drbd_packets cmd;
3939         int rv;
3940
3941         rv = drbd_send_handshake(mdev);
3942         if (!rv)
3943                 return 0;
3944
3945         rv = drbd_recv_header(mdev, &cmd, &length);
3946         if (!rv)
3947                 return 0;
3948
3949         if (cmd != P_HAND_SHAKE) {
3950                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3951                      cmdname(cmd), cmd);
3952                 return -1;
3953         }
3954
3955         if (length != expect) {
3956                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3957                      expect, length);
3958                 return -1;
3959         }
3960
3961         rv = drbd_recv(mdev, &p->head.payload, expect);
3962
3963         if (rv != expect) {
3964                 if (!signal_pending(current))
3965                         dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
3966                 return 0;
3967         }
3968
3969         p->protocol_min = be32_to_cpu(p->protocol_min);
3970         p->protocol_max = be32_to_cpu(p->protocol_max);
3971         if (p->protocol_max == 0)
3972                 p->protocol_max = p->protocol_min;
3973
3974         if (PRO_VERSION_MAX < p->protocol_min ||
3975             PRO_VERSION_MIN > p->protocol_max)
3976                 goto incompat;
3977
3978         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3979
3980         dev_info(DEV, "Handshake successful: "
3981              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3982
3983         return 1;
3984
3985  incompat:
3986         dev_err(DEV, "incompatible DRBD dialects: "
3987             "I support %d-%d, peer supports %d-%d\n",
3988             PRO_VERSION_MIN, PRO_VERSION_MAX,
3989             p->protocol_min, p->protocol_max);
3990         return -1;
3991 }
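/* Worked example for the negotiation above (version numbers are
 * hypothetical): if we support protocols 86..96 and the peer advertises
 * 90..100, the ranges overlap and agreed_pro_version becomes
 * min(96, 100) = 96.  If the peer advertised 97..100 instead, the
 * PRO_VERSION_MAX < protocol_min check fires and we go "incompat". */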
3992
3993 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3994 static int drbd_do_auth(struct drbd_conf *mdev)
3995 {
3996         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3997         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3998         return -1;
3999 }
4000 #else
4001 #define CHALLENGE_LEN 64
4002
4003 /* Return value:
4004         1 - auth succeeded,
4005         0 - failed, try again (network error),
4006         -1 - auth failed, don't try again.
4007 */
4008
4009 static int drbd_do_auth(struct drbd_conf *mdev)
4010 {
4011         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4012         struct scatterlist sg;
4013         char *response = NULL;
4014         char *right_response = NULL;
4015         char *peers_ch = NULL;
4016         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4017         unsigned int resp_size;
4018         struct hash_desc desc;
4019         enum drbd_packets cmd;
4020         unsigned int length;
4021         int rv;
4022
4023         desc.tfm = mdev->cram_hmac_tfm;
4024         desc.flags = 0;
4025
4026         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4027                                 (u8 *)mdev->net_conf->shared_secret, key_len);
4028         if (rv) {
4029                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4030                 rv = -1;
4031                 goto fail;
4032         }
4033
4034         get_random_bytes(my_challenge, CHALLENGE_LEN);
4035
4036         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4037         if (!rv)
4038                 goto fail;
4039
4040         rv = drbd_recv_header(mdev, &cmd, &length);
4041         if (!rv)
4042                 goto fail;
4043
4044         if (cmd != P_AUTH_CHALLENGE) {
4045                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4046                     cmdname(cmd), cmd);
4047                 rv = 0;
4048                 goto fail;
4049         }
4050
4051         if (length > CHALLENGE_LEN * 2) {
4052                 dev_err(DEV, "AuthChallenge payload too big.\n");
4053                 rv = -1;
4054                 goto fail;
4055         }
4056
4057         peers_ch = kmalloc(length, GFP_NOIO);
4058         if (peers_ch == NULL) {
4059                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4060                 rv = -1;
4061                 goto fail;
4062         }
4063
4064         rv = drbd_recv(mdev, peers_ch, length);
4065
4066         if (rv != length) {
4067                 if (!signal_pending(current))
4068                         dev_warn(DEV, "short read AuthChallenge: l=%d\n", rv);
4069                 rv = 0;
4070                 goto fail;
4071         }
4072
4073         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4074         response = kmalloc(resp_size, GFP_NOIO);
4075         if (response == NULL) {
4076                 dev_err(DEV, "kmalloc of response failed\n");
4077                 rv = -1;
4078                 goto fail;
4079         }
4080
4081         sg_init_table(&sg, 1);
4082         sg_set_buf(&sg, peers_ch, length);
4083
4084         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4085         if (rv) {
4086                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4087                 rv = -1;
4088                 goto fail;
4089         }
4090
4091         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4092         if (!rv)
4093                 goto fail;
4094
4095         rv = drbd_recv_header(mdev, &cmd, &length);
4096         if (!rv)
4097                 goto fail;
4098
4099         if (cmd != P_AUTH_RESPONSE) {
4100                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4101                         cmdname(cmd), cmd);
4102                 rv = 0;
4103                 goto fail;
4104         }
4105
4106         if (length != resp_size) {
4107                 dev_err(DEV, "AuthResponse payload of wrong size\n");
4108                 rv = 0;
4109                 goto fail;
4110         }
4111
4112         rv = drbd_recv(mdev, response, resp_size);
4113
4114         if (rv != resp_size) {
4115                 if (!signal_pending(current))
4116                         dev_warn(DEV, "short read receiving AuthResponse: l=%d\n", rv);
4117                 rv = 0;
4118                 goto fail;
4119         }
4120
4121         right_response = kmalloc(resp_size, GFP_NOIO);
4122         if (right_response == NULL) {
4123                 dev_err(DEV, "kmalloc of right_response failed\n");
4124                 rv = -1;
4125                 goto fail;
4126         }
4127
4128         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4129
4130         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4131         if (rv) {
4132                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4133                 rv = -1;
4134                 goto fail;
4135         }
4136
4137         rv = !memcmp(response, right_response, resp_size);
4138
4139         if (rv)
4140                 dev_info(DEV, "Peer authenticated using %u bytes of '%s' HMAC\n",
4141                      resp_size, mdev->net_conf->cram_hmac_alg);
4142         else
4143                 rv = -1;
4144
4145  fail:
4146         kfree(peers_ch);
4147         kfree(response);
4148         kfree(right_response);
4149
4150         return rv;
4151 }
4152 #endif
4153
4154 int drbdd_init(struct drbd_thread *thi)
4155 {
4156         struct drbd_conf *mdev = thi->mdev;
4157         unsigned int minor = mdev_to_minor(mdev);
4158         int h;
4159
4160         sprintf(current->comm, "drbd%d_receiver", minor);
4161
4162         dev_info(DEV, "receiver (re)started\n");
4163
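             /* Keep trying to establish a connection: 0 from drbd_connect()
              * is a transient failure, so disconnect and retry after a
              * second; -1 means there is no point in retrying (incompatible
              * protocol versions or failed authentication), so the network
              * configuration is discarded; a positive value means we are
              * connected and enter the main receive loop, drbdd(). */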
4164         do {
4165                 h = drbd_connect(mdev);
4166                 if (h == 0) {
4167                         drbd_disconnect(mdev);
4168                         schedule_timeout_interruptible(HZ);
4169                 }
4170                 if (h == -1) {
4171                         dev_warn(DEV, "Discarding network configuration.\n");
4172                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4173                 }
4174         } while (h == 0);
4175
4176         if (h > 0) {
4177                 if (get_net_conf(mdev)) {
4178                         drbdd(mdev);
4179                         put_net_conf(mdev);
4180                 }
4181         }
4182
4183         drbd_disconnect(mdev);
4184
4185         dev_info(DEV, "receiver terminated\n");
4186         return 0;
4187 }
4188
4189 /* ********* acknowledge sender ******** */
4190
4191 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4192 {
4193         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4194
4195         int retcode = be32_to_cpu(p->retcode);
4196
4197         if (retcode >= SS_SUCCESS) {
4198                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4199         } else {
4200                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4201                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4202                     drbd_set_st_err_str(retcode), retcode);
4203         }
4204         wake_up(&mdev->state_wait);
4205
4206         return true;
4207 }
4208
4209 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4210 {
4211         return drbd_send_ping_ack(mdev);
4212
4213 }
4214
4215 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4216 {
4217         /* restore idle timeout */
4218         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4219         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4220                 wake_up(&mdev->misc_wait);
4221
4222         return true;
4223 }
4224
4225 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4226 {
4227         struct p_block_ack *p = (struct p_block_ack *)h;
4228         sector_t sector = be64_to_cpu(p->sector);
4229         int blksize = be32_to_cpu(p->blksize);
4230
4231         D_ASSERT(mdev->agreed_pro_version >= 89);
4232
4233         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4234
4235         if (get_ldev(mdev)) {
4236                 drbd_rs_complete_io(mdev, sector);
4237                 drbd_set_in_sync(mdev, sector, blksize);
4238                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4239                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4240                 put_ldev(mdev);
4241         }
4242         dec_rs_pending(mdev);
4243         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4244
4245         return true;
4246 }
4247
4248 /* when we receive the ACK for a write request,
4249  * verify that we actually know about it */
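     /* The block_id carried in the ACK is simply the kernel address of the
      * original drbd_request, so walk the tl_hash slot for this sector,
      * match on pointer identity and cross-check the sector. */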
4250 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4251         u64 id, sector_t sector)
4252 {
4253         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4254         struct hlist_node *n;
4255         struct drbd_request *req;
4256
4257         hlist_for_each_entry(req, n, slot, collision) {
4258                 if ((unsigned long)req == (unsigned long)id) {
4259                         if (req->sector != sector) {
4260                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4261                                     "wrong sector (%llus versus %llus)\n", req,
4262                                     (unsigned long long)req->sector,
4263                                     (unsigned long long)sector);
4264                                 break;
4265                         }
4266                         return req;
4267                 }
4268         }
4269         return NULL;
4270 }
4271
4272 typedef struct drbd_request *(req_validator_fn)
4273         (struct drbd_conf *mdev, u64 id, sector_t sector);
4274
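     /* Common helper for the ACK handlers: look up the request under
      * req_lock with the given validator, feed 'what' into the request state
      * machine, and complete the master bio outside the lock if that
      * transition finished it. */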
4275 static int validate_req_change_req_state(struct drbd_conf *mdev,
4276         u64 id, sector_t sector, req_validator_fn validator,
4277         const char *func, enum drbd_req_event what)
4278 {
4279         struct drbd_request *req;
4280         struct bio_and_error m;
4281
4282         spin_lock_irq(&mdev->req_lock);
4283         req = validator(mdev, id, sector);
4284         if (unlikely(!req)) {
4285                 spin_unlock_irq(&mdev->req_lock);
4286
4287                 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4288                         (void *)(unsigned long)id, (unsigned long long)sector);
4289                 return false;
4290         }
4291         __req_mod(req, what, &m);
4292         spin_unlock_irq(&mdev->req_lock);
4293
4294         if (m.bio)
4295                 complete_master_bio(mdev, &m);
4296         return true;
4297 }
4298
4299 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4300 {
4301         struct p_block_ack *p = (struct p_block_ack *)h;
4302         sector_t sector = be64_to_cpu(p->sector);
4303         int blksize = be32_to_cpu(p->blksize);
4304         enum drbd_req_event what;
4305
4306         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4307
4308         if (is_syncer_block_id(p->block_id)) {
4309                 drbd_set_in_sync(mdev, sector, blksize);
4310                 dec_rs_pending(mdev);
4311                 return true;
4312         }
4313         switch (be16_to_cpu(h->command)) {
4314         case P_RS_WRITE_ACK:
4315                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4316                 what = write_acked_by_peer_and_sis;
4317                 break;
4318         case P_WRITE_ACK:
4319                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4320                 what = write_acked_by_peer;
4321                 break;
4322         case P_RECV_ACK:
4323                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4324                 what = recv_acked_by_peer;
4325                 break;
4326         case P_DISCARD_ACK:
4327                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4328                 what = conflict_discarded_by_peer;
4329                 break;
4330         default:
4331                 D_ASSERT(0);
4332                 return false;
4333         }
4334
4335         return validate_req_change_req_state(mdev, p->block_id, sector,
4336                 _ack_id_to_req, __func__ , what);
4337 }
4338
4339 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4340 {
4341         struct p_block_ack *p = (struct p_block_ack *)h;
4342         sector_t sector = be64_to_cpu(p->sector);
4343         int size = be32_to_cpu(p->blksize);
4344         struct drbd_request *req;
4345         struct bio_and_error m;
4346
4347         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4348
4349         if (is_syncer_block_id(p->block_id)) {
4350                 dec_rs_pending(mdev);
4351                 drbd_rs_failed_io(mdev, sector, size);
4352                 return true;
4353         }
4354
4355         spin_lock_irq(&mdev->req_lock);
4356         req = _ack_id_to_req(mdev, p->block_id, sector);
4357         if (!req) {
4358                 spin_unlock_irq(&mdev->req_lock);
4359                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4360                     mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4361                         /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4362                            The master bio might already be completed, therefore the
4363                            request is no longer in the collision hash.
4364                            => Do not try to validate block_id as request. */
4365                         /* In Protocol B we might already have got a P_RECV_ACK
4366                            but then get a P_NEG_ACK afterwards. */
4367                         drbd_set_out_of_sync(mdev, sector, size);
4368                         return true;
4369                 } else {
4370                         dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4371                                 (void *)(unsigned long)p->block_id, (unsigned long long)sector);
4372                         return false;
4373                 }
4374         }
4375         __req_mod(req, neg_acked, &m);
4376         spin_unlock_irq(&mdev->req_lock);
4377
4378         if (m.bio)
4379                 complete_master_bio(mdev, &m);
4380         return true;
4381 }
4382
4383 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4384 {
4385         struct p_block_ack *p = (struct p_block_ack *)h;
4386         sector_t sector = be64_to_cpu(p->sector);
4387
4388         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4389         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4390             (unsigned long long)sector, be32_to_cpu(p->blksize));
4391
4392         return validate_req_change_req_state(mdev, p->block_id, sector,
4393                 _ar_id_to_req, __func__ , neg_acked);
4394 }
4395
4396 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4397 {
4398         sector_t sector;
4399         int size;
4400         struct p_block_ack *p = (struct p_block_ack *)h;
4401
4402         sector = be64_to_cpu(p->sector);
4403         size = be32_to_cpu(p->blksize);
4404
4405         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4406
4407         dec_rs_pending(mdev);
4408
4409         if (get_ldev_if_state(mdev, D_FAILED)) {
4410                 drbd_rs_complete_io(mdev, sector);
4411                 switch (be16_to_cpu(h->command)) {
4412                 case P_NEG_RS_DREPLY:
4413                         drbd_rs_failed_io(mdev, sector, size);
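                             /* fall through */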
4414                 case P_RS_CANCEL:
4415                         break;
4416                 default:
4417                         D_ASSERT(0);
4418                         put_ldev(mdev);
4419                         return false;
4420                 }
4421                 put_ldev(mdev);
4422         }
4423
4424         return true;
4425 }
4426
4427 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4428 {
4429         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4430
4431         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4432
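             /* If we were running Ahead of the peer and all application
              * writes have drained, arm the timer that switches us to
              * SyncSource in about a second; the test_and_set_bit guards
              * against arming it more than once. */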
4433         if (mdev->state.conn == C_AHEAD &&
4434             atomic_read(&mdev->ap_in_flight) == 0 &&
4435             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4436                 mdev->start_resync_timer.expires = jiffies + HZ;
4437                 add_timer(&mdev->start_resync_timer);
4438         }
4439
4440         return true;
4441 }
4442
4443 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4444 {
4445         struct p_block_ack *p = (struct p_block_ack *)h;
4446         struct drbd_work *w;
4447         sector_t sector;
4448         int size;
4449
4450         sector = be64_to_cpu(p->sector);
4451         size = be32_to_cpu(p->blksize);
4452
4453         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4454
4455         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4456                 drbd_ov_oos_found(mdev, sector, size);
4457         else
4458                 ov_oos_print(mdev);
4459
4460         if (!get_ldev(mdev))
4461                 return true;
4462
4463         drbd_rs_complete_io(mdev, sector);
4464         dec_rs_pending(mdev);
4465
4466         --mdev->ov_left;
4467
4468         /* let's advance progress step marks only for every other megabyte */
4469         if ((mdev->ov_left & 0x200) == 0x200)
4470                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4471
4472         if (mdev->ov_left == 0) {
4473                 w = kmalloc(sizeof(*w), GFP_NOIO);
4474                 if (w) {
4475                         w->cb = w_ov_finished;
4476                         drbd_queue_work_front(&mdev->data.work, w);
4477                 } else {
4478                         dev_err(DEV, "kmalloc(w) failed.\n");
4479                         ov_oos_print(mdev);
4480                         drbd_resync_finished(mdev);
4481                 }
4482         }
4483         put_ldev(mdev);
4484         return true;
4485 }
4486
4487 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4488 {
4489         return true;
4490 }
4491
4492 struct asender_cmd {
4493         size_t pkt_size;
4494         int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4495 };
4496
4497 static struct asender_cmd *get_asender_cmd(int cmd)
4498 {
4499         static struct asender_cmd asender_tbl[] = {
4500                 /* anything missing from this table is in
4501                  * the drbd_cmd_handler (drbd_default_handler) table,
4502                  * see the beginning of drbdd() */
4503         [P_PING]            = { sizeof(struct p_header80), got_Ping },
4504         [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
4505         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4506         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4507         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4508         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4509         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4510         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4511         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4512         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4513         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4514         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4515         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4516         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4517         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4518         [P_MAX_CMD]         = { 0, NULL },
4519         };
4520         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4521                 return NULL;
4522         return &asender_tbl[cmd];
4523 }
4524
4525 int drbd_asender(struct drbd_thread *thi)
4526 {
4527         struct drbd_conf *mdev = thi->mdev;
4528         struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4529         struct asender_cmd *cmd = NULL;
4530
4531         int rv, len;
4532         void *buf    = h;
4533         int received = 0;
4534         int expect   = sizeof(struct p_header80);
4535         int empty;
4536         int ping_timeout_active = 0;
4537
4538         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4539
4540         current->policy = SCHED_RR;  /* Make this a realtime task! */
4541         current->rt_priority = 2;    /* more important than all other tasks */
4542
4543         while (get_t_state(thi) == Running) {
4544                 drbd_thread_current_set_cpu(mdev);
4545                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4546                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4547                         mdev->meta.socket->sk->sk_rcvtimeo =
4548                                 mdev->net_conf->ping_timeo*HZ/10;
4549                         ping_timeout_active = 1;
4550                 }
4551
4552                 /* conditionally cork;
4553                  * it may hurt latency if we cork without much to send */
4554                 if (!mdev->net_conf->no_cork &&
4555                         3 < atomic_read(&mdev->unacked_cnt))
4556                         drbd_tcp_cork(mdev->meta.socket);
4557                 while (1) {
4558                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4559                         flush_signals(current);
4560                         if (!drbd_process_done_ee(mdev))
4561                                 goto reconnect;
4562                         /* to avoid race with newly queued ACKs */
4563                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4564                         spin_lock_irq(&mdev->req_lock);
4565                         empty = list_empty(&mdev->done_ee);
4566                         spin_unlock_irq(&mdev->req_lock);
4567                         /* new ack may have been queued right here,
4568                          * but then there is also a signal pending,
4569                          * and we start over... */
4570                         if (empty)
4571                                 break;
4572                 }
4573                 /* but unconditionally uncork unless disabled */
4574                 if (!mdev->net_conf->no_cork)
4575                         drbd_tcp_uncork(mdev->meta.socket);
4576
4577                 /* short circuit, recv_msg would return EINTR anyways. */
4578                 if (signal_pending(current))
4579                         continue;
4580
4581                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4582                                      buf, expect-received, 0);
4583                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4584
4585                 flush_signals(current);
4586
4587                 /* Note:
4588                  * -EINTR        (on meta) we got a signal
4589                  * -EAGAIN       (on meta) rcvtimeo expired
4590                  * -ECONNRESET   other side closed the connection
4591                  * -ERESTARTSYS  (on data) we got a signal
4592                  * rv <  0       other than above: unexpected error!
4593                  * rv == expected: full header or command
4594                  * rv <  expected: "woken" by signal during receive
4595                  * rv == 0       : "connection shut down by peer"
4596                  */
4597                 if (likely(rv > 0)) {
4598                         received += rv;
4599                         buf      += rv;
4600                 } else if (rv == 0) {
4601                         dev_err(DEV, "meta connection shut down by peer.\n");
4602                         goto reconnect;
4603                 } else if (rv == -EAGAIN) {
4604                         /* If the data socket received something meanwhile,
4605                          * that is good enough: peer is still alive. */
4606                         if (time_after(mdev->last_received,
4607                                 jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4608                                 continue;
4609                         if (ping_timeout_active) {
4610                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4611                                 goto reconnect;
4612                         }
4613                         set_bit(SEND_PING, &mdev->flags);
4614                         continue;
4615                 } else if (rv == -EINTR) {
4616                         continue;
4617                 } else {
4618                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4619                         goto reconnect;
4620                 }
4621
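                     /* Two-step receive: once the full header is in, validate
                      * the magic, look up the handler and widen 'expect' to
                      * the full packet size; the next pass dispatches the
                      * complete packet and resets the receive state. */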
4622                 if (received == expect && cmd == NULL) {
4623                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4624                                 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4625                                     be32_to_cpu(h->magic),
4626                                     be16_to_cpu(h->command),
4627                                     be16_to_cpu(h->length));
4628                                 goto reconnect;
4629                         }
4630                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4631                         len = be16_to_cpu(h->length);
4632                         if (unlikely(cmd == NULL)) {
4633                                 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4634                                     be32_to_cpu(h->magic),
4635                                     be16_to_cpu(h->command),
4636                                     be16_to_cpu(h->length));
4637                                 goto disconnect;
4638                         }
4639                         expect = cmd->pkt_size;
4640                         ERR_IF(len != expect-sizeof(struct p_header80))
4641                                 goto reconnect;
4642                 }
4643                 if (received == expect) {
4644                         mdev->last_received = jiffies;
4645                         D_ASSERT(cmd != NULL);
4646                         if (!cmd->process(mdev, h))
4647                                 goto reconnect;
4648
4649                         /* the idle_timeout (ping-int)
4650                          * has been restored in got_PingAck() */
4651                         if (cmd == get_asender_cmd(P_PING_ACK))
4652                                 ping_timeout_active = 0;
4653
4654                         buf      = h;
4655                         received = 0;
4656                         expect   = sizeof(struct p_header80);
4657                         cmd      = NULL;
4658                 }
4659         }
4660
4661         if (0) {
4662 reconnect:
4663                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4664                 drbd_md_sync(mdev);
4665         }
4666         if (0) {
4667 disconnect:
4668                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4669                 drbd_md_sync(mdev);
4670         }
4671         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4672
4673         D_ASSERT(mdev->state.conn < C_CONNECTED);
4674         dev_info(DEV, "asender terminated\n");
4675
4676         return 0;
4677 }