1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
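/* GFP_TRY: opportunistic allocation for receive buffers: allow highmem and
 * suppress the allocation-failure warning, but no __GFP_WAIT, since we must
 * not trigger arbitrary write-out from the receiver (see the "criss-cross"
 * comment in __drbd_alloc_pages() below); drbd_alloc_pages() retries instead. */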
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with singly linked page lists,
76  * page->private being our "next" pointer.
77  */
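
/* Illustrative sketch, not part of the driver: page_chain_next() follows
 * page->private, so walking a chain built with these helpers amounts to
 *
 *	for (p = chain_head; p; p = page_chain_next(p))
 *		handle(p);	(handle() is just a placeholder for per-page work)
 *
 * The page_chain_for_each()/page_chain_for_each_safe() iterators used below
 * express the same walk.
 */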
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first one that has not finished,
208            we can stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyway. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
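
/* Usage sketch (illustrative only, not taken from a real call site):
 *
 *	struct page *pages = drbd_alloc_pages(mdev, nr_pages, true);
 *	if (!pages)
 *		return NULL;	// with retry=true this only fails if signalled
 *	// ... fill the chain from the socket ...
 *	// and eventually give it back via drbd_free_pages(mdev, pages, 0)
 *
 * The non-retrying variant (retry=false) may return NULL as soon as
 * max_buffers is exceeded or both the pool and the allocator come up empty.
 */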
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
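
/* Sketch of what the req_lock rule above means in practice (illustrative):
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *
 * is equivalent to calling drbd_wait_ee_list_empty(mdev, &mdev->active_ee)
 * without the lock; the latter takes and releases req_lock internally.
 */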
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
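
/* Note: every peer request obtained from drbd_alloc_peer_req() must
 * eventually be released again with drbd_free_peer_req() (or
 * drbd_free_net_peer_req() for entries on net_ee), which hands the page
 * chain back via drbd_free_pages() and returns the request to
 * drbd_ee_mempool.
 */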
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write;
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept(), which is only present since 2.6.18;
465  * we also want to log exactly which part of it failed */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
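
/* Call-order sketch: per the tcp(7) quote above, the buffer sizes have to be
 * set between socket creation and listen()/connect(), roughly:
 *
 *	sock_create_kern(...);
 *	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 *	sock->ops->bind(...);
 *	sock->ops->connect(...);	// or ->listen() + accept()
 *
 * which is the order used by drbd_try_connect() and drbd_wait_for_connect()
 * below.
 */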
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629
630         sndbuf_size = nc->sndbuf_size;
631         rcvbuf_size = nc->rcvbuf_size;
632         connect_int = nc->connect_int;
633
634         my_addr_len = min_t(int, nc->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, nc->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)nc->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, nc->peer_addr_len, sizeof(peer_in6));
643         memcpy(&peer_in6, nc->peer_addr, peer_addr_len);
644
645         rcu_read_unlock();
646
647         what = "sock_create_kern";
648         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
649                                SOCK_STREAM, IPPROTO_TCP, &sock);
650         if (err < 0) {
651                 sock = NULL;
652                 goto out;
653         }
654
655         sock->sk->sk_rcvtimeo =
656         sock->sk->sk_sndtimeo = connect_int * HZ;
657         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
658
659        /* explicitly bind to the configured IP as source IP
660         *  for the outgoing connections.
661         *  This is needed for multihomed hosts and to be
662         *  able to use lo: interfaces for drbd.
663         * Make sure to use 0 as port number, so linux selects
664         *  a free one dynamically.
665         */
666         what = "bind before connect";
667         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
668         if (err < 0)
669                 goto out;
670
671         /* connect may fail, peer not yet available.
672          * stay C_WF_CONNECTION, don't go Disconnecting! */
673         disconnect_on_error = 0;
674         what = "connect";
675         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
676
677 out:
678         if (err < 0) {
679                 if (sock) {
680                         sock_release(sock);
681                         sock = NULL;
682                 }
683                 switch (-err) {
684                         /* timeout, busy, signal pending */
685                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
686                 case EINTR: case ERESTARTSYS:
687                         /* peer not (yet) available, network problem */
688                 case ECONNREFUSED: case ENETUNREACH:
689                 case EHOSTDOWN:    case EHOSTUNREACH:
690                         disconnect_on_error = 0;
691                         break;
692                 default:
693                         conn_err(tconn, "%s failed, err = %d\n", what, err);
694                 }
695                 if (disconnect_on_error)
696                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
697         }
698
699         return sock;
700 }
701
702 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
703 {
704         int timeo, err, my_addr_len;
705         int sndbuf_size, rcvbuf_size, connect_int;
706         struct socket *s_estab = NULL, *s_listen;
707         struct sockaddr_in6 my_addr;
708         struct net_conf *nc;
709         const char *what;
710
711         rcu_read_lock();
712         nc = rcu_dereference(tconn->net_conf);
713         if (!nc) {
714                 rcu_read_unlock();
715                 return NULL;
716         }
717
718         sndbuf_size = nc->sndbuf_size;
719         rcvbuf_size = nc->rcvbuf_size;
720         connect_int = nc->connect_int;
721
722         my_addr_len = min_t(int, nc->my_addr_len, sizeof(struct sockaddr_in6));
723         memcpy(&my_addr, nc->my_addr, my_addr_len);
724         rcu_read_unlock();
725
726         what = "sock_create_kern";
727         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
728                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
729         if (err) {
730                 s_listen = NULL;
731                 goto out;
732         }
733
734         timeo = connect_int * HZ;
735         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
736
737         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
738         s_listen->sk->sk_rcvtimeo = timeo;
739         s_listen->sk->sk_sndtimeo = timeo;
740         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
741
742         what = "bind before listen";
743         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
744         if (err < 0)
745                 goto out;
746
747         err = drbd_accept(&what, s_listen, &s_estab);
748
749 out:
750         if (s_listen)
751                 sock_release(s_listen);
752         if (err < 0) {
753                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754                         conn_err(tconn, "%s failed, err = %d\n", what, err);
755                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
756                 }
757         }
758
759         return s_estab;
760 }
761
762 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
763
764 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
765                              enum drbd_packet cmd)
766 {
767         if (!conn_prepare_command(tconn, sock))
768                 return -EIO;
769         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
770 }
771
772 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
773 {
774         unsigned int header_size = drbd_header_size(tconn);
775         struct packet_info pi;
776         int err;
777
778         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
779         if (err != header_size) {
780                 if (err >= 0)
781                         err = -EIO;
782                 return err;
783         }
784         err = decode_header(tconn, tconn->data.rbuf, &pi);
785         if (err)
786                 return err;
787         return pi.cmd;
788 }
789
790 /**
791  * drbd_socket_okay() - Free the socket if its connection is not okay
792  * @sock:       pointer to the pointer to the socket.
793  */
794 static int drbd_socket_okay(struct socket **sock)
795 {
796         int rr;
797         char tb[4];
798
799         if (!*sock)
800                 return false;
801
802         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
803
804         if (rr > 0 || rr == -EAGAIN) {
805                 return true;
806         } else {
807                 sock_release(*sock);
808                 *sock = NULL;
809                 return false;
810         }
811 }
812 /* Gets called if a connection is established, or if a new minor gets created
813    in a connection */
814 int drbd_connected(struct drbd_conf *mdev)
815 {
816         int err;
817
818         atomic_set(&mdev->packet_seq, 0);
819         mdev->peer_seq = 0;
820
821         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
822                 &mdev->tconn->cstate_mutex :
823                 &mdev->own_state_mutex;
824
825         err = drbd_send_sync_param(mdev);
826         if (!err)
827                 err = drbd_send_sizes(mdev, 0, 0);
828         if (!err)
829                 err = drbd_send_uuids(mdev);
830         if (!err)
831                 err = drbd_send_state(mdev);
832         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
833         clear_bit(RESIZE_PENDING, &mdev->flags);
834         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
835         return err;
836 }
837
838 /*
839  * return values:
840  *   1 yes, we have a valid connection
841  *   0 oops, did not work out, please try again
842  *  -1 peer talks different language,
843  *     no point in trying again, please go standalone.
844  *  -2 We do not have a network config...
845  */
846 static int conn_connect(struct drbd_tconn *tconn)
847 {
848         struct socket *sock, *msock;
849         struct drbd_conf *mdev;
850         struct net_conf *nc;
851         int vnr, timeout, try, h, ok;
852
853         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
854                 return -2;
855
856         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
857
858         /* Assume that the peer only understands protocol 80 until we know better.  */
859         tconn->agreed_pro_version = 80;
860
861         do {
862                 struct socket *s;
863
864                 for (try = 0;;) {
865                         /* 3 tries, this should take less than a second! */
866                         s = drbd_try_connect(tconn);
867                         if (s || ++try >= 3)
868                                 break;
869                         /* give the other side time to call bind() & listen() */
870                         schedule_timeout_interruptible(HZ / 10);
871                 }
872
873                 if (s) {
874                         if (!tconn->data.socket) {
875                                 tconn->data.socket = s;
876                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
877                         } else if (!tconn->meta.socket) {
878                                 tconn->meta.socket = s;
879                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
880                         } else {
881                                 conn_err(tconn, "Logic error in conn_connect()\n");
882                                 goto out_release_sockets;
883                         }
884                 }
885
886                 if (tconn->data.socket && tconn->meta.socket) {
887                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
888                         ok = drbd_socket_okay(&tconn->data.socket);
889                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
890                         if (ok)
891                                 break;
892                 }
893
894 retry:
895                 s = drbd_wait_for_connect(tconn);
896                 if (s) {
897                         try = receive_first_packet(tconn, s);
898                         drbd_socket_okay(&tconn->data.socket);
899                         drbd_socket_okay(&tconn->meta.socket);
900                         switch (try) {
901                         case P_INITIAL_DATA:
902                                 if (tconn->data.socket) {
903                                         conn_warn(tconn, "initial packet S crossed\n");
904                                         sock_release(tconn->data.socket);
905                                 }
906                                 tconn->data.socket = s;
907                                 break;
908                         case P_INITIAL_META:
909                                 if (tconn->meta.socket) {
910                                         conn_warn(tconn, "initial packet M crossed\n");
911                                         sock_release(tconn->meta.socket);
912                                 }
913                                 tconn->meta.socket = s;
914                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
915                                 break;
916                         default:
917                                 conn_warn(tconn, "Error receiving initial packet\n");
918                                 sock_release(s);
919                                 if (random32() & 1)
920                                         goto retry;
921                         }
922                 }
923
924                 if (tconn->cstate <= C_DISCONNECTING)
925                         goto out_release_sockets;
926                 if (signal_pending(current)) {
927                         flush_signals(current);
928                         smp_rmb();
929                         if (get_t_state(&tconn->receiver) == EXITING)
930                                 goto out_release_sockets;
931                 }
932
933                 if (tconn->data.socket && tconn->meta.socket) {
934                         ok = drbd_socket_okay(&tconn->data.socket);
935                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
936                         if (ok)
937                                 break;
938                 }
939         } while (1);
940
941         sock  = tconn->data.socket;
942         msock = tconn->meta.socket;
943
944         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
945         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
946
947         sock->sk->sk_allocation = GFP_NOIO;
948         msock->sk->sk_allocation = GFP_NOIO;
949
950         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
951         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
952
953         /* NOT YET ...
954          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
955          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
956          * first set it to the P_CONNECTION_FEATURES timeout,
957          * which we set to 4x the configured ping_timeout. */
958         rcu_read_lock();
959         nc = rcu_dereference(tconn->net_conf);
960
961         sock->sk->sk_sndtimeo =
962         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
963
964         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
965         timeout = nc->timeout * HZ / 10;
966         rcu_read_unlock();
967
968         msock->sk->sk_sndtimeo = timeout;
969
970         /* we don't want delays.
971          * we use TCP_CORK where appropriate, though */
972         drbd_tcp_nodelay(sock);
973         drbd_tcp_nodelay(msock);
974
975         tconn->last_received = jiffies;
976
977         h = drbd_do_features(tconn);
978         if (h <= 0)
979                 return h;
980
981         if (tconn->cram_hmac_tfm) {
982                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
983                 switch (drbd_do_auth(tconn)) {
984                 case -1:
985                         conn_err(tconn, "Authentication of peer failed\n");
986                         return -1;
987                 case 0:
988                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
989                         return 0;
990                 }
991         }
992
993         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
994                 return 0;
995
996         sock->sk->sk_sndtimeo = timeout;
997         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
998
999         drbd_thread_start(&tconn->asender);
1000
1001         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1002                 return -1;
1003
1004         rcu_read_lock();
1005         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1006                 kref_get(&mdev->kref);
1007                 rcu_read_unlock();
1008                 drbd_connected(mdev);
1009                 kref_put(&mdev->kref, &drbd_minor_destroy);
1010                 rcu_read_lock();
1011         }
1012         rcu_read_unlock();
1013
1014         return h;
1015
1016 out_release_sockets:
1017         if (tconn->data.socket) {
1018                 sock_release(tconn->data.socket);
1019                 tconn->data.socket = NULL;
1020         }
1021         if (tconn->meta.socket) {
1022                 sock_release(tconn->meta.socket);
1023                 tconn->meta.socket = NULL;
1024         }
1025         return -1;
1026 }
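
/* Sketch of how a caller is expected to act on the return values documented
 * above the function (illustrative only):
 *
 *	h = conn_connect(tconn);
 *	if (h == 0)
 *		// handshake did not work out, try connecting again
 *	else if (h < 0)
 *		// protocol mismatch / no net config: give up, go standalone
 *	else
 *		// h == 1: connection established, start the receive loop
 */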
1027
1028 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1029 {
1030         unsigned int header_size = drbd_header_size(tconn);
1031
1032         if (header_size == sizeof(struct p_header100) &&
1033             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1034                 struct p_header100 *h = header;
1035                 if (h->pad != 0) {
1036                         conn_err(tconn, "Header padding is not zero\n");
1037                         return -EINVAL;
1038                 }
1039                 pi->vnr = be16_to_cpu(h->volume);
1040                 pi->cmd = be16_to_cpu(h->command);
1041                 pi->size = be32_to_cpu(h->length);
1042         } else if (header_size == sizeof(struct p_header95) &&
1043                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1044                 struct p_header95 *h = header;
1045                 pi->cmd = be16_to_cpu(h->command);
1046                 pi->size = be32_to_cpu(h->length);
1047                 pi->vnr = 0;
1048         } else if (header_size == sizeof(struct p_header80) &&
1049                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1050                 struct p_header80 *h = header;
1051                 pi->cmd = be16_to_cpu(h->command);
1052                 pi->size = be16_to_cpu(h->length);
1053                 pi->vnr = 0;
1054         } else {
1055                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1056                          be32_to_cpu(*(__be32 *)header),
1057                          tconn->agreed_pro_version);
1058                 return -EINVAL;
1059         }
1060         pi->data = header + header_size;
1061         return 0;
1062 }
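
/* For reference (widths follow from the byte-order conversions above; see
 * the struct p_header80/95/100 definitions for the authoritative layout):
 *
 *	p_header80:  32-bit magic DRBD_MAGIC,     16-bit command, 16-bit length
 *	p_header95:  16-bit magic DRBD_MAGIC_BIG, 16-bit command, 32-bit length
 *	p_header100: 32-bit magic DRBD_MAGIC_100, 16-bit volume, 16-bit command,
 *	             32-bit length, plus padding that must be zero
 */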
1063
1064 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1065 {
1066         void *buffer = tconn->data.rbuf;
1067         int err;
1068
1069         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1070         if (err)
1071                 return err;
1072
1073         err = decode_header(tconn, buffer, pi);
1074         tconn->last_received = jiffies;
1075
1076         return err;
1077 }
1078
1079 static void drbd_flush(struct drbd_conf *mdev)
1080 {
1081         int rv;
1082
1083         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1084                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1085                                         NULL);
1086                 if (rv) {
1087                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1088                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1089                          * don't try again for ANY return value != 0
1090                          * if (rv == -EOPNOTSUPP) */
1091                         drbd_bump_write_ordering(mdev, WO_drain_io);
1092                 }
1093                 put_ldev(mdev);
1094         }
1095 }
1096
1097 /**
1098  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1099  * @mdev:       DRBD device.
1100  * @epoch:      Epoch object.
1101  * @ev:         Epoch event.
1102  */
1103 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1104                                                struct drbd_epoch *epoch,
1105                                                enum epoch_event ev)
1106 {
1107         int epoch_size;
1108         struct drbd_epoch *next_epoch;
1109         enum finish_epoch rv = FE_STILL_LIVE;
1110
1111         spin_lock(&mdev->epoch_lock);
1112         do {
1113                 next_epoch = NULL;
1114
1115                 epoch_size = atomic_read(&epoch->epoch_size);
1116
1117                 switch (ev & ~EV_CLEANUP) {
1118                 case EV_PUT:
1119                         atomic_dec(&epoch->active);
1120                         break;
1121                 case EV_GOT_BARRIER_NR:
1122                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1123                         break;
1124                 case EV_BECAME_LAST:
1125                         /* nothing to do */
1126                         break;
1127                 }
1128
1129                 if (epoch_size != 0 &&
1130                     atomic_read(&epoch->active) == 0 &&
1131                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1132                         if (!(ev & EV_CLEANUP)) {
1133                                 spin_unlock(&mdev->epoch_lock);
1134                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1135                                 spin_lock(&mdev->epoch_lock);
1136                         }
1137                         dec_unacked(mdev);
1138
1139                         if (mdev->current_epoch != epoch) {
1140                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1141                                 list_del(&epoch->list);
1142                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1143                                 mdev->epochs--;
1144                                 kfree(epoch);
1145
1146                                 if (rv == FE_STILL_LIVE)
1147                                         rv = FE_DESTROYED;
1148                         } else {
1149                                 epoch->flags = 0;
1150                                 atomic_set(&epoch->epoch_size, 0);
1151                                 /* atomic_set(&epoch->active, 0); is already zero */
1152                                 if (rv == FE_STILL_LIVE)
1153                                         rv = FE_RECYCLED;
1154                                 wake_up(&mdev->ee_wait);
1155                         }
1156                 }
1157
1158                 if (!next_epoch)
1159                         break;
1160
1161                 epoch = next_epoch;
1162         } while (1);
1163
1164         spin_unlock(&mdev->epoch_lock);
1165
1166         return rv;
1167 }
1168
1169 /**
1170  * drbd_bump_write_ordering() - Fall back to another write ordering method
1171  * @mdev:       DRBD device.
1172  * @wo:         Write ordering method to try.
1173  */
1174 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1175 {
1176         struct disk_conf *dc;
1177         enum write_ordering_e pwo;
1178         static char *write_ordering_str[] = {
1179                 [WO_none] = "none",
1180                 [WO_drain_io] = "drain",
1181                 [WO_bdev_flush] = "flush",
1182         };
1183
1184         pwo = mdev->write_ordering;
1185         wo = min(pwo, wo);
1186         rcu_read_lock();
1187         dc = rcu_dereference(mdev->ldev->disk_conf);
1188
1189         if (wo == WO_bdev_flush && !dc->disk_flushes)
1190                 wo = WO_drain_io;
1191         if (wo == WO_drain_io && !dc->disk_drain)
1192                 wo = WO_none;
1193         rcu_read_unlock();
1194         mdev->write_ordering = wo;
1195         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1196                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1197 }
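
/* In other words, the only transitions this allows are "downwards":
 * WO_bdev_flush -> WO_drain_io -> WO_none (wo = min(pwo, wo)), and a method
 * disabled in disk_conf (disk_flushes / disk_drain) is skipped as well.
 */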
1198
1199 /**
1200  * drbd_submit_peer_request() - Submit a peer request to the local I/O stack
1201  * @mdev:       DRBD device.
1202  * @peer_req:   peer request
1203  * @rw:         flag field, see bio->bi_rw
1204  *
1205  * May spread the pages to multiple bios,
1206  * depending on bio_add_page restrictions.
1207  *
1208  * Returns 0 if all bios have been submitted,
1209  * -ENOMEM if we could not allocate enough bios,
1210  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1211  *  single page to an empty bio (which should never happen and likely indicates
1212  *  that the lower level IO stack is in some way broken). This has been observed
1213  *  on certain Xen deployments.
1214  */
1215 /* TODO allocate from our own bio_set. */
1216 int drbd_submit_peer_request(struct drbd_conf *mdev,
1217                              struct drbd_peer_request *peer_req,
1218                              const unsigned rw, const int fault_type)
1219 {
1220         struct bio *bios = NULL;
1221         struct bio *bio;
1222         struct page *page = peer_req->pages;
1223         sector_t sector = peer_req->i.sector;
1224         unsigned ds = peer_req->i.size;
1225         unsigned n_bios = 0;
1226         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1227         int err = -ENOMEM;
1228
1229         /* In most cases, we will only need one bio.  But in case the lower
1230          * level restrictions happen to be different at this offset on this
1231          * side than those of the sending peer, we may need to submit the
1232          * request in more than one bio.
1233          *
1234          * Plain bio_alloc is good enough here, this is no DRBD internally
1235          * generated bio, but a bio allocated on behalf of the peer.
1236          */
1237 next_bio:
1238         bio = bio_alloc(GFP_NOIO, nr_pages);
1239         if (!bio) {
1240                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1241                 goto fail;
1242         }
1243         /* > peer_req->i.sector, unless this is the first bio */
1244         bio->bi_sector = sector;
1245         bio->bi_bdev = mdev->ldev->backing_bdev;
1246         bio->bi_rw = rw;
1247         bio->bi_private = peer_req;
1248         bio->bi_end_io = drbd_peer_request_endio;
1249
1250         bio->bi_next = bios;
1251         bios = bio;
1252         ++n_bios;
1253
1254         page_chain_for_each(page) {
1255                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1256                 if (!bio_add_page(bio, page, len, 0)) {
1257                         /* A single page must always be possible!
1258                          * But in case it fails anyway,
1259                          * we deal with it, and complain (below). */
1260                         if (bio->bi_vcnt == 0) {
1261                                 dev_err(DEV,
1262                                         "bio_add_page failed for len=%u, "
1263                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1264                                         len, (unsigned long long)bio->bi_sector);
1265                                 err = -ENOSPC;
1266                                 goto fail;
1267                         }
1268                         goto next_bio;
1269                 }
1270                 ds -= len;
1271                 sector += len >> 9;
1272                 --nr_pages;
1273         }
1274         D_ASSERT(page == NULL);
1275         D_ASSERT(ds == 0);
1276
1277         atomic_set(&peer_req->pending_bios, n_bios);
1278         do {
1279                 bio = bios;
1280                 bios = bios->bi_next;
1281                 bio->bi_next = NULL;
1282
1283                 drbd_generic_make_request(mdev, fault_type, bio);
1284         } while (bios);
1285         return 0;
1286
1287 fail:
1288         while (bios) {
1289                 bio = bios;
1290                 bios = bios->bi_next;
1291                 bio_put(bio);
1292         }
1293         return err;
1294 }
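
/* Sketch of how a caller would handle the return codes documented above
 * (illustrative only):
 *
 *	err = drbd_submit_peer_request(mdev, peer_req, rw, fault_type);
 *	if (err == -ENOMEM)
 *		// bio allocation failed: retry later or fail the peer request
 *	else if (err)
 *		// e.g. -ENOSPC: lower level IO stack misbehaving, give up
 *	// err == 0: all bios submitted, completion via drbd_peer_request_endio()
 */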
1295
1296 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1297                                              struct drbd_peer_request *peer_req)
1298 {
1299         struct drbd_interval *i = &peer_req->i;
1300
1301         drbd_remove_interval(&mdev->write_requests, i);
1302         drbd_clear_interval(i);
1303
1304         /* Wake up any processes waiting for this peer request to complete.  */
1305         if (i->waiting)
1306                 wake_up(&mdev->misc_wait);
1307 }
1308
1309 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1310 {
1311         struct drbd_conf *mdev;
1312         int rv;
1313         struct p_barrier *p = pi->data;
1314         struct drbd_epoch *epoch;
1315
1316         mdev = vnr_to_mdev(tconn, pi->vnr);
1317         if (!mdev)
1318                 return -EIO;
1319
1320         inc_unacked(mdev);
1321
1322         mdev->current_epoch->barrier_nr = p->barrier;
1323         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1324
1325         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1326          * the activity log, which means it would not be resynced in case the
1327          * R_PRIMARY crashes now.
1328          * Therefore we must send the barrier_ack after the barrier request was
1329          * completed. */
1330         switch (mdev->write_ordering) {
1331         case WO_none:
1332                 if (rv == FE_RECYCLED)
1333                         return 0;
1334
1335                 /* receiver context, in the writeout path of the other node.
1336                  * avoid potential distributed deadlock */
1337                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1338                 if (epoch)
1339                         break;
1340                 else
1341                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1342                         /* Fall through */
1343
1344         case WO_bdev_flush:
1345         case WO_drain_io:
1346                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1347                 drbd_flush(mdev);
1348
1349                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1350                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1351                         if (epoch)
1352                                 break;
1353                 }
1354
1355                 epoch = mdev->current_epoch;
1356                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1357
1358                 D_ASSERT(atomic_read(&epoch->active) == 0);
1359                 D_ASSERT(epoch->flags == 0);
1360
1361                 return 0;
1362         default:
1363                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1364                 return -EIO;
1365         }
1366
1367         epoch->flags = 0;
1368         atomic_set(&epoch->epoch_size, 0);
1369         atomic_set(&epoch->active, 0);
1370
1371         spin_lock(&mdev->epoch_lock);
1372         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1373                 list_add(&epoch->list, &mdev->current_epoch->list);
1374                 mdev->current_epoch = epoch;
1375                 mdev->epochs++;
1376         } else {
1377                 /* The current_epoch got recycled while we allocated this one... */
1378                 kfree(epoch);
1379         }
1380         spin_unlock(&mdev->epoch_lock);
1381
1382         return 0;
1383 }
1384
1385 /* used from receive_RSDataReply (recv_resync_read)
1386  * and from receive_Data */
1387 static struct drbd_peer_request *
1388 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1389               int data_size) __must_hold(local)
1390 {
1391         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1392         struct drbd_peer_request *peer_req;
1393         struct page *page;
1394         int dgs, ds, err;
1395         void *dig_in = mdev->tconn->int_dig_in;
1396         void *dig_vv = mdev->tconn->int_dig_vv;
1397         unsigned long *data;
1398
1399         dgs = 0;
1400         if (mdev->tconn->peer_integrity_tfm) {
1401                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1402                 /*
1403                  * FIXME: Receive the incoming digest into the receive buffer
1404                  *        here, together with its struct p_data?
1405                  */
1406                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1407                 if (err)
1408                         return NULL;
1409                 data_size -= dgs;
1410         }
1411
1412         if (!expect(data_size != 0))
1413                 return NULL;
1414         if (!expect(IS_ALIGNED(data_size, 512)))
1415                 return NULL;
1416         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1417                 return NULL;
1418
1419         /* even though we trust our peer,
1420          * we sometimes have to double check. */
1421         if (sector + (data_size>>9) > capacity) {
1422                 dev_err(DEV, "request from peer beyond end of local disk: "
1423                         "capacity: %llus < sector: %llus + size: %u\n",
1424                         (unsigned long long)capacity,
1425                         (unsigned long long)sector, data_size);
1426                 return NULL;
1427         }
1428
1429         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1430          * "criss-cross" setup, that might cause write-out on some other DRBD,
1431          * which in turn might block on the other node at this very place.  */
1432         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1433         if (!peer_req)
1434                 return NULL;
1435
1436         ds = data_size;
1437         page = peer_req->pages;
1438         page_chain_for_each(page) {
1439                 unsigned len = min_t(int, ds, PAGE_SIZE);
1440                 data = kmap(page);
1441                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1442                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1443                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1444                         data[0] = data[0] ^ (unsigned long)-1;
1445                 }
1446                 kunmap(page);
1447                 if (err) {
1448                         drbd_free_peer_req(mdev, peer_req);
1449                         return NULL;
1450                 }
1451                 ds -= len;
1452         }
1453
1454         if (dgs) {
1455                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1456                 if (memcmp(dig_in, dig_vv, dgs)) {
1457                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1458                                 (unsigned long long)sector, data_size);
1459                         drbd_free_peer_req(mdev, peer_req);
1460                         return NULL;
1461                 }
1462         }
1463         mdev->recv_cnt += data_size>>9;
1464         return peer_req;
1465 }
1466
1467 /* drbd_drain_block() just takes a data block
1468  * out of the socket input buffer, and discards it.
1469  */
1470 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1471 {
1472         struct page *page;
1473         int err = 0;
1474         void *data;
1475
1476         if (!data_size)
1477                 return 0;
1478
1479         page = drbd_alloc_pages(mdev, 1, 1);
1480
1481         data = kmap(page);
1482         while (data_size) {
1483                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1484
1485                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1486                 if (err)
1487                         break;
1488                 data_size -= len;
1489         }
1490         kunmap(page);
1491         drbd_free_pages(mdev, page, 0);
1492         return err;
1493 }
1494
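/*
 * recv_dless_read() receives the answer to a read request we sent to the
 * peer directly into the master bio of the pending local request, and
 * verifies the optional integrity digest ("disk-less" read).
 */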
1495 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1496                            sector_t sector, int data_size)
1497 {
1498         struct bio_vec *bvec;
1499         struct bio *bio;
1500         int dgs, err, i, expect;
1501         void *dig_in = mdev->tconn->int_dig_in;
1502         void *dig_vv = mdev->tconn->int_dig_vv;
1503
1504         dgs = 0;
1505         if (mdev->tconn->peer_integrity_tfm) {
1506                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1507                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1508                 if (err)
1509                         return err;
1510                 data_size -= dgs;
1511         }
1512
1513         /* optimistically update recv_cnt.  if receiving fails below,
1514          * we disconnect anyways, and counters will be reset. */
1515         mdev->recv_cnt += data_size>>9;
1516
1517         bio = req->master_bio;
1518         D_ASSERT(sector == bio->bi_sector);
1519
1520         bio_for_each_segment(bvec, bio, i) {
1521                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1522                 expect = min_t(int, data_size, bvec->bv_len);
1523                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1524                 kunmap(bvec->bv_page);
1525                 if (err)
1526                         return err;
1527                 data_size -= expect;
1528         }
1529
1530         if (dgs) {
1531                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1532                 if (memcmp(dig_in, dig_vv, dgs)) {
1533                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1534                         return -EINVAL;
1535                 }
1536         }
1537
1538         D_ASSERT(data_size == 0);
1539         return 0;
1540 }
1541
1542 /*
1543  * e_end_resync_block() is called in asender context via
1544  * drbd_finish_peer_reqs().
1545  */
1546 static int e_end_resync_block(struct drbd_work *w, int unused)
1547 {
1548         struct drbd_peer_request *peer_req =
1549                 container_of(w, struct drbd_peer_request, w);
1550         struct drbd_conf *mdev = w->mdev;
1551         sector_t sector = peer_req->i.sector;
1552         int err;
1553
1554         D_ASSERT(drbd_interval_empty(&peer_req->i));
1555
1556         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1557                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1558                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1559         } else {
1560                 /* Record failure to sync */
1561                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1562
1563                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1564         }
1565         dec_unacked(mdev);
1566
1567         return err;
1568 }
1569
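/*
 * recv_resync_read() reads one resync data block from the socket into a
 * freshly allocated peer request and submits it as a local write;
 * e_end_resync_block() sends the ack once the write has completed.
 */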
1570 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1571 {
1572         struct drbd_peer_request *peer_req;
1573
1574         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1575         if (!peer_req)
1576                 goto fail;
1577
1578         dec_rs_pending(mdev);
1579
1580         inc_unacked(mdev);
1581         /* corresponding dec_unacked() in e_end_resync_block(),
1582          * or in _drbd_clear_done_ee() */
1583
1584         peer_req->w.cb = e_end_resync_block;
1585
1586         spin_lock_irq(&mdev->tconn->req_lock);
1587         list_add(&peer_req->w.list, &mdev->sync_ee);
1588         spin_unlock_irq(&mdev->tconn->req_lock);
1589
1590         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1591         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1592                 return 0;
1593
1594         /* don't care for the reason here */
1595         dev_err(DEV, "submit failed, triggering re-connect\n");
1596         spin_lock_irq(&mdev->tconn->req_lock);
1597         list_del(&peer_req->w.list);
1598         spin_unlock_irq(&mdev->tconn->req_lock);
1599
1600         drbd_free_peer_req(mdev, peer_req);
1601 fail:
1602         put_ldev(mdev);
1603         return -EIO;
1604 }
1605
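/*
 * The peer echoes the pointer to our request back to us as its block_id.
 * Before trusting it, verify that the request is still present in the
 * given interval tree at that sector and is a local request.
 */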
1606 static struct drbd_request *
1607 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1608              sector_t sector, bool missing_ok, const char *func)
1609 {
1610         struct drbd_request *req;
1611
1612         /* Request object according to our peer */
1613         req = (struct drbd_request *)(unsigned long)id;
1614         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1615                 return req;
1616         if (!missing_ok) {
1617                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1618                         (unsigned long)id, (unsigned long long)sector);
1619         }
1620         return NULL;
1621 }
1622
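/*
 * Answer to a read request we sent to the peer: look up the original
 * request by the echoed block_id and receive the payload into its bio.
 */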
1623 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1624 {
1625         struct drbd_conf *mdev;
1626         struct drbd_request *req;
1627         sector_t sector;
1628         int err;
1629         struct p_data *p = pi->data;
1630
1631         mdev = vnr_to_mdev(tconn, pi->vnr);
1632         if (!mdev)
1633                 return -EIO;
1634
1635         sector = be64_to_cpu(p->sector);
1636
1637         spin_lock_irq(&mdev->tconn->req_lock);
1638         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1639         spin_unlock_irq(&mdev->tconn->req_lock);
1640         if (unlikely(!req))
1641                 return -EIO;
1642
1643         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1644          * special casing it there for the various failure cases.
1645          * still no race with drbd_fail_pending_reads */
1646         err = recv_dless_read(mdev, req, sector, pi->size);
1647         if (!err)
1648                 req_mod(req, DATA_RECEIVED);
1649         /* else: nothing. handled from drbd_disconnect...
1650          * I don't think we may complete this just yet
1651          * in case we are "on-disconnect: freeze" */
1652
1653         return err;
1654 }
1655
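/*
 * Answer to a resync request: write the received block to the local disk,
 * or drain the payload and send a negative ack if we have no usable disk.
 */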
1656 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1657 {
1658         struct drbd_conf *mdev;
1659         sector_t sector;
1660         int err;
1661         struct p_data *p = pi->data;
1662
1663         mdev = vnr_to_mdev(tconn, pi->vnr);
1664         if (!mdev)
1665                 return -EIO;
1666
1667         sector = be64_to_cpu(p->sector);
1668         D_ASSERT(p->block_id == ID_SYNCER);
1669
1670         if (get_ldev(mdev)) {
1671                 /* data is submitted to disk within recv_resync_read.
1672                  * corresponding put_ldev done below on error,
1673                  * or in drbd_peer_request_endio. */
1674                 err = recv_resync_read(mdev, sector, pi->size);
1675         } else {
1676                 if (__ratelimit(&drbd_ratelimit_state))
1677                         dev_err(DEV, "Can not write resync data to local disk.\n");
1678
1679                 err = drbd_drain_block(mdev, pi->size);
1680
1681                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1682         }
1683
1684         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1685
1686         return err;
1687 }
1688
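/*
 * Worker callback: resubmit a postponed (conflicting) local write,
 * reusing its original master bio and start time.
 */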
1689 static int w_restart_write(struct drbd_work *w, int cancel)
1690 {
1691         struct drbd_request *req = container_of(w, struct drbd_request, w);
1692         struct drbd_conf *mdev = w->mdev;
1693         struct bio *bio;
1694         unsigned long start_time;
1695         unsigned long flags;
1696
1697         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1698         if (!expect(req->rq_state & RQ_POSTPONED)) {
1699                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1700                 return -EIO;
1701         }
1702         bio = req->master_bio;
1703         start_time = req->start_time;
1704         /* Postponed requests will not have their master_bio completed!  */
1705         __req_mod(req, DISCARD_WRITE, NULL);
1706         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1707
1708         while (__drbd_make_request(mdev, bio, start_time))
1709                 /* retry */ ;
1710         return 0;
1711 }
1712
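/*
 * Queue w_restart_write() for every postponed local request that overlaps
 * the given range.
 */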
1713 static void restart_conflicting_writes(struct drbd_conf *mdev,
1714                                        sector_t sector, int size)
1715 {
1716         struct drbd_interval *i;
1717         struct drbd_request *req;
1718
1719         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1720                 if (!i->local)
1721                         continue;
1722                 req = container_of(i, struct drbd_request, i);
1723                 if (req->rq_state & RQ_LOCAL_PENDING ||
1724                     !(req->rq_state & RQ_POSTPONED))
1725                         continue;
1726                 if (expect(list_empty(&req->w.list))) {
1727                         req->w.mdev = mdev;
1728                         req->w.cb = w_restart_write;
1729                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1730                 }
1731         }
1732 }
1733
1734 /*
1735  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1736  */
1737 static int e_end_block(struct drbd_work *w, int cancel)
1738 {
1739         struct drbd_peer_request *peer_req =
1740                 container_of(w, struct drbd_peer_request, w);
1741         struct drbd_conf *mdev = w->mdev;
1742         sector_t sector = peer_req->i.sector;
1743         int err = 0, pcmd;
1744
1745         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1746                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1747                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1748                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1749                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1750                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1751                         err = drbd_send_ack(mdev, pcmd, peer_req);
1752                         if (pcmd == P_RS_WRITE_ACK)
1753                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1754                 } else {
1755                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1756                         /* we expect it to be marked out of sync anyways...
1757                          * maybe assert this?  */
1758                 }
1759                 dec_unacked(mdev);
1760         }
1761         /* we delete from the conflict detection interval tree _after_ we sent out the
1762          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1763         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1764                 spin_lock_irq(&mdev->tconn->req_lock);
1765                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1766                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1767                 if (peer_req->flags & EE_RESTART_REQUESTS)
1768                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1769                 spin_unlock_irq(&mdev->tconn->req_lock);
1770         } else
1771                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1772
1773         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1774
1775         return err;
1776 }
1777
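/*
 * Asender callbacks used to answer a conflicting peer write: tell the peer
 * to either discard that write or retry it later.
 */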
1778 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1779 {
1780         struct drbd_conf *mdev = w->mdev;
1781         struct drbd_peer_request *peer_req =
1782                 container_of(w, struct drbd_peer_request, w);
1783         int err;
1784
1785         err = drbd_send_ack(mdev, ack, peer_req);
1786         dec_unacked(mdev);
1787
1788         return err;
1789 }
1790
1791 static int e_send_discard_write(struct drbd_work *w, int unused)
1792 {
1793         return e_send_ack(w, P_DISCARD_WRITE);
1794 }
1795
1796 static int e_send_retry_write(struct drbd_work *w, int unused)
1797 {
1798         struct drbd_tconn *tconn = w->mdev->tconn;
1799
1800         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1801                              P_RETRY_WRITE : P_DISCARD_WRITE);
1802 }
1803
1804 static bool seq_greater(u32 a, u32 b)
1805 {
1806         /*
1807          * We assume 32-bit wrap-around here.
1808          * For 24-bit wrap-around, we would have to shift:
1809          *  a <<= 8; b <<= 8;
1810          */
1811         return (s32)a - (s32)b > 0;
1812 }
1813
1814 static u32 seq_max(u32 a, u32 b)
1815 {
1816         return seq_greater(a, b) ? a : b;
1817 }
1818
1819 static bool need_peer_seq(struct drbd_conf *mdev)
1820 {
1821         struct drbd_tconn *tconn = mdev->tconn;
1822         int tp;
1823
1824         /*
1825          * We only need to keep track of the last packet_seq number of our peer
1826          * if we are in dual-primary mode and we have the discard flag set; see
1827          * handle_write_conflicts().
1828          */
1829
1830         rcu_read_lock();
1831         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1832         rcu_read_unlock();
1833
1834         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1835 }
1836
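/* Remember the highest sequence number seen from the peer, and wake up
 * waiters in wait_for_and_update_peer_seq() if it actually advanced. */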
1837 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1838 {
1839         unsigned int newest_peer_seq;
1840
1841         if (need_peer_seq(mdev)) {
1842                 spin_lock(&mdev->peer_seq_lock);
1843                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1844                 mdev->peer_seq = newest_peer_seq;
1845                 spin_unlock(&mdev->peer_seq_lock);
1846                 /* wake up only if we actually changed mdev->peer_seq */
1847                 if (peer_seq == newest_peer_seq)
1848                         wake_up(&mdev->seq_wait);
1849         }
1850 }
1851
1852 /* Called from receive_Data.
1853  * Synchronize packets on sock with packets on msock.
1854  *
1855  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1856  * packet traveling on msock, they are still processed in the order they have
1857  * been sent.
1858  *
1859  * Note: we don't care for Ack packets overtaking P_DATA packets.
1860  *
1861  * In case packet_seq is larger than mdev->peer_seq number, there are
1862  * outstanding packets on the msock. We wait for them to arrive.
1863  * In case we are the logically next packet, we update mdev->peer_seq
1864  * ourselves. Correctly handles 32bit wrap around.
1865  *
1866  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1867  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1868  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1869  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1870  *
1871  * returns 0 if we may process the packet,
1872  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1873 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1874 {
1875         DEFINE_WAIT(wait);
1876         long timeout;
1877         int ret;
1878
1879         if (!need_peer_seq(mdev))
1880                 return 0;
1881
1882         spin_lock(&mdev->peer_seq_lock);
1883         for (;;) {
1884                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1885                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1886                         ret = 0;
1887                         break;
1888                 }
1889                 if (signal_pending(current)) {
1890                         ret = -ERESTARTSYS;
1891                         break;
1892                 }
1893                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1894                 spin_unlock(&mdev->peer_seq_lock);
1895                 rcu_read_lock();
1896                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1897                 rcu_read_unlock();
1898                 timeout = schedule_timeout(timeout);
1899                 spin_lock(&mdev->peer_seq_lock);
1900                 if (!timeout) {
1901                         ret = -ETIMEDOUT;
1902                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1903                         break;
1904                 }
1905         }
1906         spin_unlock(&mdev->peer_seq_lock);
1907         finish_wait(&mdev->seq_wait, &wait);
1908         return ret;
1909 }
1910
1911 /* see also bio_flags_to_wire().
1912  * We map data packet flags back to (DRBD_)REQ_* bio flags semantically,
1913  * not bit for bit, because we may replicate to other kernel versions. */
1914 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1915 {
1916         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1917                 (dpf & DP_FUA ? REQ_FUA : 0) |
1918                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1919                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1920 }
1921
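/*
 * Abort conflict resolution: complete all postponed local writes that
 * overlap the given range as if they had received a negative ack.
 */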
1922 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1923                                     unsigned int size)
1924 {
1925         struct drbd_interval *i;
1926
1927     repeat:
1928         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1929                 struct drbd_request *req;
1930                 struct bio_and_error m;
1931
1932                 if (!i->local)
1933                         continue;
1934                 req = container_of(i, struct drbd_request, i);
1935                 if (!(req->rq_state & RQ_POSTPONED))
1936                         continue;
1937                 req->rq_state &= ~RQ_POSTPONED;
1938                 __req_mod(req, NEG_ACKED, &m);
1939                 spin_unlock_irq(&mdev->tconn->req_lock);
1940                 if (m.bio)
1941                         complete_master_bio(mdev, &m);
1942                 spin_lock_irq(&mdev->tconn->req_lock);
1943                 goto repeat;
1944         }
1945 }
1946
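/*
 * Called with the req_lock held.  Insert the peer write into the
 * write_requests tree and resolve overlaps with our own requests: the node
 * with the "discard concurrent" flag decides whether a conflicting peer
 * write is discarded or retried, the other node waits for that decision.
 * Returns 0 if the peer request should be submitted, -ENOENT if it is
 * answered with a discard/retry ack instead, or another negative error if
 * waiting was interrupted or timed out.
 */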
1947 static int handle_write_conflicts(struct drbd_conf *mdev,
1948                                   struct drbd_peer_request *peer_req)
1949 {
1950         struct drbd_tconn *tconn = mdev->tconn;
1951         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1952         sector_t sector = peer_req->i.sector;
1953         const unsigned int size = peer_req->i.size;
1954         struct drbd_interval *i;
1955         bool equal;
1956         int err;
1957
1958         /*
1959          * Inserting the peer request into the write_requests tree will prevent
1960          * new conflicting local requests from being added.
1961          */
1962         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1963
1964     repeat:
1965         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1966                 if (i == &peer_req->i)
1967                         continue;
1968
1969                 if (!i->local) {
1970                         /*
1971                          * Our peer has sent a conflicting remote request; this
1972                          * should not happen in a two-node setup.  Wait for the
1973                          * earlier peer request to complete.
1974                          */
1975                         err = drbd_wait_misc(mdev, i);
1976                         if (err)
1977                                 goto out;
1978                         goto repeat;
1979                 }
1980
1981                 equal = i->sector == sector && i->size == size;
1982                 if (resolve_conflicts) {
1983                         /*
1984                          * If the peer request is fully contained within the
1985                          * overlapping request, it can be discarded; otherwise,
1986                          * it will be retried once all overlapping requests
1987                          * have completed.
1988                          */
1989                         bool discard = i->sector <= sector && i->sector +
1990                                        (i->size >> 9) >= sector + (size >> 9);
1991
1992                         if (!equal)
1993                                 dev_alert(DEV, "Concurrent writes detected: "
1994                                                "local=%llus +%u, remote=%llus +%u, "
1995                                                "assuming %s came first\n",
1996                                           (unsigned long long)i->sector, i->size,
1997                                           (unsigned long long)sector, size,
1998                                           discard ? "local" : "remote");
1999
2000                         inc_unacked(mdev);
2001                         peer_req->w.cb = discard ? e_send_discard_write :
2002                                                    e_send_retry_write;
2003                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2004                         wake_asender(mdev->tconn);
2005
2006                         err = -ENOENT;
2007                         goto out;
2008                 } else {
2009                         struct drbd_request *req =
2010                                 container_of(i, struct drbd_request, i);
2011
2012                         if (!equal)
2013                                 dev_alert(DEV, "Concurrent writes detected: "
2014                                                "local=%llus +%u, remote=%llus +%u\n",
2015                                           (unsigned long long)i->sector, i->size,
2016                                           (unsigned long long)sector, size);
2017
2018                         if (req->rq_state & RQ_LOCAL_PENDING ||
2019                             !(req->rq_state & RQ_POSTPONED)) {
2020                                 /*
2021                                  * Wait for the node with the discard flag to
2022                                  * decide if this request will be discarded or
2023                                  * retried.  Requests that are discarded will
2024                                  * disappear from the write_requests tree.
2025                                  *
2026                                  * In addition, wait for the conflicting
2027                                  * request to finish locally before submitting
2028                                  * the conflicting peer request.
2029                                  */
2030                                 err = drbd_wait_misc(mdev, &req->i);
2031                                 if (err) {
2032                                         _conn_request_state(mdev->tconn,
2033                                                             NS(conn, C_TIMEOUT),
2034                                                             CS_HARD);
2035                                         fail_postponed_requests(mdev, sector, size);
2036                                         goto out;
2037                                 }
2038                                 goto repeat;
2039                         }
2040                         /*
2041                          * Remember to restart the conflicting requests after
2042                          * the new peer request has completed.
2043                          */
2044                         peer_req->flags |= EE_RESTART_REQUESTS;
2045                 }
2046         }
2047         err = 0;
2048
2049     out:
2050         if (err)
2051                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2052         return err;
2053 }
2054
2055 /* mirrored write */
2056 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2057 {
2058         struct drbd_conf *mdev;
2059         sector_t sector;
2060         struct drbd_peer_request *peer_req;
2061         struct p_data *p = pi->data;
2062         u32 peer_seq = be32_to_cpu(p->seq_num);
2063         int rw = WRITE;
2064         u32 dp_flags;
2065         int err, tp;
2066
2067         mdev = vnr_to_mdev(tconn, pi->vnr);
2068         if (!mdev)
2069                 return -EIO;
2070
2071         if (!get_ldev(mdev)) {
2072                 int err2;
2073
2074                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2075                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2076                 atomic_inc(&mdev->current_epoch->epoch_size);
2077                 err2 = drbd_drain_block(mdev, pi->size);
2078                 if (!err)
2079                         err = err2;
2080                 return err;
2081         }
2082
2083         /*
2084          * Corresponding put_ldev done either below (on various errors), or in
2085          * drbd_peer_request_endio, if we successfully submit the data at the
2086          * end of this function.
2087          */
2088
2089         sector = be64_to_cpu(p->sector);
2090         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2091         if (!peer_req) {
2092                 put_ldev(mdev);
2093                 return -EIO;
2094         }
2095
2096         peer_req->w.cb = e_end_block;
2097
2098         dp_flags = be32_to_cpu(p->dp_flags);
2099         rw |= wire_flags_to_bio(mdev, dp_flags);
2100
2101         if (dp_flags & DP_MAY_SET_IN_SYNC)
2102                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2103
2104         spin_lock(&mdev->epoch_lock);
2105         peer_req->epoch = mdev->current_epoch;
2106         atomic_inc(&peer_req->epoch->epoch_size);
2107         atomic_inc(&peer_req->epoch->active);
2108         spin_unlock(&mdev->epoch_lock);
2109
2110         rcu_read_lock();
2111         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2112         rcu_read_unlock();
2113         if (tp) {
2114                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2115                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2116                 if (err)
2117                         goto out_interrupted;
2118                 spin_lock_irq(&mdev->tconn->req_lock);
2119                 err = handle_write_conflicts(mdev, peer_req);
2120                 if (err) {
2121                         spin_unlock_irq(&mdev->tconn->req_lock);
2122                         if (err == -ENOENT) {
2123                                 put_ldev(mdev);
2124                                 return 0;
2125                         }
2126                         goto out_interrupted;
2127                 }
2128         } else
2129                 spin_lock_irq(&mdev->tconn->req_lock);
2130         list_add(&peer_req->w.list, &mdev->active_ee);
2131         spin_unlock_irq(&mdev->tconn->req_lock);
2132
2133         if (mdev->tconn->agreed_pro_version < 100) {
2134                 rcu_read_lock();
2135                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2136                 case DRBD_PROT_C:
2137                         dp_flags |= DP_SEND_WRITE_ACK;
2138                         break;
2139                 case DRBD_PROT_B:
2140                         dp_flags |= DP_SEND_RECEIVE_ACK;
2141                         break;
2142                 }
2143                 rcu_read_unlock();
2144         }
2145
2146         if (dp_flags & DP_SEND_WRITE_ACK) {
2147                 peer_req->flags |= EE_SEND_WRITE_ACK;
2148                 inc_unacked(mdev);
2149                 /* corresponding dec_unacked() in e_end_block(),
2150                  * or in _drbd_clear_done_ee() */
2151         }
2152
2153         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2154                 /* I really don't like it that the receiver thread
2155                  * sends on the msock, but anyways */
2156                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2157         }
2158
2159         if (mdev->state.pdsk < D_INCONSISTENT) {
2160                 /* In case we have the only disk of the cluster: mark this write as out of sync for the peer, and cover it by the activity log. */
2161                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2162                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2163                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2164                 drbd_al_begin_io(mdev, &peer_req->i);
2165         }
2166
2167         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2168         if (!err)
2169                 return 0;
2170
2171         /* don't care for the reason here */
2172         dev_err(DEV, "submit failed, triggering re-connect\n");
2173         spin_lock_irq(&mdev->tconn->req_lock);
2174         list_del(&peer_req->w.list);
2175         drbd_remove_epoch_entry_interval(mdev, peer_req);
2176         spin_unlock_irq(&mdev->tconn->req_lock);
2177         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2178                 drbd_al_complete_io(mdev, &peer_req->i);
2179
2180 out_interrupted:
2181         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2182         put_ldev(mdev);
2183         drbd_free_peer_req(mdev, peer_req);
2184         return err;
2185 }
2186
2187 /* We may throttle resync, if the lower device seems to be busy,
2188  * and current sync rate is above c_min_rate.
2189  *
2190  * To decide whether or not the lower device is busy, we use a scheme similar
2191  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2192  * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
2193  * amount (more than 64 sectors) of activity that we cannot account for with our
2194  * own resync activity, the device obviously is "busy".
2195  * The current sync rate used here uses only the most recent two step marks,
2196  * to have a short time average so we can react faster.
2197  */
2198 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2199 {
2200         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2201         unsigned long db, dt, dbdt;
2202         struct lc_element *tmp;
2203         int curr_events;
2204         int throttle = 0;
2205         unsigned int c_min_rate;
2206
2207         rcu_read_lock();
2208         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2209         rcu_read_unlock();
2210
2211         /* feature disabled? */
2212         if (c_min_rate == 0)
2213                 return 0;
2214
2215         spin_lock_irq(&mdev->al_lock);
2216         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2217         if (tmp) {
2218                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2219                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2220                         spin_unlock_irq(&mdev->al_lock);
2221                         return 0;
2222                 }
2223                 /* Do not slow down if app IO is already waiting for this extent */
2224         }
2225         spin_unlock_irq(&mdev->al_lock);
2226
2227         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2228                       (int)part_stat_read(&disk->part0, sectors[1]) -
2229                         atomic_read(&mdev->rs_sect_ev);
2230
2231         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2232                 unsigned long rs_left;
2233                 int i;
2234
2235                 mdev->rs_last_events = curr_events;
2236
2237                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2238                  * approx. */
2239                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2240
2241                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2242                         rs_left = mdev->ov_left;
2243                 else
2244                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2245
2246                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2247                 if (!dt)
2248                         dt++;
2249                 db = mdev->rs_mark_left[i] - rs_left;
2250                 dbdt = Bit2KB(db/dt);
2251
2252                 if (dbdt > c_min_rate)
2253                         throttle = 1;
2254         }
2255         return throttle;
2256 }
2257
2258
2259 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2260 {
2261         struct drbd_conf *mdev;
2262         sector_t sector;
2263         sector_t capacity;
2264         struct drbd_peer_request *peer_req;
2265         struct digest_info *di = NULL;
2266         int size, verb;
2267         unsigned int fault_type;
2268         struct p_block_req *p = pi->data;
2269
2270         mdev = vnr_to_mdev(tconn, pi->vnr);
2271         if (!mdev)
2272                 return -EIO;
2273         capacity = drbd_get_capacity(mdev->this_bdev);
2274
2275         sector = be64_to_cpu(p->sector);
2276         size   = be32_to_cpu(p->blksize);
2277
2278         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2279                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2280                                 (unsigned long long)sector, size);
2281                 return -EINVAL;
2282         }
2283         if (sector + (size>>9) > capacity) {
2284                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2285                                 (unsigned long long)sector, size);
2286                 return -EINVAL;
2287         }
2288
2289         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2290                 verb = 1;
2291                 switch (pi->cmd) {
2292                 case P_DATA_REQUEST:
2293                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2294                         break;
2295                 case P_RS_DATA_REQUEST:
2296                 case P_CSUM_RS_REQUEST:
2297                 case P_OV_REQUEST:
2298                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2299                         break;
2300                 case P_OV_REPLY:
2301                         verb = 0;
2302                         dec_rs_pending(mdev);
2303                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2304                         break;
2305                 default:
2306                         BUG();
2307                 }
2308                 if (verb && __ratelimit(&drbd_ratelimit_state))
2309                         dev_err(DEV, "Can not satisfy peer's read request, "
2310                             "no local data.\n");
2311
2312                 /* drain the possibly present payload */
2313                 return drbd_drain_block(mdev, pi->size);
2314         }
2315
2316         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2317          * "criss-cross" setup, that might cause write-out on some other DRBD,
2318          * which in turn might block on the other node at this very place.  */
2319         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2320         if (!peer_req) {
2321                 put_ldev(mdev);
2322                 return -ENOMEM;
2323         }
2324
2325         switch (pi->cmd) {
2326         case P_DATA_REQUEST:
2327                 peer_req->w.cb = w_e_end_data_req;
2328                 fault_type = DRBD_FAULT_DT_RD;
2329                 /* application IO, don't drbd_rs_begin_io */
2330                 goto submit;
2331
2332         case P_RS_DATA_REQUEST:
2333                 peer_req->w.cb = w_e_end_rsdata_req;
2334                 fault_type = DRBD_FAULT_RS_RD;
2335                 /* used in the sector offset progress display */
2336                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2337                 break;
2338
2339         case P_OV_REPLY:
2340         case P_CSUM_RS_REQUEST:
2341                 fault_type = DRBD_FAULT_RS_RD;
2342                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2343                 if (!di)
2344                         goto out_free_e;
2345
2346                 di->digest_size = pi->size;
2347                 di->digest = (((char *)di)+sizeof(struct digest_info));
2348
2349                 peer_req->digest = di;
2350                 peer_req->flags |= EE_HAS_DIGEST;
2351
2352                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2353                         goto out_free_e;
2354
2355                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2356                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2357                         peer_req->w.cb = w_e_end_csum_rs_req;
2358                         /* used in the sector offset progress display */
2359                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2360                 } else if (pi->cmd == P_OV_REPLY) {
2361                         /* track progress, we may need to throttle */
2362                         atomic_add(size >> 9, &mdev->rs_sect_in);
2363                         peer_req->w.cb = w_e_end_ov_reply;
2364                         dec_rs_pending(mdev);
2365                         /* drbd_rs_begin_io done when we sent this request,
2366                          * but accounting still needs to be done. */
2367                         goto submit_for_resync;
2368                 }
2369                 break;
2370
2371         case P_OV_REQUEST:
2372                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2373                     mdev->tconn->agreed_pro_version >= 90) {
2374                         unsigned long now = jiffies;
2375                         int i;
2376                         mdev->ov_start_sector = sector;
2377                         mdev->ov_position = sector;
2378                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2379                         mdev->rs_total = mdev->ov_left;
2380                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2381                                 mdev->rs_mark_left[i] = mdev->ov_left;
2382                                 mdev->rs_mark_time[i] = now;
2383                         }
2384                         dev_info(DEV, "Online Verify start sector: %llu\n",
2385                                         (unsigned long long)sector);
2386                 }
2387                 peer_req->w.cb = w_e_end_ov_req;
2388                 fault_type = DRBD_FAULT_RS_RD;
2389                 break;
2390
2391         default:
2392                 BUG();
2393         }
2394
2395         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2396          * wrt the receiver, but it is not as straightforward as it may seem.
2397          * Various places in the resync start and stop logic assume resync
2398          * requests are processed in order, requeuing this on the worker thread
2399          * introduces a bunch of new code for synchronization between threads.
2400          *
2401          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2402          * "forever", throttling after drbd_rs_begin_io will lock that extent
2403          * for application writes for the same time.  For now, just throttle
2404          * here, where the rest of the code expects the receiver to sleep for
2405          * a while, anyways.
2406          */
2407
2408         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2409          * this defers syncer requests for some time, before letting at least
2410          * one request through.  The resync controller on the receiving side
2411          * will adapt to the incoming rate accordingly.
2412          *
2413          * We cannot throttle here if remote is Primary/SyncTarget:
2414          * we would also throttle its application reads.
2415          * In that case, throttling is done on the SyncTarget only.
2416          */
2417         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2418                 schedule_timeout_uninterruptible(HZ/10);
2419         if (drbd_rs_begin_io(mdev, sector))
2420                 goto out_free_e;
2421
2422 submit_for_resync:
2423         atomic_add(size >> 9, &mdev->rs_sect_ev);
2424
2425 submit:
2426         inc_unacked(mdev);
2427         spin_lock_irq(&mdev->tconn->req_lock);
2428         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2429         spin_unlock_irq(&mdev->tconn->req_lock);
2430
2431         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2432                 return 0;
2433
2434         /* don't care for the reason here */
2435         dev_err(DEV, "submit failed, triggering re-connect\n");
2436         spin_lock_irq(&mdev->tconn->req_lock);
2437         list_del(&peer_req->w.list);
2438         spin_unlock_irq(&mdev->tconn->req_lock);
2439         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2440
2441 out_free_e:
2442         put_ldev(mdev);
2443         drbd_free_peer_req(mdev, peer_req);
2444         return -EIO;
2445 }
2446
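/*
 * After-split-brain recovery policy for the case that neither node was
 * primary when the split brain happened.  Return value: 1 means discard the
 * peer's changes (we become sync source), -1 means discard our own changes
 * (we become sync target), -100 means no automatic decision is possible.
 */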
2447 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2448 {
2449         int self, peer, rv = -100;
2450         unsigned long ch_self, ch_peer;
2451         enum drbd_after_sb_p after_sb_0p;
2452
2453         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2454         peer = mdev->p_uuid[UI_BITMAP] & 1;
2455
2456         ch_peer = mdev->p_uuid[UI_SIZE];
2457         ch_self = mdev->comm_bm_set;
2458
2459         rcu_read_lock();
2460         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2461         rcu_read_unlock();
2462         switch (after_sb_0p) {
2463         case ASB_CONSENSUS:
2464         case ASB_DISCARD_SECONDARY:
2465         case ASB_CALL_HELPER:
2466         case ASB_VIOLENTLY:
2467                 dev_err(DEV, "Configuration error.\n");
2468                 break;
2469         case ASB_DISCONNECT:
2470                 break;
2471         case ASB_DISCARD_YOUNGER_PRI:
2472                 if (self == 0 && peer == 1) {
2473                         rv = -1;
2474                         break;
2475                 }
2476                 if (self == 1 && peer == 0) {
2477                         rv =  1;
2478                         break;
2479                 }
2480                 /* Else fall through to one of the other strategies... */
2481         case ASB_DISCARD_OLDER_PRI:
2482                 if (self == 0 && peer == 1) {
2483                         rv = 1;
2484                         break;
2485                 }
2486                 if (self == 1 && peer == 0) {
2487                         rv = -1;
2488                         break;
2489                 }
2490                 /* Else fall through to one of the other strategies... */
2491                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2492                      "Using discard-least-changes instead\n");
2493         case ASB_DISCARD_ZERO_CHG:
2494                 if (ch_peer == 0 && ch_self == 0) {
2495                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2496                                 ? -1 : 1;
2497                         break;
2498                 } else {
2499                         if (ch_peer == 0) { rv =  1; break; }
2500                         if (ch_self == 0) { rv = -1; break; }
2501                 }
2502                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2503                         break;
2504         case ASB_DISCARD_LEAST_CHG:
2505                 if      (ch_self < ch_peer)
2506                         rv = -1;
2507                 else if (ch_self > ch_peer)
2508                         rv =  1;
2509                 else /* ( ch_self == ch_peer ) */
2510                      /* Well, then use something else. */
2511                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2512                                 ? -1 : 1;
2513                 break;
2514         case ASB_DISCARD_LOCAL:
2515                 rv = -1;
2516                 break;
2517         case ASB_DISCARD_REMOTE:
2518                 rv =  1;
2519         }
2520
2521         return rv;
2522 }
2523
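/*
 * After-split-brain recovery policy for exactly one primary.
 * Same return value convention as drbd_asb_recover_0p().
 */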
2524 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2525 {
2526         int hg, rv = -100;
2527         enum drbd_after_sb_p after_sb_1p;
2528
2529         rcu_read_lock();
2530         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2531         rcu_read_unlock();
2532         switch (after_sb_1p) {
2533         case ASB_DISCARD_YOUNGER_PRI:
2534         case ASB_DISCARD_OLDER_PRI:
2535         case ASB_DISCARD_LEAST_CHG:
2536         case ASB_DISCARD_LOCAL:
2537         case ASB_DISCARD_REMOTE:
2538         case ASB_DISCARD_ZERO_CHG:
2539                 dev_err(DEV, "Configuration error.\n");
2540                 break;
2541         case ASB_DISCONNECT:
2542                 break;
2543         case ASB_CONSENSUS:
2544                 hg = drbd_asb_recover_0p(mdev);
2545                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2546                         rv = hg;
2547                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2548                         rv = hg;
2549                 break;
2550         case ASB_VIOLENTLY:
2551                 rv = drbd_asb_recover_0p(mdev);
2552                 break;
2553         case ASB_DISCARD_SECONDARY:
2554                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2555         case ASB_CALL_HELPER:
2556                 hg = drbd_asb_recover_0p(mdev);
2557                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2558                         enum drbd_state_rv rv2;
2559
2560                         drbd_set_role(mdev, R_SECONDARY, 0);
2561                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2562                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2563                           * we do not need to wait for the after state change work either. */
2564                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2565                         if (rv2 != SS_SUCCESS) {
2566                                 drbd_khelper(mdev, "pri-lost-after-sb");
2567                         } else {
2568                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2569                                 rv = hg;
2570                         }
2571                 } else
2572                         rv = hg;
2573         }
2574
2575         return rv;
2576 }
2577
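/*
 * After-split-brain recovery policy for two primaries.
 * Same return value convention as drbd_asb_recover_0p().
 */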
2578 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2579 {
2580         int hg, rv = -100;
2581         enum drbd_after_sb_p after_sb_2p;
2582
2583         rcu_read_lock();
2584         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2585         rcu_read_unlock();
2586         switch (after_sb_2p) {
2587         case ASB_DISCARD_YOUNGER_PRI:
2588         case ASB_DISCARD_OLDER_PRI:
2589         case ASB_DISCARD_LEAST_CHG:
2590         case ASB_DISCARD_LOCAL:
2591         case ASB_DISCARD_REMOTE:
2592         case ASB_CONSENSUS:
2593         case ASB_DISCARD_SECONDARY:
2594         case ASB_DISCARD_ZERO_CHG:
2595                 dev_err(DEV, "Configuration error.\n");
2596                 break;
2597         case ASB_VIOLENTLY:
2598                 rv = drbd_asb_recover_0p(mdev);
2599                 break;
2600         case ASB_DISCONNECT:
2601                 break;
2602         case ASB_CALL_HELPER:
2603                 hg = drbd_asb_recover_0p(mdev);
2604                 if (hg == -1) {
2605                         enum drbd_state_rv rv2;
2606
2607                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2608                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2609                           * we do not need to wait for the after state change work either. */
2610                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2611                         if (rv2 != SS_SUCCESS) {
2612                                 drbd_khelper(mdev, "pri-lost-after-sb");
2613                         } else {
2614                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2615                                 rv = hg;
2616                         }
2617                 } else
2618                         rv = hg;
2619         }
2620
2621         return rv;
2622 }
2623
2624 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2625                            u64 bits, u64 flags)
2626 {
2627         if (!uuid) {
2628                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2629                 return;
2630         }
2631         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2632              text,
2633              (unsigned long long)uuid[UI_CURRENT],
2634              (unsigned long long)uuid[UI_BITMAP],
2635              (unsigned long long)uuid[UI_HISTORY_START],
2636              (unsigned long long)uuid[UI_HISTORY_END],
2637              (unsigned long long)bits,
2638              (unsigned long long)flags);
2639 }
2640
2641 /*
2642   100   after split brain try auto recover
2643     2   C_SYNC_SOURCE set BitMap
2644     1   C_SYNC_SOURCE use BitMap
2645     0   no Sync
2646    -1   C_SYNC_TARGET use BitMap
2647    -2   C_SYNC_TARGET set BitMap
2648  -100   after split brain, disconnect
2649 -1000   unrelated data
2650 -1091   requires proto 91
2651 -1096   requires proto 96
2652  */
2653 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2654 {
2655         u64 self, peer;
2656         int i, j;
2657
2658         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2659         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2660
2661         *rule_nr = 10;
2662         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2663                 return 0;
2664
2665         *rule_nr = 20;
2666         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2667              peer != UUID_JUST_CREATED)
2668                 return -2;
2669
2670         *rule_nr = 30;
2671         if (self != UUID_JUST_CREATED &&
2672             (peer == UUID_JUST_CREATED || peer == (u64)0))
2673                 return 2;
2674
2675         if (self == peer) {
2676                 int rct, dc; /* roles at crash time */
2677
2678                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2679
2680                         if (mdev->tconn->agreed_pro_version < 91)
2681                                 return -1091;
2682
2683                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2684                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2685                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2686                                 drbd_uuid_set_bm(mdev, 0UL);
2687
2688                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2689                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2690                                 *rule_nr = 34;
2691                         } else {
2692                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2693                                 *rule_nr = 36;
2694                         }
2695
2696                         return 1;
2697                 }
2698
2699                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2700
2701                         if (mdev->tconn->agreed_pro_version < 91)
2702                                 return -1091;
2703
2704                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2705                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2706                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2707
2708                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2709                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2710                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2711
2712                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2713                                 *rule_nr = 35;
2714                         } else {
2715                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2716                                 *rule_nr = 37;
2717                         }
2718
2719                         return -1;
2720                 }
2721
2722                 /* Common power [off|failure] */
2723                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2724                         (mdev->p_uuid[UI_FLAGS] & 2);
2725                 /* lowest bit is set when we were primary,
2726                  * next bit (weight 2) is set when peer was primary */
2727                 *rule_nr = 40;
2728
2729                 switch (rct) {
2730                 case 0: /* !self_pri && !peer_pri */ return 0;
2731                 case 1: /*  self_pri && !peer_pri */ return 1;
2732                 case 2: /* !self_pri &&  peer_pri */ return -1;
2733                 case 3: /*  self_pri &&  peer_pri */
2734                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2735                         return dc ? -1 : 1;
2736                 }
2737         }
2738
2739         *rule_nr = 50;
2740         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2741         if (self == peer)
2742                 return -1;
2743
2744         *rule_nr = 51;
2745         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2746         if (self == peer) {
2747                 if (mdev->tconn->agreed_pro_version < 96 ?
2748                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2749                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2750                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2751                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2752                            the last start of resync as sync source made to the peer's UUIDs. */
2753
2754                         if (mdev->tconn->agreed_pro_version < 91)
2755                                 return -1091;
2756
2757                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2758                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2759
2760                         dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2761                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2762
2763                         return -1;
2764                 }
2765         }
2766
2767         *rule_nr = 60;
2768         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2769         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2770                 peer = mdev->p_uuid[i] & ~((u64)1);
2771                 if (self == peer)
2772                         return -2;
2773         }
2774
2775         *rule_nr = 70;
2776         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2777         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2778         if (self == peer)
2779                 return 1;
2780
2781         *rule_nr = 71;
2782         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2783         if (self == peer) {
2784                 if (mdev->tconn->agreed_pro_version < 96 ?
2785                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2786                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2787                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2788                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2789                            the last start of resync as sync source made to our UUIDs. */
2790
2791                         if (mdev->tconn->agreed_pro_version < 91)
2792                                 return -1091;
2793
2794                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2795                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2796
2797                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2798                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2799                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2800
2801                         return 1;
2802                 }
2803         }
2804
2805
2806         *rule_nr = 80;
2807         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2808         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2809                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2810                 if (self == peer)
2811                         return 2;
2812         }
2813
2814         *rule_nr = 90;
2815         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2816         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2817         if (self == peer && self != ((u64)0))
2818                 return 100;
2819
2820         *rule_nr = 100;
2821         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2822                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2823                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2824                         peer = mdev->p_uuid[j] & ~((u64)1);
2825                         if (self == peer)
2826                                 return -100;
2827                 }
2828         }
2829
2830         return -1000;
2831 }
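
/* A note on the result of drbd_uuid_compare() ("hg"), as it is interpreted
 * by drbd_sync_handshake() below:
 *    0          equivalent data, no resync necessary
 *    1 / -1     become sync source / sync target, bitmap based resync
 *    2 / -2     become sync source / sync target, full resync
 *  100 / -100   split brain; the after-split-brain policies may still resolve it
 *  -1000        unrelated data, the connection is dropped
 *  < -1000      both sides need to support at least protocol (-hg - 1000)
 */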
2832
2833 /* drbd_sync_handshake() returns the new conn state on success, or
2834    C_MASK (-1) on failure.
2835  */
2836 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2837                                            enum drbd_disk_state peer_disk) __must_hold(local)
2838 {
2839         enum drbd_conns rv = C_MASK;
2840         enum drbd_disk_state mydisk;
2841         struct net_conf *nc;
2842         int hg, rule_nr, rr_conflict, dry_run;
2843
2844         mydisk = mdev->state.disk;
2845         if (mydisk == D_NEGOTIATING)
2846                 mydisk = mdev->new_state_tmp.disk;
2847
2848         dev_info(DEV, "drbd_sync_handshake:\n");
2849         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2850         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2851                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2852
2853         hg = drbd_uuid_compare(mdev, &rule_nr);
2854
2855         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2856
2857         if (hg == -1000) {
2858                 dev_alert(DEV, "Unrelated data, aborting!\n");
2859                 return C_MASK;
2860         }
2861         if (hg < -1000) {
2862                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2863                 return C_MASK;
2864         }
2865
2866         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2867             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2868                 int f = (hg == -100) || abs(hg) == 2;
2869                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2870                 if (f)
2871                         hg = hg*2;
2872                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2873                      hg > 0 ? "source" : "target");
2874         }
2875
2876         if (abs(hg) == 100)
2877                 drbd_khelper(mdev, "initial-split-brain");
2878
2879         rcu_read_lock();
2880         nc = rcu_dereference(mdev->tconn->net_conf);
2881
2882         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2883                 int pcount = (mdev->state.role == R_PRIMARY)
2884                            + (peer_role == R_PRIMARY);
2885                 int forced = (hg == -100);
2886
2887                 switch (pcount) {
2888                 case 0:
2889                         hg = drbd_asb_recover_0p(mdev);
2890                         break;
2891                 case 1:
2892                         hg = drbd_asb_recover_1p(mdev);
2893                         break;
2894                 case 2:
2895                         hg = drbd_asb_recover_2p(mdev);
2896                         break;
2897                 }
2898                 if (abs(hg) < 100) {
2899                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2900                              "automatically solved. Sync from %s node\n",
2901                              pcount, (hg < 0) ? "peer" : "this");
2902                         if (forced) {
2903                                 dev_warn(DEV, "Doing a full sync, since"
2904                                      " UUIDs were ambiguous.\n");
2905                                 hg = hg*2;
2906                         }
2907                 }
2908         }
2909
2910         if (hg == -100) {
2911                 if (nc->discard_my_data && !(mdev->p_uuid[UI_FLAGS]&1))
2912                         hg = -1;
2913                 if (!nc->discard_my_data && (mdev->p_uuid[UI_FLAGS]&1))
2914                         hg = 1;
2915
2916                 if (abs(hg) < 100)
2917                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2918                              "Sync from %s node\n",
2919                              (hg < 0) ? "peer" : "this");
2920         }
2921         rr_conflict = nc->rr_conflict;
2922         dry_run = nc->dry_run;
2923         rcu_read_unlock();
2924
2925         if (hg == -100) {
2926                 /* FIXME this log message is not correct if we end up here
2927                  * after an attempted attach on a diskless node.
2928                  * We just refuse to attach -- well, we drop the "connection"
2929                  * to that disk, in a way... */
2930                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2931                 drbd_khelper(mdev, "split-brain");
2932                 return C_MASK;
2933         }
2934
2935         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2936                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2937                 return C_MASK;
2938         }
2939
2940         if (hg < 0 && /* by intention we do not use mydisk here. */
2941             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2942                 switch (rr_conflict) {
2943                 case ASB_CALL_HELPER:
2944                         drbd_khelper(mdev, "pri-lost");
2945                         /* fall through */
2946                 case ASB_DISCONNECT:
2947                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2948                         return C_MASK;
2949                 case ASB_VIOLENTLY:
2950                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2951                              " assumption\n");
2952                 }
2953         }
2954
2955         if (dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2956                 if (hg == 0)
2957                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2958                 else
2959                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2960                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2961                                  abs(hg) >= 2 ? "full" : "bit-map based");
2962                 return C_MASK;
2963         }
2964
2965         if (abs(hg) >= 2) {
2966                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2967                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2968                                         BM_LOCKED_SET_ALLOWED))
2969                         return C_MASK;
2970         }
2971
2972         if (hg > 0) { /* become sync source. */
2973                 rv = C_WF_BITMAP_S;
2974         } else if (hg < 0) { /* become sync target */
2975                 rv = C_WF_BITMAP_T;
2976         } else {
2977                 rv = C_CONNECTED;
2978                 if (drbd_bm_total_weight(mdev)) {
2979                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2980                              drbd_bm_total_weight(mdev));
2981                 }
2982         }
2983
2984         return rv;
2985 }
2986
2987 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
2988 {
2989         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2990         if (peer == ASB_DISCARD_REMOTE)
2991                 return ASB_DISCARD_LOCAL;
2992
2993         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2994         if (peer == ASB_DISCARD_LOCAL)
2995                 return ASB_DISCARD_REMOTE;
2996
2997         /* everything else is valid if they are equal on both sides. */
2998         return peer;
2999 }
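
/* Example: a peer configured with ASB_DISCARD_REMOTE matches a local
 * ASB_DISCARD_LOCAL (and vice versa); all other policies have to be
 * identical on both sides.  receive_protocol() below therefore compares
 * convert_after_sb(peer value) against the local net_conf value. */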
3000
3001 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3002 {
3003         struct p_protocol *p = pi->data;
3004         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3005         int p_discard_my_data, p_two_primaries, cf;
3006         struct net_conf *nc;
3007
3008         p_proto         = be32_to_cpu(p->protocol);
3009         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3010         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3011         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3012         p_two_primaries = be32_to_cpu(p->two_primaries);
3013         cf              = be32_to_cpu(p->conn_flags);
3014         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3015
3016         if (tconn->agreed_pro_version >= 87) {
3017                 char integrity_alg[SHARED_SECRET_MAX];
3018                 struct crypto_hash *tfm = NULL;
3019                 int err;
3020
3021                 if (pi->size > sizeof(integrity_alg))
3022                         return -EIO;
3023                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3024                 if (err)
3025                         return err;
3026                 integrity_alg[SHARED_SECRET_MAX-1] = 0;
3027
3028                 if (integrity_alg[0]) {
3029                         tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3030                         if (!tfm) {
3031                                 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3032                                          integrity_alg);
3033                                 goto disconnect;
3034                         }
3035                         conn_info(tconn, "peer data-integrity-alg: %s\n", integrity_alg);
3036                 }
3037
3038                 if (tconn->peer_integrity_tfm)
3039                         crypto_free_hash(tconn->peer_integrity_tfm);
3040                 tconn->peer_integrity_tfm = tfm;
3041         }
3042
3043         clear_bit(CONN_DRY_RUN, &tconn->flags);
3044
3045         if (cf & CF_DRY_RUN)
3046                 set_bit(CONN_DRY_RUN, &tconn->flags);
3047
3048         rcu_read_lock();
3049         nc = rcu_dereference(tconn->net_conf);
3050
3051         if (p_proto != nc->wire_protocol && tconn->agreed_pro_version < 100) {
3052                 conn_err(tconn, "incompatible communication protocols\n");
3053                 goto disconnect_rcu_unlock;
3054         }
3055
3056         if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3057                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
3058                 goto disconnect_rcu_unlock;
3059         }
3060
3061         if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3062                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
3063                 goto disconnect_rcu_unlock;
3064         }
3065
3066         if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3067                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
3068                 goto disconnect_rcu_unlock;
3069         }
3070
3071         if (p_discard_my_data && nc->discard_my_data) {
3072                 conn_err(tconn, "both sides have the 'discard_my_data' flag set\n");
3073                 goto disconnect_rcu_unlock;
3074         }
3075
3076         if (p_two_primaries != nc->two_primaries) {
3077                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
3078                 goto disconnect_rcu_unlock;
3079         }
3080
3081         rcu_read_unlock();
3082
3083         return 0;
3084
3085 disconnect_rcu_unlock:
3086         rcu_read_unlock();
3087 disconnect:
3088         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3089         return -EIO;
3090 }
3091
3092 /* helper function
3093  * input: alg name, feature name
3094  * return: NULL (alg name was "")
3095  *         ERR_PTR(error) if something goes wrong
3096  *         or the crypto hash ptr, if it worked out ok. */
3097 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3098                 const char *alg, const char *name)
3099 {
3100         struct crypto_hash *tfm;
3101
3102         if (!alg[0])
3103                 return NULL;
3104
3105         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3106         if (IS_ERR(tfm)) {
3107                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3108                         alg, name, PTR_ERR(tfm));
3109                 return tfm;
3110         }
3111         return tfm;
3112 }
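
/* Typical use (see receive_SyncParam() below): callers check the result
 * with IS_ERR(); NULL means no algorithm was requested, an ERR_PTR means
 * the allocation failed and the connection gets torn down. */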
3113
3114 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3115 {
3116         void *buffer = tconn->data.rbuf;
3117         int size = pi->size;
3118
3119         while (size) {
3120                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3121                 s = drbd_recv(tconn, buffer, s);
3122                 if (s <= 0) {
3123                         if (s < 0)
3124                                 return s;
3125                         break;
3126                 }
3127                 size -= s;
3128         }
3129         if (size)
3130                 return -EIO;
3131         return 0;
3132 }
3133
3134 /*
3135  * config_unknown_volume  -  device configuration command for unknown volume
3136  *
3137  * When a device is added to an existing connection, the node on which the
3138  * device is added first will send configuration commands to its peer but the
3139  * peer will not know about the device yet.  It will warn and ignore these
3140  * commands.  Once the device is added on the second node, the second node will
3141  * send the same device configuration commands, but in the other direction.
3142  *
3143  * (We can also end up here if drbd is misconfigured.)
3144  */
3145 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3146 {
3147         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3148                   pi->vnr, cmdname(pi->cmd));
3149         return ignore_remaining_packet(tconn, pi);
3150 }
3151
3152 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3153 {
3154         struct drbd_conf *mdev;
3155         struct p_rs_param_95 *p;
3156         unsigned int header_size, data_size, exp_max_sz;
3157         struct crypto_hash *verify_tfm = NULL;
3158         struct crypto_hash *csums_tfm = NULL;
3159         struct net_conf *old_net_conf, *new_net_conf = NULL;
3160         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3161         const int apv = tconn->agreed_pro_version;
3162         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3163         int fifo_size = 0;
3164         int err;
3165
3166         mdev = vnr_to_mdev(tconn, pi->vnr);
3167         if (!mdev)
3168                 return config_unknown_volume(tconn, pi);
3169
3170         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3171                     : apv == 88 ? sizeof(struct p_rs_param)
3172                                         + SHARED_SECRET_MAX
3173                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3174                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3175
3176         if (pi->size > exp_max_sz) {
3177                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3178                     pi->size, exp_max_sz);
3179                 return -EIO;
3180         }
3181
3182         if (apv <= 88) {
3183                 header_size = sizeof(struct p_rs_param);
3184                 data_size = pi->size - header_size;
3185         } else if (apv <= 94) {
3186                 header_size = sizeof(struct p_rs_param_89);
3187                 data_size = pi->size - header_size;
3188                 D_ASSERT(data_size == 0);
3189         } else {
3190                 header_size = sizeof(struct p_rs_param_95);
3191                 data_size = pi->size - header_size;
3192                 D_ASSERT(data_size == 0);
3193         }
3194
3195         /* initialize verify_alg and csums_alg */
3196         p = pi->data;
3197         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3198
3199         err = drbd_recv_all(mdev->tconn, p, header_size);
3200         if (err)
3201                 return err;
3202
3203         mutex_lock(&mdev->tconn->conf_update);
3204         old_net_conf = mdev->tconn->net_conf;
3205         if (get_ldev(mdev)) {
3206                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3207                 if (!new_disk_conf) {
3208                         put_ldev(mdev);
3209                         mutex_unlock(&mdev->tconn->conf_update);
3210                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3211                         return -ENOMEM;
3212                 }
3213
3214                 old_disk_conf = mdev->ldev->disk_conf;
3215                 *new_disk_conf = *old_disk_conf;
3216
3217                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3218         }
3219
3220         if (apv >= 88) {
3221                 if (apv == 88) {
3222                         if (data_size > SHARED_SECRET_MAX) {
3223                                 dev_err(DEV, "verify-alg too long, "
3224                                     "peer wants %u, accepting only %u bytes\n",
3225                                                 data_size, SHARED_SECRET_MAX);
3226                                 err = -EIO;
3227                                 goto reconnect;
3228                         }
3229
3230                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3231                         if (err)
3232                                 goto reconnect;
3233                         /* we expect NUL terminated string */
3234                         /* but just in case someone tries to be evil */
3235                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3236                         p->verify_alg[data_size-1] = 0;
3237
3238                 } else /* apv >= 89 */ {
3239                         /* we still expect NUL terminated strings */
3240                         /* but just in case someone tries to be evil */
3241                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3242                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3243                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3244                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3245                 }
3246
3247                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3248                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3249                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3250                                     old_net_conf->verify_alg, p->verify_alg);
3251                                 goto disconnect;
3252                         }
3253                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3254                                         p->verify_alg, "verify-alg");
3255                         if (IS_ERR(verify_tfm)) {
3256                                 verify_tfm = NULL;
3257                                 goto disconnect;
3258                         }
3259                 }
3260
3261                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3262                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3263                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3264                                     old_net_conf->csums_alg, p->csums_alg);
3265                                 goto disconnect;
3266                         }
3267                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3268                                         p->csums_alg, "csums-alg");
3269                         if (IS_ERR(csums_tfm)) {
3270                                 csums_tfm = NULL;
3271                                 goto disconnect;
3272                         }
3273                 }
3274
3275                 if (apv > 94 && new_disk_conf) {
3276                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3277                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3278                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3279                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3280
3281                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3282                         if (fifo_size != mdev->rs_plan_s->size) {
3283                                 new_plan = fifo_alloc(fifo_size);
3284                                 if (!new_plan) {
3285                                         dev_err(DEV, "kmalloc of fifo_buffer failed");
3286                                         put_ldev(mdev);
3287                                         goto disconnect;
3288                                 }
3289                         }
3290                 }
3291
3292                 if (verify_tfm || csums_tfm) {
3293                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3294                         if (!new_net_conf) {
3295                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3296                                 goto disconnect;
3297                         }
3298
3299                         *new_net_conf = *old_net_conf;
3300
3301                         if (verify_tfm) {
3302                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3303                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3304                                 crypto_free_hash(mdev->tconn->verify_tfm);
3305                                 mdev->tconn->verify_tfm = verify_tfm;
3306                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3307                         }
3308                         if (csums_tfm) {
3309                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3310                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3311                                 crypto_free_hash(mdev->tconn->csums_tfm);
3312                                 mdev->tconn->csums_tfm = csums_tfm;
3313                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3314                         }
3315                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3316                 }
3317         }
3318
3319         if (new_disk_conf) {
3320                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3321                 put_ldev(mdev);
3322         }
3323
3324         if (new_plan) {
3325                 old_plan = mdev->rs_plan_s;
3326                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3327         }
3328
3329         mutex_unlock(&mdev->tconn->conf_update);
3330         synchronize_rcu();
3331         if (new_net_conf)
3332                 kfree(old_net_conf);
3333         kfree(old_disk_conf);
3334         kfree(old_plan);
3335
3336         return 0;
3337
3338 reconnect:
3339         if (new_disk_conf) {
3340                 put_ldev(mdev);
3341                 kfree(new_disk_conf);
3342         }
3343         mutex_unlock(&mdev->tconn->conf_update);
3344         return -EIO;
3345
3346 disconnect:
3347         kfree(new_plan);
3348         if (new_disk_conf) {
3349                 put_ldev(mdev);
3350                 kfree(new_disk_conf);
3351         }
3352         mutex_unlock(&mdev->tconn->conf_update);
3353         /* just for completeness: actually not needed,
3354          * as this is not reached if csums_tfm was ok. */
3355         crypto_free_hash(csums_tfm);
3356         /* but free the verify_tfm again, if csums_tfm did not work out */
3357         crypto_free_hash(verify_tfm);
3358         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3359         return -EIO;
3360 }
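
/* Note: receive_SyncParam() updates net_conf, disk_conf and rs_plan_s RCU
 * style: a modified copy is published with rcu_assign_pointer() while
 * holding conf_update, and the old object is only freed after
 * synchronize_rcu() has returned. */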
3361
3362 /* warn if the arguments differ by more than 12.5% */
3363 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3364         const char *s, sector_t a, sector_t b)
3365 {
3366         sector_t d;
3367         if (a == 0 || b == 0)
3368                 return;
3369         d = (a > b) ? (a - b) : (b - a);
3370         if (d > (a>>3) || d > (b>>3))
3371                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3372                      (unsigned long long)a, (unsigned long long)b);
3373 }
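
/* "more than 12.5%" is implemented as d > a/8 or d > b/8 (a>>3, b>>3).
 * E.g. for a = 1000 and b = 1200 sectors: d = 200 > 125, so we warn. */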
3374
3375 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3376 {
3377         struct drbd_conf *mdev;
3378         struct p_sizes *p = pi->data;
3379         enum determine_dev_size dd = unchanged;
3380         sector_t p_size, p_usize, my_usize;
3381         int ldsc = 0; /* local disk size changed */
3382         enum dds_flags ddsf;
3383
3384         mdev = vnr_to_mdev(tconn, pi->vnr);
3385         if (!mdev)
3386                 return config_unknown_volume(tconn, pi);
3387
3388         p_size = be64_to_cpu(p->d_size);
3389         p_usize = be64_to_cpu(p->u_size);
3390
3391         /* just store the peer's disk size for now.
3392          * we still need to figure out whether we accept that. */
3393         mdev->p_size = p_size;
3394
3395         if (get_ldev(mdev)) {
3396                 rcu_read_lock();
3397                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3398                 rcu_read_unlock();
3399
3400                 warn_if_differ_considerably(mdev, "lower level device sizes",
3401                            p_size, drbd_get_max_capacity(mdev->ldev));
3402                 warn_if_differ_considerably(mdev, "user requested size",
3403                                             p_usize, my_usize);
3404
3405                 /* if this is the first connect, or an otherwise expected
3406                  * param exchange, choose the minimum */
3407                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3408                         p_usize = min_not_zero(my_usize, p_usize);
3409
3410                 /* Never shrink a device with usable data during connect.
3411                    But allow online shrinking if we are connected. */
3412                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3413                     drbd_get_capacity(mdev->this_bdev) &&
3414                     mdev->state.disk >= D_OUTDATED &&
3415                     mdev->state.conn < C_CONNECTED) {
3416                         dev_err(DEV, "The peer's disk size is too small!\n");
3417                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3418                         put_ldev(mdev);
3419                         return -EIO;
3420                 }
3421
3422                 if (my_usize != p_usize) {
3423                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3424
3425                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3426                         if (!new_disk_conf) {
3427                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3428                                 put_ldev(mdev);
3429                                 return -ENOMEM;
3430                         }
3431
3432                         mutex_lock(&mdev->tconn->conf_update);
3433                         old_disk_conf = mdev->ldev->disk_conf;
3434                         *new_disk_conf = *old_disk_conf;
3435                         new_disk_conf->disk_size = p_usize;
3436
3437                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3438                         mutex_unlock(&mdev->tconn->conf_update);
3439                         synchronize_rcu();
3440                         kfree(old_disk_conf);
3441
3442                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3443                                  (unsigned long)p_usize);
3444                 }
3445
3446                 put_ldev(mdev);
3447         }
3448
3449         ddsf = be16_to_cpu(p->dds_flags);
3450         if (get_ldev(mdev)) {
3451                 dd = drbd_determine_dev_size(mdev, ddsf);
3452                 put_ldev(mdev);
3453                 if (dd == dev_size_error)
3454                         return -EIO;
3455                 drbd_md_sync(mdev);
3456         } else {
3457                 /* I am diskless, need to accept the peer's size. */
3458                 drbd_set_my_capacity(mdev, p_size);
3459         }
3460
3461         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3462         drbd_reconsider_max_bio_size(mdev);
3463
3464         if (get_ldev(mdev)) {
3465                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3466                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3467                         ldsc = 1;
3468                 }
3469
3470                 put_ldev(mdev);
3471         }
3472
3473         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3474                 if (be64_to_cpu(p->c_size) !=
3475                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3476                         /* we have different sizes, probably peer
3477                          * needs to know my new size... */
3478                         drbd_send_sizes(mdev, 0, ddsf);
3479                 }
3480                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3481                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3482                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3483                             mdev->state.disk >= D_INCONSISTENT) {
3484                                 if (ddsf & DDSF_NO_RESYNC)
3485                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3486                                 else
3487                                         resync_after_online_grow(mdev);
3488                         } else
3489                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3490                 }
3491         }
3492
3493         return 0;
3494 }
3495
3496 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3497 {
3498         struct drbd_conf *mdev;
3499         struct p_uuids *p = pi->data;
3500         u64 *p_uuid;
3501         int i, updated_uuids = 0;
3502
3503         mdev = vnr_to_mdev(tconn, pi->vnr);
3504         if (!mdev)
3505                 return config_unknown_volume(tconn, pi);
3506
3507         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3508
3509         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3510                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3511
3512         kfree(mdev->p_uuid);
3513         mdev->p_uuid = p_uuid;
3514
3515         if (mdev->state.conn < C_CONNECTED &&
3516             mdev->state.disk < D_INCONSISTENT &&
3517             mdev->state.role == R_PRIMARY &&
3518             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3519                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3520                     (unsigned long long)mdev->ed_uuid);
3521                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3522                 return -EIO;
3523         }
3524
3525         if (get_ldev(mdev)) {
3526                 int skip_initial_sync =
3527                         mdev->state.conn == C_CONNECTED &&
3528                         mdev->tconn->agreed_pro_version >= 90 &&
3529                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3530                         (p_uuid[UI_FLAGS] & 8);
3531                 if (skip_initial_sync) {
3532                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3533                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3534                                         "clear_n_write from receive_uuids",
3535                                         BM_LOCKED_TEST_ALLOWED);
3536                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3537                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3538                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3539                                         CS_VERBOSE, NULL);
3540                         drbd_md_sync(mdev);
3541                         updated_uuids = 1;
3542                 }
3543                 put_ldev(mdev);
3544         } else if (mdev->state.disk < D_INCONSISTENT &&
3545                    mdev->state.role == R_PRIMARY) {
3546                 /* I am a diskless primary, the peer just created a new current UUID
3547                    for me. */
3548                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3549         }
3550
3551         /* Before we test for the disk state, we should wait until any possibly
3552            ongoing cluster-wide state change has finished. That is important if
3553            we are primary and are detaching from our disk. We need to see the
3554            new disk state... */
3555         mutex_lock(mdev->state_mutex);
3556         mutex_unlock(mdev->state_mutex);
3557         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3558                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3559
3560         if (updated_uuids)
3561                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3562
3563         return 0;
3564 }
3565
3566 /**
3567  * convert_state() - Converts the peer's view of the cluster state to our point of view
3568  * @ps:         The state as seen by the peer.
3569  */
3570 static union drbd_state convert_state(union drbd_state ps)
3571 {
3572         union drbd_state ms;
3573
3574         static enum drbd_conns c_tab[] = {
3575                 [C_CONNECTED] = C_CONNECTED,
3576
3577                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3578                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3579                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3580                 [C_VERIFY_S]       = C_VERIFY_T,
3581                 [C_MASK]   = C_MASK,
3582         };
3583
3584         ms.i = ps.i;
3585
3586         ms.conn = c_tab[ps.conn];
3587         ms.peer = ps.role;
3588         ms.role = ps.peer;
3589         ms.pdsk = ps.disk;
3590         ms.disk = ps.pdsk;
3591         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3592
3593         return ms;
3594 }
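
/* Example: the peer reports itself Primary with an UpToDate disk and sees
 * us as Secondary/Inconsistent.  Converted to our point of view this reads
 * peer=Primary, pdsk=UpToDate, role=Secondary, disk=Inconsistent.  The
 * asymmetric connection states are mirrored via c_tab, e.g. the peer's
 * C_STARTING_SYNC_S is our C_STARTING_SYNC_T. */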
3595
3596 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3597 {
3598         struct drbd_conf *mdev;
3599         struct p_req_state *p = pi->data;
3600         union drbd_state mask, val;
3601         enum drbd_state_rv rv;
3602
3603         mdev = vnr_to_mdev(tconn, pi->vnr);
3604         if (!mdev)
3605                 return -EIO;
3606
3607         mask.i = be32_to_cpu(p->mask);
3608         val.i = be32_to_cpu(p->val);
3609
3610         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3611             mutex_is_locked(mdev->state_mutex)) {
3612                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3613                 return 0;
3614         }
3615
3616         mask = convert_state(mask);
3617         val = convert_state(val);
3618
3619         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3620         drbd_send_sr_reply(mdev, rv);
3621
3622         drbd_md_sync(mdev);
3623
3624         return 0;
3625 }
3626
3627 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3628 {
3629         struct p_req_state *p = pi->data;
3630         union drbd_state mask, val;
3631         enum drbd_state_rv rv;
3632
3633         mask.i = be32_to_cpu(p->mask);
3634         val.i = be32_to_cpu(p->val);
3635
3636         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3637             mutex_is_locked(&tconn->cstate_mutex)) {
3638                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3639                 return 0;
3640         }
3641
3642         mask = convert_state(mask);
3643         val = convert_state(val);
3644
3645         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3646         conn_send_sr_reply(tconn, rv);
3647
3648         return 0;
3649 }
3650
3651 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3652 {
3653         struct drbd_conf *mdev;
3654         struct p_state *p = pi->data;
3655         union drbd_state os, ns, peer_state;
3656         enum drbd_disk_state real_peer_disk;
3657         enum chg_state_flags cs_flags;
3658         int rv;
3659
3660         mdev = vnr_to_mdev(tconn, pi->vnr);
3661         if (!mdev)
3662                 return config_unknown_volume(tconn, pi);
3663
3664         peer_state.i = be32_to_cpu(p->state);
3665
3666         real_peer_disk = peer_state.disk;
3667         if (peer_state.disk == D_NEGOTIATING) {
3668                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3669                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3670         }
3671
3672         spin_lock_irq(&mdev->tconn->req_lock);
3673  retry:
3674         os = ns = drbd_read_state(mdev);
3675         spin_unlock_irq(&mdev->tconn->req_lock);
3676
3677         /* peer says his disk is uptodate, while we think it is inconsistent,
3678          * and this happens while we think we have a sync going on. */
3679         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3680             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3681                 /* If we are (becoming) SyncSource, but peer is still in sync
3682                  * preparation, ignore its uptodate-ness to avoid flapping, it
3683                  * will change to inconsistent once the peer reaches active
3684                  * syncing states.
3685                  * It may have changed syncer-paused flags, however, so we
3686                  * cannot ignore this completely. */
3687                 if (peer_state.conn > C_CONNECTED &&
3688                     peer_state.conn < C_SYNC_SOURCE)
3689                         real_peer_disk = D_INCONSISTENT;
3690
3691                 /* if peer_state changes to connected at the same time,
3692                  * it explicitly notifies us that it finished resync.
3693                  * Maybe we should finish it up, too? */
3694                 else if (os.conn >= C_SYNC_SOURCE &&
3695                          peer_state.conn == C_CONNECTED) {
3696                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3697                                 drbd_resync_finished(mdev);
3698                         return 0;
3699                 }
3700         }
3701
3702         /* peer says his disk is inconsistent, while we think it is uptodate,
3703          * and this happens while the peer still thinks we have a sync going on,
3704          * but we think we are already done with the sync.
3705          * We ignore this to avoid flapping pdsk.
3706          * This should not happen, if the peer is a recent version of drbd. */
3707         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3708             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3709                 real_peer_disk = D_UP_TO_DATE;
3710
3711         if (ns.conn == C_WF_REPORT_PARAMS)
3712                 ns.conn = C_CONNECTED;
3713
3714         if (peer_state.conn == C_AHEAD)
3715                 ns.conn = C_BEHIND;
3716
3717         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3718             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3719                 int cr; /* consider resync */
3720
3721                 /* if we established a new connection */
3722                 cr  = (os.conn < C_CONNECTED);
3723                 /* if we had an established connection
3724                  * and one of the nodes newly attaches a disk */
3725                 cr |= (os.conn == C_CONNECTED &&
3726                        (peer_state.disk == D_NEGOTIATING ||
3727                         os.disk == D_NEGOTIATING));
3728                 /* if we have both been inconsistent, and the peer has been
3729                  * forced to be UpToDate with --overwrite-data */
3730                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3731                 /* if we had been plain connected, and the admin requested to
3732                  * start a sync by "invalidate" or "invalidate-remote" */
3733                 cr |= (os.conn == C_CONNECTED &&
3734                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3735                                  peer_state.conn <= C_WF_BITMAP_T));
3736
3737                 if (cr)
3738                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3739
3740                 put_ldev(mdev);
3741                 if (ns.conn == C_MASK) {
3742                         ns.conn = C_CONNECTED;
3743                         if (mdev->state.disk == D_NEGOTIATING) {
3744                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3745                         } else if (peer_state.disk == D_NEGOTIATING) {
3746                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3747                                 peer_state.disk = D_DISKLESS;
3748                                 real_peer_disk = D_DISKLESS;
3749                         } else {
3750                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3751                                         return -EIO;
3752                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3753                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3754                                 return -EIO;
3755                         }
3756                 }
3757         }
3758
3759         spin_lock_irq(&mdev->tconn->req_lock);
3760         if (os.i != drbd_read_state(mdev).i)
3761                 goto retry;
3762         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3763         ns.peer = peer_state.role;
3764         ns.pdsk = real_peer_disk;
3765         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3766         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3767                 ns.disk = mdev->new_state_tmp.disk;
3768         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3769         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3770             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3771                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3772                    for temporary network outages! */
3773                 spin_unlock_irq(&mdev->tconn->req_lock);
3774                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3775                 tl_clear(mdev->tconn);
3776                 drbd_uuid_new_current(mdev);
3777                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3778                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3779                 return -EIO;
3780         }
3781         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3782         ns = drbd_read_state(mdev);
3783         spin_unlock_irq(&mdev->tconn->req_lock);
3784
3785         if (rv < SS_SUCCESS) {
3786                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3787                 return -EIO;
3788         }
3789
3790         if (os.conn > C_WF_REPORT_PARAMS) {
3791                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3792                     peer_state.disk != D_NEGOTIATING ) {
3793                         /* we want resync, peer has not yet decided to sync... */
3794                         /* Nowadays only used when forcing a node into primary role and
3795                            setting its disk to UpToDate with that */
3796                         drbd_send_uuids(mdev);
3797                         drbd_send_state(mdev);
3798                 }
3799         }
3800
3801         mutex_lock(&mdev->tconn->conf_update);
3802         mdev->tconn->net_conf->discard_my_data = 0; /* without copy; single bit op is atomic */
3803         mutex_unlock(&mdev->tconn->conf_update);
3804
3805         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3806
3807         return 0;
3808 }
3809
3810 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3811 {
3812         struct drbd_conf *mdev;
3813         struct p_rs_uuid *p = pi->data;
3814
3815         mdev = vnr_to_mdev(tconn, pi->vnr);
3816         if (!mdev)
3817                 return -EIO;
3818
3819         wait_event(mdev->misc_wait,
3820                    mdev->state.conn == C_WF_SYNC_UUID ||
3821                    mdev->state.conn == C_BEHIND ||
3822                    mdev->state.conn < C_CONNECTED ||
3823                    mdev->state.disk < D_NEGOTIATING);
3824
3825         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3826
3827         /* Here the _drbd_uuid_ functions are right, current should
3828            _not_ be rotated into the history */
3829         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3830                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3831                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3832
3833                 drbd_print_uuids(mdev, "updated sync uuid");
3834                 drbd_start_resync(mdev, C_SYNC_TARGET);
3835
3836                 put_ldev(mdev);
3837         } else
3838                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3839
3840         return 0;
3841 }
3842
3843 /**
3844  * receive_bitmap_plain
3845  *
3846  * Return 0 when done, 1 when another iteration is needed, and a negative error
3847  * code upon failure.
3848  */
3849 static int
3850 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3851                      unsigned long *p, struct bm_xfer_ctx *c)
3852 {
3853         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3854                                  drbd_header_size(mdev->tconn);
3855         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3856                                        c->bm_words - c->word_offset);
3857         unsigned int want = num_words * sizeof(*p);
3858         int err;
3859
3860         if (want != size) {
3861                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3862                 return -EIO;
3863         }
3864         if (want == 0)
3865                 return 0;
3866         err = drbd_recv_all(mdev->tconn, p, want);
3867         if (err)
3868                 return err;
3869
3870         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3871
3872         c->word_offset += num_words;
3873         c->bit_offset = c->word_offset * BITS_PER_LONG;
3874         if (c->bit_offset > c->bm_bits)
3875                 c->bit_offset = c->bm_bits;
3876
3877         return 1;
3878 }
3879
3880 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3881 {
3882         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3883 }
3884
3885 static int dcbp_get_start(struct p_compressed_bm *p)
3886 {
3887         return (p->encoding & 0x80) != 0;
3888 }
3889
3890 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3891 {
3892         return (p->encoding >> 4) & 0x7;
3893 }
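
/* The p->encoding byte, as decoded by the three helpers above:
 *   bits 0-3  the bitmap encoding (enum drbd_bitmap_code, e.g. RLE_VLI_Bits)
 *   bits 4-6  number of padding bits of the VLI bit stream
 *   bit  7    whether the first run length describes set bits
 */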
3894
3895 /**
3896  * recv_bm_rle_bits
3897  *
3898  * Return 0 when done, 1 when another iteration is needed, and a negative error
3899  * code upon failure.
3900  */
3901 static int
3902 recv_bm_rle_bits(struct drbd_conf *mdev,
3903                 struct p_compressed_bm *p,
3904                  struct bm_xfer_ctx *c,
3905                  unsigned int len)
3906 {
3907         struct bitstream bs;
3908         u64 look_ahead;
3909         u64 rl;
3910         u64 tmp;
3911         unsigned long s = c->bit_offset;
3912         unsigned long e;
3913         int toggle = dcbp_get_start(p);
3914         int have;
3915         int bits;
3916
3917         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3918
3919         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3920         if (bits < 0)
3921                 return -EIO;
3922
3923         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3924                 bits = vli_decode_bits(&rl, look_ahead);
3925                 if (bits <= 0)
3926                         return -EIO;
3927
3928                 if (toggle) {
3929                         e = s + rl -1;
3930                         if (e >= c->bm_bits) {
3931                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3932                                 return -EIO;
3933                         }
3934                         _drbd_bm_set_bits(mdev, s, e);
3935                 }
3936
3937                 if (have < bits) {
3938                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3939                                 have, bits, look_ahead,
3940                                 (unsigned int)(bs.cur.b - p->code),
3941                                 (unsigned int)bs.buf_len);
3942                         return -EIO;
3943                 }
3944                 look_ahead >>= bits;
3945                 have -= bits;
3946
3947                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3948                 if (bits < 0)
3949                         return -EIO;
3950                 look_ahead |= tmp << have;
3951                 have += bits;
3952         }
3953
3954         c->bit_offset = s;
3955         bm_xfer_ctx_bit_to_word_offset(c);
3956
3957         return (s != c->bm_bits);
3958 }
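
/* The compressed bitmap is a sequence of VLI encoded run lengths.  Runs
 * alternate between "bits clear" and "bits set"; whether the very first
 * run describes set bits is taken from dcbp_get_start(), so only every
 * other run results in a call to _drbd_bm_set_bits(). */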
3959
3960 /**
3961  * decode_bitmap_c
3962  *
3963  * Return 0 when done, 1 when another iteration is needed, and a negative error
3964  * code upon failure.
3965  */
3966 static int
3967 decode_bitmap_c(struct drbd_conf *mdev,
3968                 struct p_compressed_bm *p,
3969                 struct bm_xfer_ctx *c,
3970                 unsigned int len)
3971 {
3972         if (dcbp_get_code(p) == RLE_VLI_Bits)
3973                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3974
3975         /* other variants had been implemented for evaluation,
3976          * but have been dropped as this one turned out to be "best"
3977          * during all our tests. */
3978
3979         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3980         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3981         return -EIO;
3982 }
3983
3984 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3985                 const char *direction, struct bm_xfer_ctx *c)
3986 {
3987         /* what would it take to transfer it "plaintext" */
3988         unsigned int header_size = drbd_header_size(mdev->tconn);
3989         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3990         unsigned int plain =
3991                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3992                 c->bm_words * sizeof(unsigned long);
3993         unsigned int total = c->bytes[0] + c->bytes[1];
3994         unsigned int r;
3995
3996         /* total cannot be zero, but just in case: */
3997         if (total == 0)
3998                 return;
3999
4000         /* don't report if not compressed */
4001         if (total >= plain)
4002                 return;
4003
4004         /* total < plain. check for overflow, still */
4005         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4006                                     : (1000 * total / plain);
4007
4008         if (r > 1000)
4009                 r = 1000;
4010
4011         r = 1000 - r;
4012         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4013              "total %u; compression: %u.%u%%\n",
4014                         direction,
4015                         c->bytes[1], c->packets[1],
4016                         c->bytes[0], c->packets[0],
4017                         total, r/10, r % 10);
4018 }
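
/* The percentage reported above is 100 * (1 - total/plain), with one
 * decimal place (computed in per mille as "r"); "plain" includes the
 * per-packet header overhead a plain bitmap transfer would have had. */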
4019
4020 /* Since we are processing the bitfield from lower addresses to higher,
4021    it does not matter whether we process it in 32 bit or 64 bit chunks,
4022    as long as it is little endian. (Understand it as a byte stream,
4023    beginning with the lowest byte...) If we used big endian,
4024    we would need to process it from the highest address to the lowest
4025    in order to be agnostic to the 32 vs 64 bit issue.
4026
4027    Returns 0 on success, a negative error code otherwise. */
4028 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4029 {
4030         struct drbd_conf *mdev;
4031         struct bm_xfer_ctx c;
4032         int err;
4033
4034         mdev = vnr_to_mdev(tconn, pi->vnr);
4035         if (!mdev)
4036                 return -EIO;
4037
4038         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4039         /* you are supposed to send additional out-of-sync information
4040          * if you actually set bits during this phase */
4041
4042         c = (struct bm_xfer_ctx) {
4043                 .bm_bits = drbd_bm_bits(mdev),
4044                 .bm_words = drbd_bm_words(mdev),
4045         };
4046
4047         for(;;) {
4048                 if (pi->cmd == P_BITMAP)
4049                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4050                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4051                         /* MAYBE: sanity check that we speak proto >= 90,
4052                          * and the feature is enabled! */
4053                         struct p_compressed_bm *p = pi->data;
4054
4055                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4056                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4057                                 err = -EIO;
4058                                 goto out;
4059                         }
4060                         if (pi->size <= sizeof(*p)) {
4061                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4062                                 err = -EIO;
4063                                 goto out;
4064                         }
4065                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4066                         if (err)
4067                                goto out;
4068                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4069                 } else {
4070                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4071                         err = -EIO;
4072                         goto out;
4073                 }
4074
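                /* index 1 counts plain P_BITMAP packets, index 0 compressed
                 * ones; INFO_bm_xfer_stats() prints them as "plain" and "RLE"
                 * respectively. */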
4075                 c.packets[pi->cmd == P_BITMAP]++;
4076                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4077
4078                 if (err <= 0) {
4079                         if (err < 0)
4080                                 goto out;
4081                         break;
4082                 }
4083                 err = drbd_recv_header(mdev->tconn, pi);
4084                 if (err)
4085                         goto out;
4086         }
4087
4088         INFO_bm_xfer_stats(mdev, "receive", &c);
4089
4090         if (mdev->state.conn == C_WF_BITMAP_T) {
4091                 enum drbd_state_rv rv;
4092
4093                 err = drbd_send_bitmap(mdev);
4094                 if (err)
4095                         goto out;
4096                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4097                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4098                 D_ASSERT(rv == SS_SUCCESS);
4099         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4100                 /* admin may have requested C_DISCONNECTING,
4101                  * other threads may have noticed network errors */
4102                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4103                     drbd_conn_str(mdev->state.conn));
4104         }
4105         err = 0;
4106
4107  out:
4108         drbd_bm_unlock(mdev);
4109         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4110                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4111         return err;
4112 }
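
A minimal sketch of the chunk-size argument made in the comment above receive_bitmap(): with a little-endian layout, bit i of the bitmap sits in byte i/8 regardless of whether the surrounding words are shipped or scanned as 32 bit or 64 bit chunks. The helper below is an editor's illustration and does not exist in the driver.

/* editor's sketch: chunk-size agnostic bit lookup in a little-endian
 * bitmap byte stream */
static inline int bm_le_test_bit(const unsigned char *stream, unsigned long i)
{
	/* byte i / 8, bit i % 8 -- the same answer whether the buffer was
	 * received in u32 or u64 sized pieces */
	return (stream[i >> 3] >> (i & 7)) & 1;
}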
4113
4114 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4115 {
4116         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4117                  pi->cmd, pi->size);
4118
4119         return ignore_remaining_packet(tconn, pi);
4120 }
4121
4122 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4123 {
4124         /* Make sure we've acked all the TCP data associated
4125          * with the data requests being unplugged */
4126         drbd_tcp_quickack(tconn->data.socket);
4127
4128         return 0;
4129 }
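
drbd_tcp_quickack() is defined elsewhere in the driver; conceptually it just enables TCP_QUICKACK on the socket so any pending ACKs go out immediately. A rough stand-alone sketch, for illustration only (the helper name below is made up):

/* editor's sketch of what the quickack helper boils down to */
static void quickack_socket(struct socket *sock)
{
	int val = 1;

	(void) kernel_setsockopt(sock, SOL_TCP, TCP_QUICKACK,
				 (char *)&val, sizeof(val));
}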
4130
4131 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4132 {
4133         struct drbd_conf *mdev;
4134         struct p_block_desc *p = pi->data;
4135
4136         mdev = vnr_to_mdev(tconn, pi->vnr);
4137         if (!mdev)
4138                 return -EIO;
4139
4140         switch (mdev->state.conn) {
4141         case C_WF_SYNC_UUID:
4142         case C_WF_BITMAP_T:
4143         case C_BEHIND:
4144                 break;
4145         default:
4146                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4147                                 drbd_conn_str(mdev->state.conn));
4148         }
4149
4150         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4151
4152         return 0;
4153 }
4154
4155 struct data_cmd {
4156         int expect_payload;
4157         size_t pkt_size;
4158         int (*fn)(struct drbd_tconn *, struct packet_info *);
4159 };
4160
4161 static struct data_cmd drbd_cmd_handler[] = {
4162         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4163         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4164         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4165         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4166         [P_BITMAP]          = { 1, 0, receive_bitmap },
4167         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4168         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4169         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4170         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4171         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4172         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4173         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4174         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4175         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4176         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4177         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4178         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4179         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4180         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4181         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4182         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4183         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4184         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4185 };
4186
4187 static void drbdd(struct drbd_tconn *tconn)
4188 {
4189         struct packet_info pi;
4190         size_t shs; /* sub header size */
4191         int err;
4192
4193         while (get_t_state(&tconn->receiver) == RUNNING) {
4194                 struct data_cmd *cmd;
4195
4196                 drbd_thread_current_set_cpu(&tconn->receiver);
4197                 if (drbd_recv_header(tconn, &pi))
4198                         goto err_out;
4199
4200                 cmd = &drbd_cmd_handler[pi.cmd];
4201                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4202                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4203                         goto err_out;
4204                 }
4205
4206                 shs = cmd->pkt_size;
4207                 if (pi.size > shs && !cmd->expect_payload) {
4208                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4209                         goto err_out;
4210                 }
4211
4212                 if (shs) {
4213                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4214                         if (err)
4215                                 goto err_out;
4216                         pi.size -= shs;
4217                 }
4218
4219                 err = cmd->fn(tconn, &pi);
4220                 if (err) {
4221                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4222                                  cmdname(pi.cmd), err, pi.size);
4223                         goto err_out;
4224                 }
4225         }
4226         return;
4227
4228     err_out:
4229         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4230 }
4231
4232 void conn_flush_workqueue(struct drbd_tconn *tconn)
4233 {
4234         struct drbd_wq_barrier barr;
4235
4236         barr.w.cb = w_prev_work_done;
4237         barr.w.tconn = tconn;
4238         init_completion(&barr.done);
4239         drbd_queue_work(&tconn->data.work, &barr.w);
4240         wait_for_completion(&barr.done);
4241 }
4242
4243 static void conn_disconnect(struct drbd_tconn *tconn)
4244 {
4245         struct drbd_conf *mdev;
4246         enum drbd_conns oc;
4247         int vnr, rv = SS_UNKNOWN_ERROR;
4248
4249         if (tconn->cstate == C_STANDALONE)
4250                 return;
4251
4252         /* asender does not clean up anything. it must not interfere, either */
4253         drbd_thread_stop(&tconn->asender);
4254         drbd_free_sock(tconn);
4255
4256         rcu_read_lock();
4257         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4258                 kref_get(&mdev->kref);
4259                 rcu_read_unlock();
4260                 drbd_disconnected(mdev);
4261                 kref_put(&mdev->kref, &drbd_minor_destroy);
4262                 rcu_read_lock();
4263         }
4264         rcu_read_unlock();
4265
4266         conn_info(tconn, "Connection closed\n");
4267
4268         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4269                 conn_try_outdate_peer_async(tconn);
4270
4271         spin_lock_irq(&tconn->req_lock);
4272         oc = tconn->cstate;
4273         if (oc >= C_UNCONNECTED)
4274                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4275
4276         spin_unlock_irq(&tconn->req_lock);
4277
4278         if (oc == C_DISCONNECTING)
4279                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4280 }
4281
4282 static int drbd_disconnected(struct drbd_conf *mdev)
4283 {
4284         enum drbd_fencing_p fp;
4285         unsigned int i;
4286
4287         /* wait for current activity to cease. */
4288         spin_lock_irq(&mdev->tconn->req_lock);
4289         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4290         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4291         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4292         spin_unlock_irq(&mdev->tconn->req_lock);
4293
4294         /* We do not have data structures that would allow us to
4295          * get the rs_pending_cnt down to 0 again.
4296          *  * On C_SYNC_TARGET we do not have any data structures describing
4297          *    the pending RSDataRequests we have sent.
4298          *  * On C_SYNC_SOURCE there is no data structure that tracks
4299          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4300          *  And no, it is not the sum of the reference counts in the
4301          *  resync_LRU. The resync_LRU tracks the whole operation including
4302          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4303          *  on the fly. */
4304         drbd_rs_cancel_all(mdev);
4305         mdev->rs_total = 0;
4306         mdev->rs_failed = 0;
4307         atomic_set(&mdev->rs_pending_cnt, 0);
4308         wake_up(&mdev->misc_wait);
4309
4310         del_timer(&mdev->request_timer);
4311
4312         del_timer_sync(&mdev->resync_timer);
4313         resync_timer_fn((unsigned long)mdev);
4314
4315         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4316          * w_make_resync_request etc. which may still be on the worker queue
4317          * to be "canceled" */
4318         drbd_flush_workqueue(mdev);
4319
4320         drbd_finish_peer_reqs(mdev);
4321
4322         kfree(mdev->p_uuid);
4323         mdev->p_uuid = NULL;
4324
4325         if (!drbd_suspended(mdev))
4326                 tl_clear(mdev->tconn);
4327
4328         drbd_md_sync(mdev);
4329
4330         fp = FP_DONT_CARE;
4331         if (get_ldev(mdev)) {
4332                 rcu_read_lock();
4333                 fp = rcu_dereference(mdev->ldev->disk_conf)->fencing;
4334                 rcu_read_unlock();
4335                 put_ldev(mdev);
4336         }
4337
4338         /* serialize with bitmap writeout triggered by the state change,
4339          * if any. */
4340         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4341
4342         /* tcp_close and release of sendpage pages can be deferred.  I don't
4343          * want to use SO_LINGER, because apparently it can be deferred for
4344          * more than 20 seconds (longest time I checked).
4345          *
4346          * Actually we don't care exactly when the network stack does its
4347          * put_page(); we just release our reference on these pages right here.
4348          */
4349         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4350         if (i)
4351                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4352         i = atomic_read(&mdev->pp_in_use_by_net);
4353         if (i)
4354                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4355         i = atomic_read(&mdev->pp_in_use);
4356         if (i)
4357                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4358
4359         D_ASSERT(list_empty(&mdev->read_ee));
4360         D_ASSERT(list_empty(&mdev->active_ee));
4361         D_ASSERT(list_empty(&mdev->sync_ee));
4362         D_ASSERT(list_empty(&mdev->done_ee));
4363
4364         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4365         atomic_set(&mdev->current_epoch->epoch_size, 0);
4366         D_ASSERT(list_empty(&mdev->current_epoch->list));
4367
4368         return 0;
4369 }
4370
4371 /*
4372  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4373  * we can agree on is stored in agreed_pro_version.
4374  *
4375  * The feature flags and the reserved array should leave enough room for
4376  * future enhancements of the handshake protocol, and possible plugins...
4377  *
4378  * For now, they are expected to be zero, but are ignored.
4379  */
4380 static int drbd_send_features(struct drbd_tconn *tconn)
4381 {
4382         struct drbd_socket *sock;
4383         struct p_connection_features *p;
4384
4385         sock = &tconn->data;
4386         p = conn_prepare_command(tconn, sock);
4387         if (!p)
4388                 return -EIO;
4389         memset(p, 0, sizeof(*p));
4390         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4391         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4392         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4393 }
4394
4395 /*
4396  * return values:
4397  *   1 yes, we have a valid connection
4398  *   0 oops, did not work out, please try again
4399  *  -1 peer talks different language,
4400  *     no point in trying again, please go standalone.
4401  */
4402 static int drbd_do_features(struct drbd_tconn *tconn)
4403 {
4404         /* ASSERT current == tconn->receiver ... */
4405         struct p_connection_features *p;
4406         const int expect = sizeof(struct p_connection_features);
4407         struct packet_info pi;
4408         int err;
4409
4410         err = drbd_send_features(tconn);
4411         if (err)
4412                 return 0;
4413
4414         err = drbd_recv_header(tconn, &pi);
4415         if (err)
4416                 return 0;
4417
4418         if (pi.cmd != P_CONNECTION_FEATURES) {
4419                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4420                      cmdname(pi.cmd), pi.cmd);
4421                 return -1;
4422         }
4423
4424         if (pi.size != expect) {
4425                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4426                      expect, pi.size);
4427                 return -1;
4428         }
4429
4430         p = pi.data;
4431         err = drbd_recv_all_warn(tconn, p, expect);
4432         if (err)
4433                 return 0;
4434
4435         p->protocol_min = be32_to_cpu(p->protocol_min);
4436         p->protocol_max = be32_to_cpu(p->protocol_max);
4437         if (p->protocol_max == 0)
4438                 p->protocol_max = p->protocol_min;
4439
4440         if (PRO_VERSION_MAX < p->protocol_min ||
4441             PRO_VERSION_MIN > p->protocol_max)
4442                 goto incompat;
4443
4444         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4445
4446         conn_info(tconn, "Handshake successful: "
4447              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4448
4449         return 1;
4450
4451  incompat:
4452         conn_err(tconn, "incompatible DRBD dialects: "
4453             "I support %d-%d, peer supports %d-%d\n",
4454             PRO_VERSION_MIN, PRO_VERSION_MAX,
4455             p->protocol_min, p->protocol_max);
4456         return -1;
4457 }
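
The agreement rule above amounts to intersecting the two advertised version ranges and taking the top of the overlap: for example, a local range of 86..101 against a peer range of 90..96 agrees on 96, while a peer range of 102..110 does not overlap and the handshake fails. A condensed sketch (the helper is illustrative and not part of the driver):

/* editor's sketch: returns the agreed protocol version, or -1 if the
 * advertised ranges do not overlap */
static int agree_protocol_version(int my_min, int my_max,
				  int peer_min, int peer_max)
{
	if (my_max < peer_min || my_min > peer_max)
		return -1;		/* incompatible dialects */

	return my_max < peer_max ? my_max : peer_max;
}

(The real code additionally treats a peer protocol_max of 0 as equal to its protocol_min before applying this rule.)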
4458
4459 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4460 static int drbd_do_auth(struct drbd_tconn *tconn)
4461 {
4462         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4463         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4464         return -1;
4465 }
4466 #else
4467 #define CHALLENGE_LEN 64
4468
4469 /* Return value:
4470         1 - auth succeeded,
4471         0 - failed, try again (network error),
4472         -1 - auth failed, don't try again.
4473 */
4474
4475 static int drbd_do_auth(struct drbd_tconn *tconn)
4476 {
4477         struct drbd_socket *sock;
4478         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4479         struct scatterlist sg;
4480         char *response = NULL;
4481         char *right_response = NULL;
4482         char *peers_ch = NULL;
4483         unsigned int key_len;
4484         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4485         unsigned int resp_size;
4486         struct hash_desc desc;
4487         struct packet_info pi;
4488         struct net_conf *nc;
4489         int err, rv;
4490
4491         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4492
4493         rcu_read_lock();
4494         nc = rcu_dereference(tconn->net_conf);
4495         key_len = strlen(nc->shared_secret);
4496         memcpy(secret, nc->shared_secret, key_len);
4497         rcu_read_unlock();
4498
4499         desc.tfm = tconn->cram_hmac_tfm;
4500         desc.flags = 0;
4501
4502         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4503         if (rv) {
4504                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4505                 rv = -1;
4506                 goto fail;
4507         }
4508
4509         get_random_bytes(my_challenge, CHALLENGE_LEN);
4510
4511         sock = &tconn->data;
4512         if (!conn_prepare_command(tconn, sock)) {
4513                 rv = 0;
4514                 goto fail;
4515         }
4516         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4517                                 my_challenge, CHALLENGE_LEN);
4518         if (!rv)
4519                 goto fail;
4520
4521         err = drbd_recv_header(tconn, &pi);
4522         if (err) {
4523                 rv = 0;
4524                 goto fail;
4525         }
4526
4527         if (pi.cmd != P_AUTH_CHALLENGE) {
4528                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4529                     cmdname(pi.cmd), pi.cmd);
4530                 rv = 0;
4531                 goto fail;
4532         }
4533
4534         if (pi.size > CHALLENGE_LEN * 2) {
4535                 conn_err(tconn, "AuthChallenge payload too big.\n");
4536                 rv = -1;
4537                 goto fail;
4538         }
4539
4540         peers_ch = kmalloc(pi.size, GFP_NOIO);
4541         if (peers_ch == NULL) {
4542                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4543                 rv = -1;
4544                 goto fail;
4545         }
4546
4547         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4548         if (err) {
4549                 rv = 0;
4550                 goto fail;
4551         }
4552
4553         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4554         response = kmalloc(resp_size, GFP_NOIO);
4555         if (response == NULL) {
4556                 conn_err(tconn, "kmalloc of response failed\n");
4557                 rv = -1;
4558                 goto fail;
4559         }
4560
4561         sg_init_table(&sg, 1);
4562         sg_set_buf(&sg, peers_ch, pi.size);
4563
4564         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4565         if (rv) {
4566                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4567                 rv = -1;
4568                 goto fail;
4569         }
4570
4571         if (!conn_prepare_command(tconn, sock)) {
4572                 rv = 0;
4573                 goto fail;
4574         }
4575         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4576                                 response, resp_size);
4577         if (!rv)
4578                 goto fail;
4579
4580         err = drbd_recv_header(tconn, &pi);
4581         if (err) {
4582                 rv = 0;
4583                 goto fail;
4584         }
4585
4586         if (pi.cmd != P_AUTH_RESPONSE) {
4587                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4588                         cmdname(pi.cmd), pi.cmd);
4589                 rv = 0;
4590                 goto fail;
4591         }
4592
4593         if (pi.size != resp_size) {
4594                 conn_err(tconn, "AuthResponse payload of unexpected size\n");
4595                 rv = 0;
4596                 goto fail;
4597         }
4598
4599         err = drbd_recv_all_warn(tconn, response, resp_size);
4600         if (err) {
4601                 rv = 0;
4602                 goto fail;
4603         }
4604
4605         right_response = kmalloc(resp_size, GFP_NOIO);
4606         if (right_response == NULL) {
4607                 conn_err(tconn, "kmalloc of right_response failed\n");
4608                 rv = -1;
4609                 goto fail;
4610         }
4611
4612         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4613
4614         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4615         if (rv) {
4616                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4617                 rv = -1;
4618                 goto fail;
4619         }
4620
4621         rv = !memcmp(response, right_response, resp_size);
4622
4623         if (rv)
4624                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4625                      resp_size);
4626         else
4627                 rv = -1;
4628
4629  fail:
4630         kfree(peers_ch);
4631         kfree(response);
4632         kfree(right_response);
4633
4634         return rv;
4635 }
4636 #endif
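
For orientation, the challenge/response handshake implemented above is symmetric; both peers run the same code. Summarized as a message flow (editor's summary of the code above, not part of the source):

/*
 *   local -> peer : P_AUTH_CHALLENGE, CHALLENGE_LEN random bytes (my_challenge)
 *   peer  -> local: P_AUTH_CHALLENGE, the peer's random bytes    (peers_ch)
 *   local -> peer : P_AUTH_RESPONSE,  HMAC(shared_secret, peers_ch)
 *   peer  -> local: P_AUTH_RESPONSE,  HMAC(shared_secret, my_challenge)
 *
 * Each side recomputes HMAC(shared_secret, my_challenge) locally
 * (right_response) and memcmp()s it against the peer's response;
 * a mismatch returns -1 (give up), transport problems return 0 (retry).
 */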
4637
4638 int drbdd_init(struct drbd_thread *thi)
4639 {
4640         struct drbd_tconn *tconn = thi->tconn;
4641         int h;
4642
4643         conn_info(tconn, "receiver (re)started\n");
4644
4645         do {
4646                 h = conn_connect(tconn);
4647                 if (h == 0) {
4648                         conn_disconnect(tconn);
4649                         schedule_timeout_interruptible(HZ);
4650                 }
4651                 if (h == -1) {
4652                         conn_warn(tconn, "Discarding network configuration.\n");
4653                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4654                 }
4655         } while (h == 0);
4656
4657         if (h > 0)
4658                 drbdd(tconn);
4659
4660         conn_disconnect(tconn);
4661
4662         conn_info(tconn, "receiver terminated\n");
4663         return 0;
4664 }
4665
4666 /* ********* acknowledge sender ******** */
4667
4668 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4669 {
4670         struct p_req_state_reply *p = pi->data;
4671         int retcode = be32_to_cpu(p->retcode);
4672
4673         if (retcode >= SS_SUCCESS) {
4674                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4675         } else {
4676                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4677                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4678                          drbd_set_st_err_str(retcode), retcode);
4679         }
4680         wake_up(&tconn->ping_wait);
4681
4682         return 0;
4683 }
4684
4685 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4686 {
4687         struct drbd_conf *mdev;
4688         struct p_req_state_reply *p = pi->data;
4689         int retcode = be32_to_cpu(p->retcode);
4690
4691         mdev = vnr_to_mdev(tconn, pi->vnr);
4692         if (!mdev)
4693                 return -EIO;
4694
4695         if (retcode >= SS_SUCCESS) {
4696                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4697         } else {
4698                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4699                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4700                         drbd_set_st_err_str(retcode), retcode);
4701         }
4702         wake_up(&mdev->state_wait);
4703
4704         return 0;
4705 }
4706
4707 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4708 {
4709         return drbd_send_ping_ack(tconn);
4710
4711 }
4712
4713 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4714 {
4715         /* restore idle timeout */
4716         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4717         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4718                 wake_up(&tconn->ping_wait);
4719
4720         return 0;
4721 }
4722
4723 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4724 {
4725         struct drbd_conf *mdev;
4726         struct p_block_ack *p = pi->data;
4727         sector_t sector = be64_to_cpu(p->sector);
4728         int blksize = be32_to_cpu(p->blksize);
4729
4730         mdev = vnr_to_mdev(tconn, pi->vnr);
4731         if (!mdev)
4732                 return -EIO;
4733
4734         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4735
4736         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4737
4738         if (get_ldev(mdev)) {
4739                 drbd_rs_complete_io(mdev, sector);
4740                 drbd_set_in_sync(mdev, sector, blksize);
4741                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4742                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4743                 put_ldev(mdev);
4744         }
4745         dec_rs_pending(mdev);
4746         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4747
4748         return 0;
4749 }
4750
4751 static int
4752 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4753                               struct rb_root *root, const char *func,
4754                               enum drbd_req_event what, bool missing_ok)
4755 {
4756         struct drbd_request *req;
4757         struct bio_and_error m;
4758
4759         spin_lock_irq(&mdev->tconn->req_lock);
4760         req = find_request(mdev, root, id, sector, missing_ok, func);
4761         if (unlikely(!req)) {
4762                 spin_unlock_irq(&mdev->tconn->req_lock);
4763                 return -EIO;
4764         }
4765         __req_mod(req, what, &m);
4766         spin_unlock_irq(&mdev->tconn->req_lock);
4767
4768         if (m.bio)
4769                 complete_master_bio(mdev, &m);
4770         return 0;
4771 }
4772
4773 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4774 {
4775         struct drbd_conf *mdev;
4776         struct p_block_ack *p = pi->data;
4777         sector_t sector = be64_to_cpu(p->sector);
4778         int blksize = be32_to_cpu(p->blksize);
4779         enum drbd_req_event what;
4780
4781         mdev = vnr_to_mdev(tconn, pi->vnr);
4782         if (!mdev)
4783                 return -EIO;
4784
4785         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4786
4787         if (p->block_id == ID_SYNCER) {
4788                 drbd_set_in_sync(mdev, sector, blksize);
4789                 dec_rs_pending(mdev);
4790                 return 0;
4791         }
4792         switch (pi->cmd) {
4793         case P_RS_WRITE_ACK:
4794                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4795                 break;
4796         case P_WRITE_ACK:
4797                 what = WRITE_ACKED_BY_PEER;
4798                 break;
4799         case P_RECV_ACK:
4800                 what = RECV_ACKED_BY_PEER;
4801                 break;
4802         case P_DISCARD_WRITE:
4803                 what = DISCARD_WRITE;
4804                 break;
4805         case P_RETRY_WRITE:
4806                 what = POSTPONE_WRITE;
4807                 break;
4808         default:
4809                 BUG();
4810         }
4811
4812         return validate_req_change_req_state(mdev, p->block_id, sector,
4813                                              &mdev->write_requests, __func__,
4814                                              what, false);
4815 }
4816
4817 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4818 {
4819         struct drbd_conf *mdev;
4820         struct p_block_ack *p = pi->data;
4821         sector_t sector = be64_to_cpu(p->sector);
4822         int size = be32_to_cpu(p->blksize);
4823         int err;
4824
4825         mdev = vnr_to_mdev(tconn, pi->vnr);
4826         if (!mdev)
4827                 return -EIO;
4828
4829         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4830
4831         if (p->block_id == ID_SYNCER) {
4832                 dec_rs_pending(mdev);
4833                 drbd_rs_failed_io(mdev, sector, size);
4834                 return 0;
4835         }
4836
4837         err = validate_req_change_req_state(mdev, p->block_id, sector,
4838                                             &mdev->write_requests, __func__,
4839                                             NEG_ACKED, true);
4840         if (err) {
4841                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4842                    The master bio might already be completed, therefore the
4843                    request is no longer in the collision hash. */
4844                 /* In Protocol B we might already have got a P_RECV_ACK
4845                    but then get a P_NEG_ACK afterwards. */
4846                 drbd_set_out_of_sync(mdev, sector, size);
4847         }
4848         return 0;
4849 }
4850
4851 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4852 {
4853         struct drbd_conf *mdev;
4854         struct p_block_ack *p = pi->data;
4855         sector_t sector = be64_to_cpu(p->sector);
4856
4857         mdev = vnr_to_mdev(tconn, pi->vnr);
4858         if (!mdev)
4859                 return -EIO;
4860
4861         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4862
4863         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4864             (unsigned long long)sector, be32_to_cpu(p->blksize));
4865
4866         return validate_req_change_req_state(mdev, p->block_id, sector,
4867                                              &mdev->read_requests, __func__,
4868                                              NEG_ACKED, false);
4869 }
4870
4871 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4872 {
4873         struct drbd_conf *mdev;
4874         sector_t sector;
4875         int size;
4876         struct p_block_ack *p = pi->data;
4877
4878         mdev = vnr_to_mdev(tconn, pi->vnr);
4879         if (!mdev)
4880                 return -EIO;
4881
4882         sector = be64_to_cpu(p->sector);
4883         size = be32_to_cpu(p->blksize);
4884
4885         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4886
4887         dec_rs_pending(mdev);
4888
4889         if (get_ldev_if_state(mdev, D_FAILED)) {
4890                 drbd_rs_complete_io(mdev, sector);
4891                 switch (pi->cmd) {
4892                 case P_NEG_RS_DREPLY:
4893                         drbd_rs_failed_io(mdev, sector, size); /* fall through */
4894                 case P_RS_CANCEL:
4895                         break;
4896                 default:
4897                         BUG();
4898                 }
4899                 put_ldev(mdev);
4900         }
4901
4902         return 0;
4903 }
4904
4905 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4906 {
4907         struct drbd_conf *mdev;
4908         struct p_barrier_ack *p = pi->data;
4909
4910         mdev = vnr_to_mdev(tconn, pi->vnr);
4911         if (!mdev)
4912                 return -EIO;
4913
4914         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4915
4916         if (mdev->state.conn == C_AHEAD &&
4917             atomic_read(&mdev->ap_in_flight) == 0 &&
4918             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4919                 mdev->start_resync_timer.expires = jiffies + HZ;
4920                 add_timer(&mdev->start_resync_timer);
4921         }
4922
4923         return 0;
4924 }
4925
4926 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4927 {
4928         struct drbd_conf *mdev;
4929         struct p_block_ack *p = pi->data;
4930         struct drbd_work *w;
4931         sector_t sector;
4932         int size;
4933
4934         mdev = vnr_to_mdev(tconn, pi->vnr);
4935         if (!mdev)
4936                 return -EIO;
4937
4938         sector = be64_to_cpu(p->sector);
4939         size = be32_to_cpu(p->blksize);
4940
4941         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4942
4943         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4944                 drbd_ov_out_of_sync_found(mdev, sector, size);
4945         else
4946                 ov_out_of_sync_print(mdev);
4947
4948         if (!get_ldev(mdev))
4949                 return 0;
4950
4951         drbd_rs_complete_io(mdev, sector);
4952         dec_rs_pending(mdev);
4953
4954         --mdev->ov_left;
4955
4956         /* let's advance progress step marks only for every other megabyte */
4957         if ((mdev->ov_left & 0x200) == 0x200)
4958                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4959
4960         if (mdev->ov_left == 0) {
4961                 w = kmalloc(sizeof(*w), GFP_NOIO);
4962                 if (w) {
4963                         w->cb = w_ov_finished;
4964                         w->mdev = mdev;
4965                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4966                 } else {
4967                         dev_err(DEV, "kmalloc(w) failed.\n");
4968                         ov_out_of_sync_print(mdev);
4969                         drbd_resync_finished(mdev);
4970                 }
4971         }
4972         put_ldev(mdev);
4973         return 0;
4974 }
4975
4976 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4977 {
4978         return 0;
4979 }
4980
4981 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4982 {
4983         struct drbd_conf *mdev;
4984         int vnr, not_empty = 0;
4985
4986         do {
4987                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4988                 flush_signals(current);
4989
4990                 rcu_read_lock();
4991                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4992                         kref_get(&mdev->kref);
4993                         rcu_read_unlock();
4994                         if (drbd_finish_peer_reqs(mdev)) {
4995                                 kref_put(&mdev->kref, &drbd_minor_destroy);
4996                                 return 1;
4997                         }
4998                         kref_put(&mdev->kref, &drbd_minor_destroy);
4999                         rcu_read_lock();
5000                 }
5001                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5002
5003                 spin_lock_irq(&tconn->req_lock);
5004                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5005                         not_empty = !list_empty(&mdev->done_ee);
5006                         if (not_empty)
5007                                 break;
5008                 }
5009                 spin_unlock_irq(&tconn->req_lock);
5010                 rcu_read_unlock();
5011         } while (not_empty);
5012
5013         return 0;
5014 }
5015
5016 struct asender_cmd {
5017         size_t pkt_size;
5018         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5019 };
5020
5021 static struct asender_cmd asender_tbl[] = {
5022         [P_PING]            = { 0, got_Ping },
5023         [P_PING_ACK]        = { 0, got_PingAck },
5024         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5025         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5026         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5027         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5028         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5029         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5030         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5031         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5032         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5033         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5034         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5035         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5036         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5037         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5038         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5039 };
5040
5041 int drbd_asender(struct drbd_thread *thi)
5042 {
5043         struct drbd_tconn *tconn = thi->tconn;
5044         struct asender_cmd *cmd = NULL;
5045         struct packet_info pi;
5046         int rv;
5047         void *buf    = tconn->meta.rbuf;
5048         int received = 0;
5049         unsigned int header_size = drbd_header_size(tconn);
5050         int expect   = header_size;
5051         bool ping_timeout_active = false;
5052         struct net_conf *nc;
5053         int ping_timeo, tcp_cork, ping_int;
5054
5055         current->policy = SCHED_RR;  /* Make this a realtime task! */
5056         current->rt_priority = 2;    /* more important than all other tasks */
5057
5058         while (get_t_state(thi) == RUNNING) {
5059                 drbd_thread_current_set_cpu(thi);
5060
5061                 rcu_read_lock();
5062                 nc = rcu_dereference(tconn->net_conf);
5063                 ping_timeo = nc->ping_timeo;
5064                 tcp_cork = nc->tcp_cork;
5065                 ping_int = nc->ping_int;
5066                 rcu_read_unlock();
5067
5068                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5069                         if (drbd_send_ping(tconn)) {
5070                                 conn_err(tconn, "drbd_send_ping has failed\n");
5071                                 goto reconnect;
5072                         }
5073                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5074                         ping_timeout_active = true;
5075                 }
5076
5077                 /* TODO: conditionally cork; it may hurt latency if we cork without
5078                    much to send */
5079                 if (tcp_cork)
5080                         drbd_tcp_cork(tconn->meta.socket);
5081                 if (tconn_finish_peer_reqs(tconn)) {
5082                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5083                         goto reconnect;
5084                 }
5085                 /* but unconditionally uncork unless disabled */
5086                 if (tcp_cork)
5087                         drbd_tcp_uncork(tconn->meta.socket);
5088
5089                 /* short circuit, recv_msg would return EINTR anyways. */
5090                 if (signal_pending(current))
5091                         continue;
5092
5093                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5094                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5095
5096                 flush_signals(current);
5097
5098                 /* Note:
5099                  * -EINTR        (on meta) we got a signal
5100                  * -EAGAIN       (on meta) rcvtimeo expired
5101                  * -ECONNRESET   other side closed the connection
5102                  * -ERESTARTSYS  (on data) we got a signal
5103                  * rv <  0       other than above: unexpected error!
5104                  * rv == expected: full header or command
5105                  * rv <  expected: "woken" by signal during receive
5106                  * rv == 0       : "connection shut down by peer"
5107                  */
5108                 if (likely(rv > 0)) {
5109                         received += rv;
5110                         buf      += rv;
5111                 } else if (rv == 0) {
5112                         conn_err(tconn, "meta connection shut down by peer.\n");
5113                         goto reconnect;
5114                 } else if (rv == -EAGAIN) {
5115                         /* If the data socket received something meanwhile,
5116                          * that is good enough: peer is still alive. */
5117                         if (time_after(tconn->last_received,
5118                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5119                                 continue;
5120                         if (ping_timeout_active) {
5121                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5122                                 goto reconnect;
5123                         }
5124                         set_bit(SEND_PING, &tconn->flags);
5125                         continue;
5126                 } else if (rv == -EINTR) {
5127                         continue;
5128                 } else {
5129                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5130                         goto reconnect;
5131                 }
5132
5133                 if (received == expect && cmd == NULL) {
5134                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5135                                 goto reconnect;
5136                         cmd = &asender_tbl[pi.cmd];
5137                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5138                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
5139                                         pi.cmd, pi.size);
5140                                 goto disconnect;
5141                         }
5142                         expect = header_size + cmd->pkt_size;
5143                         if (pi.size != expect - header_size) {
5144                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5145                                         pi.cmd, pi.size);
5146                                 goto reconnect;
5147                         }
5148                 }
5149                 if (received == expect) {
5150                         bool err;
5151
5152                         err = cmd->fn(tconn, &pi);
5153                         if (err) {
5154                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5155                                 goto reconnect;
5156                         }
5157
5158                         tconn->last_received = jiffies;
5159
5160                         if (cmd == &asender_tbl[P_PING_ACK]) {
5161                                 /* restore idle timeout */
5162                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5163                                 ping_timeout_active = false;
5164                         }
5165
5166                         buf      = tconn->meta.rbuf;
5167                         received = 0;
5168                         expect   = header_size;
5169                         cmd      = NULL;
5170                 }
5171         }
5172
5173         if (0) {
5174 reconnect:
5175                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5176         }
5177         if (0) {
5178 disconnect:
5179                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5180         }
5181         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5182
5183         conn_info(tconn, "asender terminated\n");
5184
5185         return 0;
5186 }