1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
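/* A minimal sketch of walking such a chain, assuming it was built the way
 * __drbd_alloc_pages() below builds it (page->private holds the next page,
 * 0 terminates the chain); process() stands in for arbitrary per-page work:
 *
 *	struct page *p;
 *	for (p = chain; p; p = page_chain_next(p))
 *		process(p);
 */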
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one, we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
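/* A rough usage sketch, not taken from a caller in this file: pages obtained
 * here are accounted in mdev->pp_in_use and go back through drbd_free_pages()
 * below (peer requests do this indirectly via drbd_free_peer_req()):
 *
 *	struct page *chain = drbd_alloc_pages(mdev, nr_pages, true);
 *	if (chain) {
 *		... fill and submit the pages ...
 *		drbd_free_pages(mdev, chain, 0);
 *	}
 */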
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write;
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept, which is only present since 2.6.18.
465  * also, we want to log exactly which part of it failed */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629         sndbuf_size = nc->sndbuf_size;
630         rcvbuf_size = nc->rcvbuf_size;
631         connect_int = nc->connect_int;
632         rcu_read_unlock();
633
634         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
644
645         what = "sock_create_kern";
646         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
647                                SOCK_STREAM, IPPROTO_TCP, &sock);
648         if (err < 0) {
649                 sock = NULL;
650                 goto out;
651         }
652
653         sock->sk->sk_rcvtimeo =
654         sock->sk->sk_sndtimeo = connect_int * HZ;
655         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
656
657        /* explicitly bind to the configured IP as source IP
658         *  for the outgoing connections.
659         *  This is needed for multihomed hosts and to be
660         *  able to use lo: interfaces for drbd.
661         * Make sure to use 0 as port number, so linux selects
662         *  a free one dynamically.
663         */
664         what = "bind before connect";
665         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
666         if (err < 0)
667                 goto out;
668
669         /* connect may fail, peer not yet available.
670          * stay C_WF_CONNECTION, don't go Disconnecting! */
671         disconnect_on_error = 0;
672         what = "connect";
673         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
674
675 out:
676         if (err < 0) {
677                 if (sock) {
678                         sock_release(sock);
679                         sock = NULL;
680                 }
681                 switch (-err) {
682                         /* timeout, busy, signal pending */
683                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
684                 case EINTR: case ERESTARTSYS:
685                         /* peer not (yet) available, network problem */
686                 case ECONNREFUSED: case ENETUNREACH:
687                 case EHOSTDOWN:    case EHOSTUNREACH:
688                         disconnect_on_error = 0;
689                         break;
690                 default:
691                         conn_err(tconn, "%s failed, err = %d\n", what, err);
692                 }
693                 if (disconnect_on_error)
694                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
695         }
696
697         return sock;
698 }
699
700 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
701 {
702         int timeo, err, my_addr_len;
703         int sndbuf_size, rcvbuf_size, connect_int;
704         struct socket *s_estab = NULL, *s_listen;
705         struct sockaddr_in6 my_addr;
706         struct net_conf *nc;
707         const char *what;
708
709         rcu_read_lock();
710         nc = rcu_dereference(tconn->net_conf);
711         if (!nc) {
712                 rcu_read_unlock();
713                 return NULL;
714         }
715         sndbuf_size = nc->sndbuf_size;
716         rcvbuf_size = nc->rcvbuf_size;
717         connect_int = nc->connect_int;
718         rcu_read_unlock();
719
720         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
721         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
722
723         what = "sock_create_kern";
724         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
725                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
726         if (err) {
727                 s_listen = NULL;
728                 goto out;
729         }
730
731         timeo = connect_int * HZ;
732         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
733
734         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
735         s_listen->sk->sk_rcvtimeo = timeo;
736         s_listen->sk->sk_sndtimeo = timeo;
737         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
738
739         what = "bind before listen";
740         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
741         if (err < 0)
742                 goto out;
743
744         err = drbd_accept(&what, s_listen, &s_estab);
745
746 out:
747         if (s_listen)
748                 sock_release(s_listen);
749         if (err < 0) {
750                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
751                         conn_err(tconn, "%s failed, err = %d\n", what, err);
752                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
753                 }
754         }
755
756         return s_estab;
757 }
758
759 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
760
761 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
762                              enum drbd_packet cmd)
763 {
764         if (!conn_prepare_command(tconn, sock))
765                 return -EIO;
766         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
767 }
768
769 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
770 {
771         unsigned int header_size = drbd_header_size(tconn);
772         struct packet_info pi;
773         int err;
774
775         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
776         if (err != header_size) {
777                 if (err >= 0)
778                         err = -EIO;
779                 return err;
780         }
781         err = decode_header(tconn, tconn->data.rbuf, &pi);
782         if (err)
783                 return err;
784         return pi.cmd;
785 }
786
787 /**
788  * drbd_socket_okay() - Free the socket if its connection is not okay
789  * @sock:       pointer to the pointer to the socket.
790  */
791 static int drbd_socket_okay(struct socket **sock)
792 {
793         int rr;
794         char tb[4];
795
796         if (!*sock)
797                 return false;
798
799         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
800
801         if (rr > 0 || rr == -EAGAIN) {
802                 return true;
803         } else {
804                 sock_release(*sock);
805                 *sock = NULL;
806                 return false;
807         }
808 }
809 /* Gets called if a connection is established, or if a new minor gets created
810    in a connection */
811 int drbd_connected(struct drbd_conf *mdev)
812 {
813         int err;
814
815         atomic_set(&mdev->packet_seq, 0);
816         mdev->peer_seq = 0;
817
818         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
819                 &mdev->tconn->cstate_mutex :
820                 &mdev->own_state_mutex;
821
822         err = drbd_send_sync_param(mdev);
823         if (!err)
824                 err = drbd_send_sizes(mdev, 0, 0);
825         if (!err)
826                 err = drbd_send_uuids(mdev);
827         if (!err)
828                 err = drbd_send_state(mdev);
829         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
830         clear_bit(RESIZE_PENDING, &mdev->flags);
831         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
832         return err;
833 }
834
835 /*
836  * return values:
837  *   1 yes, we have a valid connection
838  *   0 oops, did not work out, please try again
839  *  -1 peer talks different language,
840  *     no point in trying again, please go standalone.
841  *  -2 We do not have a network config...
842  */
843 static int conn_connect(struct drbd_tconn *tconn)
844 {
845         struct socket *sock, *msock;
846         struct drbd_conf *mdev;
847         struct net_conf *nc;
848         int vnr, timeout, try, h, ok;
849
850         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
851                 return -2;
852
853         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
854
855         /* Assume that the peer only understands protocol 80 until we know better.  */
856         tconn->agreed_pro_version = 80;
857
858         do {
859                 struct socket *s;
860
861                 for (try = 0;;) {
862                         /* 3 tries, this should take less than a second! */
863                         s = drbd_try_connect(tconn);
864                         if (s || ++try >= 3)
865                                 break;
866                         /* give the other side time to call bind() & listen() */
867                         schedule_timeout_interruptible(HZ / 10);
868                 }
869
870                 if (s) {
871                         if (!tconn->data.socket) {
872                                 tconn->data.socket = s;
873                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
874                         } else if (!tconn->meta.socket) {
875                                 tconn->meta.socket = s;
876                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
877                         } else {
878                                 conn_err(tconn, "Logic error in conn_connect()\n");
879                                 goto out_release_sockets;
880                         }
881                 }
882
883                 if (tconn->data.socket && tconn->meta.socket) {
884                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
885                         ok = drbd_socket_okay(&tconn->data.socket);
886                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
887                         if (ok)
888                                 break;
889                 }
890
891 retry:
892                 s = drbd_wait_for_connect(tconn);
893                 if (s) {
894                         try = receive_first_packet(tconn, s);
895                         drbd_socket_okay(&tconn->data.socket);
896                         drbd_socket_okay(&tconn->meta.socket);
897                         switch (try) {
898                         case P_INITIAL_DATA:
899                                 if (tconn->data.socket) {
900                                         conn_warn(tconn, "initial packet S crossed\n");
901                                         sock_release(tconn->data.socket);
902                                 }
903                                 tconn->data.socket = s;
904                                 break;
905                         case P_INITIAL_META:
906                                 if (tconn->meta.socket) {
907                                         conn_warn(tconn, "initial packet M crossed\n");
908                                         sock_release(tconn->meta.socket);
909                                 }
910                                 tconn->meta.socket = s;
911                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
912                                 break;
913                         default:
914                                 conn_warn(tconn, "Error receiving initial packet\n");
915                                 sock_release(s);
916                                 if (random32() & 1)
917                                         goto retry;
918                         }
919                 }
920
921                 if (tconn->cstate <= C_DISCONNECTING)
922                         goto out_release_sockets;
923                 if (signal_pending(current)) {
924                         flush_signals(current);
925                         smp_rmb();
926                         if (get_t_state(&tconn->receiver) == EXITING)
927                                 goto out_release_sockets;
928                 }
929
930                 if (tconn->data.socket && tconn->meta.socket) {
931                         ok = drbd_socket_okay(&tconn->data.socket);
932                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
933                         if (ok)
934                                 break;
935                 }
936         } while (1);
937
938         sock  = tconn->data.socket;
939         msock = tconn->meta.socket;
940
941         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
942         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
943
944         sock->sk->sk_allocation = GFP_NOIO;
945         msock->sk->sk_allocation = GFP_NOIO;
946
947         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
948         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
949
950         /* NOT YET ...
951          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
952          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
953          * first set it to the P_CONNECTION_FEATURES timeout,
954          * which we set to 4x the configured ping_timeout. */
955         rcu_read_lock();
956         nc = rcu_dereference(tconn->net_conf);
957
958         sock->sk->sk_sndtimeo =
959         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
960
961         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
962         timeout = nc->timeout * HZ / 10;
963         rcu_read_unlock();
964
965         msock->sk->sk_sndtimeo = timeout;
966
967         /* we don't want delays.
968          * we use TCP_CORK where appropriate, though */
969         drbd_tcp_nodelay(sock);
970         drbd_tcp_nodelay(msock);
971
972         tconn->last_received = jiffies;
973
974         h = drbd_do_features(tconn);
975         if (h <= 0)
976                 return h;
977
978         if (tconn->cram_hmac_tfm) {
979                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
980                 switch (drbd_do_auth(tconn)) {
981                 case -1:
982                         conn_err(tconn, "Authentication of peer failed\n");
983                         return -1;
984                 case 0:
985                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
986                         return 0;
987                 }
988         }
989
990         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
991                 return 0;
992
993         sock->sk->sk_sndtimeo = timeout;
994         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
995
996         drbd_thread_start(&tconn->asender);
997
998         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
999                 return -1;
1000
1001         rcu_read_lock();
1002         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1003                 kref_get(&mdev->kref);
1004                 rcu_read_unlock();
1005                 drbd_connected(mdev);
1006                 kref_put(&mdev->kref, &drbd_minor_destroy);
1007                 rcu_read_lock();
1008         }
1009         rcu_read_unlock();
1010
1011         return h;
1012
1013 out_release_sockets:
1014         if (tconn->data.socket) {
1015                 sock_release(tconn->data.socket);
1016                 tconn->data.socket = NULL;
1017         }
1018         if (tconn->meta.socket) {
1019                 sock_release(tconn->meta.socket);
1020                 tconn->meta.socket = NULL;
1021         }
1022         return -1;
1023 }
1024
1025 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1026 {
1027         unsigned int header_size = drbd_header_size(tconn);
1028
1029         if (header_size == sizeof(struct p_header100) &&
1030             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1031                 struct p_header100 *h = header;
1032                 if (h->pad != 0) {
1033                         conn_err(tconn, "Header padding is not zero\n");
1034                         return -EINVAL;
1035                 }
1036                 pi->vnr = be16_to_cpu(h->volume);
1037                 pi->cmd = be16_to_cpu(h->command);
1038                 pi->size = be32_to_cpu(h->length);
1039         } else if (header_size == sizeof(struct p_header95) &&
1040                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1041                 struct p_header95 *h = header;
1042                 pi->cmd = be16_to_cpu(h->command);
1043                 pi->size = be32_to_cpu(h->length);
1044                 pi->vnr = 0;
1045         } else if (header_size == sizeof(struct p_header80) &&
1046                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1047                 struct p_header80 *h = header;
1048                 pi->cmd = be16_to_cpu(h->command);
1049                 pi->size = be16_to_cpu(h->length);
1050                 pi->vnr = 0;
1051         } else {
1052                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1053                          be32_to_cpu(*(__be32 *)header),
1054                          tconn->agreed_pro_version);
1055                 return -EINVAL;
1056         }
1057         pi->data = header + header_size;
1058         return 0;
1059 }
1060
1061 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1062 {
1063         void *buffer = tconn->data.rbuf;
1064         int err;
1065
1066         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1067         if (err)
1068                 return err;
1069
1070         err = decode_header(tconn, buffer, pi);
1071         tconn->last_received = jiffies;
1072
1073         return err;
1074 }
1075
1076 static void drbd_flush(struct drbd_conf *mdev)
1077 {
1078         int rv;
1079
1080         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1081                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1082                                         NULL);
1083                 if (rv) {
1084                         dev_info(DEV, "local disk flush failed with status %d\n", rv);
1085                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1086                          * don't try again for ANY return value != 0
1087                          * if (rv == -EOPNOTSUPP) */
1088                         drbd_bump_write_ordering(mdev, WO_drain_io);
1089                 }
1090                 put_ldev(mdev);
1091         }
1092 }
1093
1094 /**
1095  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1096  * @mdev:       DRBD device.
1097  * @epoch:      Epoch object.
1098  * @ev:         Epoch event.
1099  */
1100 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1101                                                struct drbd_epoch *epoch,
1102                                                enum epoch_event ev)
1103 {
1104         int epoch_size;
1105         struct drbd_epoch *next_epoch;
1106         enum finish_epoch rv = FE_STILL_LIVE;
1107
1108         spin_lock(&mdev->epoch_lock);
1109         do {
1110                 next_epoch = NULL;
1111
1112                 epoch_size = atomic_read(&epoch->epoch_size);
1113
1114                 switch (ev & ~EV_CLEANUP) {
1115                 case EV_PUT:
1116                         atomic_dec(&epoch->active);
1117                         break;
1118                 case EV_GOT_BARRIER_NR:
1119                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1120                         break;
1121                 case EV_BECAME_LAST:
1122                         /* nothing to do*/
1123                         break;
1124                 }
1125
1126                 if (epoch_size != 0 &&
1127                     atomic_read(&epoch->active) == 0 &&
1128                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1129                         if (!(ev & EV_CLEANUP)) {
1130                                 spin_unlock(&mdev->epoch_lock);
1131                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1132                                 spin_lock(&mdev->epoch_lock);
1133                         }
1134                         dec_unacked(mdev);
1135
1136                         if (mdev->current_epoch != epoch) {
1137                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1138                                 list_del(&epoch->list);
1139                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1140                                 mdev->epochs--;
1141                                 kfree(epoch);
1142
1143                                 if (rv == FE_STILL_LIVE)
1144                                         rv = FE_DESTROYED;
1145                         } else {
1146                                 epoch->flags = 0;
1147                                 atomic_set(&epoch->epoch_size, 0);
1148                                 /* atomic_set(&epoch->active, 0); is already zero */
1149                                 if (rv == FE_STILL_LIVE)
1150                                         rv = FE_RECYCLED;
1151                                 wake_up(&mdev->ee_wait);
1152                         }
1153                 }
1154
1155                 if (!next_epoch)
1156                         break;
1157
1158                 epoch = next_epoch;
1159         } while (1);
1160
1161         spin_unlock(&mdev->epoch_lock);
1162
1163         return rv;
1164 }
1165
1166 /**
1167  * drbd_bump_write_ordering() - Fall back to another write ordering method
1168  * @mdev:       DRBD device.
1169  * @wo:         Write ordering method to try.
1170  */
1171 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1172 {
1173         struct disk_conf *dc;
1174         enum write_ordering_e pwo;
1175         static char *write_ordering_str[] = {
1176                 [WO_none] = "none",
1177                 [WO_drain_io] = "drain",
1178                 [WO_bdev_flush] = "flush",
1179         };
1180
1181         pwo = mdev->write_ordering;
1182         wo = min(pwo, wo);
1183         rcu_read_lock();
1184         dc = rcu_dereference(mdev->ldev->disk_conf);
1185
1186         if (wo == WO_bdev_flush && !dc->disk_flushes)
1187                 wo = WO_drain_io;
1188         if (wo == WO_drain_io && !dc->disk_drain)
1189                 wo = WO_none;
1190         rcu_read_unlock();
1191         mdev->write_ordering = wo;
1192         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1193                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1194 }
1195
1196 /**
1197  * drbd_submit_peer_request()
1198  * @mdev:       DRBD device.
1199  * @peer_req:   peer request
1200  * @rw:         flag field, see bio->bi_rw
1201  *
1202  * May spread the pages to multiple bios,
1203  * depending on bio_add_page restrictions.
1204  *
1205  * Returns 0 if all bios have been submitted,
1206  * -ENOMEM if we could not allocate enough bios,
1207  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1208  *  single page to an empty bio (which should never happen and likely indicates
1209  *  that the lower level IO stack is in some way broken). This has been observed
1210  *  on certain Xen deployments.
1211  */
1212 /* TODO allocate from our own bio_set. */
1213 int drbd_submit_peer_request(struct drbd_conf *mdev,
1214                              struct drbd_peer_request *peer_req,
1215                              const unsigned rw, const int fault_type)
1216 {
1217         struct bio *bios = NULL;
1218         struct bio *bio;
1219         struct page *page = peer_req->pages;
1220         sector_t sector = peer_req->i.sector;
1221         unsigned ds = peer_req->i.size;
1222         unsigned n_bios = 0;
1223         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1224         int err = -ENOMEM;
1225
1226         /* In most cases, we will only need one bio.  But in case the lower
1227          * level restrictions happen to be different at this offset on this
1228          * side than those of the sending peer, we may need to submit the
1229          * request in more than one bio.
1230          *
1231          * Plain bio_alloc is good enough here, this is no DRBD internally
1232          * generated bio, but a bio allocated on behalf of the peer.
1233          */
1234 next_bio:
1235         bio = bio_alloc(GFP_NOIO, nr_pages);
1236         if (!bio) {
1237                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1238                 goto fail;
1239         }
1240         /* > peer_req->i.sector, unless this is the first bio */
1241         bio->bi_sector = sector;
1242         bio->bi_bdev = mdev->ldev->backing_bdev;
1243         bio->bi_rw = rw;
1244         bio->bi_private = peer_req;
1245         bio->bi_end_io = drbd_peer_request_endio;
1246
1247         bio->bi_next = bios;
1248         bios = bio;
1249         ++n_bios;
1250
1251         page_chain_for_each(page) {
1252                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1253                 if (!bio_add_page(bio, page, len, 0)) {
1254                         /* A single page must always be possible!
1255                          * But in case it fails anyways,
1256                          * we deal with it, and complain (below). */
1257                         if (bio->bi_vcnt == 0) {
1258                                 dev_err(DEV,
1259                                         "bio_add_page failed for len=%u, "
1260                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1261                                         len, (unsigned long long)bio->bi_sector);
1262                                 err = -ENOSPC;
1263                                 goto fail;
1264                         }
1265                         goto next_bio;
1266                 }
1267                 ds -= len;
1268                 sector += len >> 9;
1269                 --nr_pages;
1270         }
1271         D_ASSERT(page == NULL);
1272         D_ASSERT(ds == 0);
1273
1274         atomic_set(&peer_req->pending_bios, n_bios);
1275         do {
1276                 bio = bios;
1277                 bios = bios->bi_next;
1278                 bio->bi_next = NULL;
1279
1280                 drbd_generic_make_request(mdev, fault_type, bio);
1281         } while (bios);
1282         return 0;
1283
1284 fail:
1285         while (bios) {
1286                 bio = bios;
1287                 bios = bios->bi_next;
1288                 bio_put(bio);
1289         }
1290         return err;
1291 }
1292
1293 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1294                                              struct drbd_peer_request *peer_req)
1295 {
1296         struct drbd_interval *i = &peer_req->i;
1297
1298         drbd_remove_interval(&mdev->write_requests, i);
1299         drbd_clear_interval(i);
1300
1301         /* Wake up any processes waiting for this peer request to complete.  */
1302         if (i->waiting)
1303                 wake_up(&mdev->misc_wait);
1304 }
1305
1306 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1307 {
1308         struct drbd_conf *mdev;
1309         int rv;
1310         struct p_barrier *p = pi->data;
1311         struct drbd_epoch *epoch;
1312
1313         mdev = vnr_to_mdev(tconn, pi->vnr);
1314         if (!mdev)
1315                 return -EIO;
1316
1317         inc_unacked(mdev);
1318
1319         mdev->current_epoch->barrier_nr = p->barrier;
1320         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1321
1322         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1323          * the activity log, which means it would not be resynced in case the
1324          * R_PRIMARY crashes now.
1325          * Therefore we must send the barrier_ack after the barrier request was
1326          * completed. */
1327         switch (mdev->write_ordering) {
1328         case WO_none:
1329                 if (rv == FE_RECYCLED)
1330                         return 0;
1331
1332                 /* receiver context, in the writeout path of the other node.
1333                  * avoid potential distributed deadlock */
1334                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1335                 if (epoch)
1336                         break;
1337                 else
1338                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1339                         /* Fall through */
1340
1341         case WO_bdev_flush:
1342         case WO_drain_io:
1343                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1344                 drbd_flush(mdev);
1345
1346                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1347                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1348                         if (epoch)
1349                                 break;
1350                 }
1351
1352                 epoch = mdev->current_epoch;
1353                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1354
1355                 D_ASSERT(atomic_read(&epoch->active) == 0);
1356                 D_ASSERT(epoch->flags == 0);
1357
1358                 return 0;
1359         default:
1360                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1361                 return -EIO;
1362         }
1363
1364         epoch->flags = 0;
1365         atomic_set(&epoch->epoch_size, 0);
1366         atomic_set(&epoch->active, 0);
1367
1368         spin_lock(&mdev->epoch_lock);
1369         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1370                 list_add(&epoch->list, &mdev->current_epoch->list);
1371                 mdev->current_epoch = epoch;
1372                 mdev->epochs++;
1373         } else {
1374                 /* The current_epoch got recycled while we allocated this one... */
1375                 kfree(epoch);
1376         }
1377         spin_unlock(&mdev->epoch_lock);
1378
1379         return 0;
1380 }
1381
1382 /* used from receive_RSDataReply (recv_resync_read)
1383  * and from receive_Data */
1384 static struct drbd_peer_request *
1385 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1386               int data_size) __must_hold(local)
1387 {
1388         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1389         struct drbd_peer_request *peer_req;
1390         struct page *page;
1391         int dgs, ds, err;
1392         void *dig_in = mdev->tconn->int_dig_in;
1393         void *dig_vv = mdev->tconn->int_dig_vv;
1394         unsigned long *data;
1395
1396         dgs = 0;
1397         if (mdev->tconn->peer_integrity_tfm) {
1398                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1399                 /*
1400                  * FIXME: Receive the incoming digest into the receive buffer
1401                  *        here, together with its struct p_data?
1402                  */
1403                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1404                 if (err)
1405                         return NULL;
1406                 data_size -= dgs;
1407         }
1408
1409         if (!expect(data_size != 0))
1410                 return NULL;
1411         if (!expect(IS_ALIGNED(data_size, 512)))
1412                 return NULL;
1413         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1414                 return NULL;
1415
1416         /* even though we trust our peer,
1417          * we sometimes have to double check. */
1418         if (sector + (data_size>>9) > capacity) {
1419                 dev_err(DEV, "request from peer beyond end of local disk: "
1420                         "capacity: %llus < sector: %llus + size: %u\n",
1421                         (unsigned long long)capacity,
1422                         (unsigned long long)sector, data_size);
1423                 return NULL;
1424         }
1425
1426         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1427          * "criss-cross" setup, that might cause write-out on some other DRBD,
1428          * which in turn might block on the other node at this very place.  */
1429         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1430         if (!peer_req)
1431                 return NULL;
1432
1433         ds = data_size;
1434         page = peer_req->pages;
1435         page_chain_for_each(page) {
1436                 unsigned len = min_t(int, ds, PAGE_SIZE);
1437                 data = kmap(page);
1438                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1439                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1440                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1441                         data[0] = data[0] ^ (unsigned long)-1;
1442                 }
1443                 kunmap(page);
1444                 if (err) {
1445                         drbd_free_peer_req(mdev, peer_req);
1446                         return NULL;
1447                 }
1448                 ds -= len;
1449         }
1450
1451         if (dgs) {
1452                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1453                 if (memcmp(dig_in, dig_vv, dgs)) {
1454                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1455                                 (unsigned long long)sector, data_size);
1456                         drbd_free_peer_req(mdev, peer_req);
1457                         return NULL;
1458                 }
1459         }
1460         mdev->recv_cnt += data_size>>9;
1461         return peer_req;
1462 }
1463
1464 /* drbd_drain_block() just takes a data block
1465  * out of the socket input buffer, and discards it.
1466  */
1467 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1468 {
1469         struct page *page;
1470         int err = 0;
1471         void *data;
1472
1473         if (!data_size)
1474                 return 0;
1475
1476         page = drbd_alloc_pages(mdev, 1, 1);
1477
1478         data = kmap(page);
1479         while (data_size) {
1480                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1481
1482                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1483                 if (err)
1484                         break;
1485                 data_size -= len;
1486         }
1487         kunmap(page);
1488         drbd_free_pages(mdev, page, 0);
1489         return err;
1490 }
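/* Note: the receive paths below use drbd_drain_block() whenever the payload
 * cannot be stored locally (e.g. no local disk): the data is read from the
 * socket and thrown away, so the protocol stream stays in sync even though
 * the request cannot be served.
 */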
1491
1492 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1493                            sector_t sector, int data_size)
1494 {
1495         struct bio_vec *bvec;
1496         struct bio *bio;
1497         int dgs, err, i, expect;
1498         void *dig_in = mdev->tconn->int_dig_in;
1499         void *dig_vv = mdev->tconn->int_dig_vv;
1500
1501         dgs = 0;
1502         if (mdev->tconn->peer_integrity_tfm) {
1503                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1504                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1505                 if (err)
1506                         return err;
1507                 data_size -= dgs;
1508         }
1509
1510         /* optimistically update recv_cnt.  if receiving fails below,
1511          * we disconnect anyways, and counters will be reset. */
1512         mdev->recv_cnt += data_size>>9;
1513
1514         bio = req->master_bio;
1515         D_ASSERT(sector == bio->bi_sector);
1516
1517         bio_for_each_segment(bvec, bio, i) {
1518                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1519                 expect = min_t(int, data_size, bvec->bv_len);
1520                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1521                 kunmap(bvec->bv_page);
1522                 if (err)
1523                         return err;
1524                 data_size -= expect;
1525         }
1526
1527         if (dgs) {
1528                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1529                 if (memcmp(dig_in, dig_vv, dgs)) {
1530                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1531                         return -EINVAL;
1532                 }
1533         }
1534
1535         D_ASSERT(data_size == 0);
1536         return 0;
1537 }
1538
1539 /*
1540  * e_end_resync_block() is called in asender context via
1541  * drbd_finish_peer_reqs().
1542  */
1543 static int e_end_resync_block(struct drbd_work *w, int unused)
1544 {
1545         struct drbd_peer_request *peer_req =
1546                 container_of(w, struct drbd_peer_request, w);
1547         struct drbd_conf *mdev = w->mdev;
1548         sector_t sector = peer_req->i.sector;
1549         int err;
1550
1551         D_ASSERT(drbd_interval_empty(&peer_req->i));
1552
1553         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1554                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1555                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1556         } else {
1557                 /* Record failure to sync */
1558                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1559
1560                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1561         }
1562         dec_unacked(mdev);
1563
1564         return err;
1565 }
1566
1567 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1568 {
1569         struct drbd_peer_request *peer_req;
1570
1571         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1572         if (!peer_req)
1573                 goto fail;
1574
1575         dec_rs_pending(mdev);
1576
1577         inc_unacked(mdev);
1578         /* corresponding dec_unacked() in e_end_resync_block()
1579          * respective _drbd_clear_done_ee */
1580
1581         peer_req->w.cb = e_end_resync_block;
1582
1583         spin_lock_irq(&mdev->tconn->req_lock);
1584         list_add(&peer_req->w.list, &mdev->sync_ee);
1585         spin_unlock_irq(&mdev->tconn->req_lock);
1586
1587         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1588         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1589                 return 0;
1590
1591         /* don't care for the reason here */
1592         dev_err(DEV, "submit failed, triggering re-connect\n");
1593         spin_lock_irq(&mdev->tconn->req_lock);
1594         list_del(&peer_req->w.list);
1595         spin_unlock_irq(&mdev->tconn->req_lock);
1596
1597         drbd_free_peer_req(mdev, peer_req);
1598 fail:
1599         put_ldev(mdev);
1600         return -EIO;
1601 }
1602
1603 static struct drbd_request *
1604 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1605              sector_t sector, bool missing_ok, const char *func)
1606 {
1607         struct drbd_request *req;
1608
1609         /* Request object according to our peer */
1610         req = (struct drbd_request *)(unsigned long)id;
1611         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1612                 return req;
1613         if (!missing_ok) {
1614                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1615                         (unsigned long)id, (unsigned long long)sector);
1616         }
1617         return NULL;
1618 }
1619
1620 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1621 {
1622         struct drbd_conf *mdev;
1623         struct drbd_request *req;
1624         sector_t sector;
1625         int err;
1626         struct p_data *p = pi->data;
1627
1628         mdev = vnr_to_mdev(tconn, pi->vnr);
1629         if (!mdev)
1630                 return -EIO;
1631
1632         sector = be64_to_cpu(p->sector);
1633
1634         spin_lock_irq(&mdev->tconn->req_lock);
1635         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1636         spin_unlock_irq(&mdev->tconn->req_lock);
1637         if (unlikely(!req))
1638                 return -EIO;
1639
1640         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1641          * special casing it there for the various failure cases.
1642          * still no race with drbd_fail_pending_reads */
1643         err = recv_dless_read(mdev, req, sector, pi->size);
1644         if (!err)
1645                 req_mod(req, DATA_RECEIVED);
1646         /* else: nothing. handled from drbd_disconnect...
1647          * I don't think we may complete this just yet
1648          * in case we are "on-disconnect: freeze" */
1649
1650         return err;
1651 }
1652
1653 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1654 {
1655         struct drbd_conf *mdev;
1656         sector_t sector;
1657         int err;
1658         struct p_data *p = pi->data;
1659
1660         mdev = vnr_to_mdev(tconn, pi->vnr);
1661         if (!mdev)
1662                 return -EIO;
1663
1664         sector = be64_to_cpu(p->sector);
1665         D_ASSERT(p->block_id == ID_SYNCER);
1666
1667         if (get_ldev(mdev)) {
1668                 /* data is submitted to disk within recv_resync_read.
1669                  * corresponding put_ldev done below on error,
1670                  * or in drbd_peer_request_endio. */
1671                 err = recv_resync_read(mdev, sector, pi->size);
1672         } else {
1673                 if (__ratelimit(&drbd_ratelimit_state))
1674                         dev_err(DEV, "Can not write resync data to local disk.\n");
1675
1676                 err = drbd_drain_block(mdev, pi->size);
1677
1678                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1679         }
1680
1681         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1682
1683         return err;
1684 }
1685
1686 static int w_restart_write(struct drbd_work *w, int cancel)
1687 {
1688         struct drbd_request *req = container_of(w, struct drbd_request, w);
1689         struct drbd_conf *mdev = w->mdev;
1690         struct bio *bio;
1691         unsigned long start_time;
1692         unsigned long flags;
1693
1694         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1695         if (!expect(req->rq_state & RQ_POSTPONED)) {
1696                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1697                 return -EIO;
1698         }
1699         bio = req->master_bio;
1700         start_time = req->start_time;
1701         /* Postponed requests will not have their master_bio completed!  */
1702         __req_mod(req, DISCARD_WRITE, NULL);
1703         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1704
1705         while (__drbd_make_request(mdev, bio, start_time))
1706                 /* retry */ ;
1707         return 0;
1708 }
1709
1710 static void restart_conflicting_writes(struct drbd_conf *mdev,
1711                                        sector_t sector, int size)
1712 {
1713         struct drbd_interval *i;
1714         struct drbd_request *req;
1715
1716         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1717                 if (!i->local)
1718                         continue;
1719                 req = container_of(i, struct drbd_request, i);
1720                 if (req->rq_state & RQ_LOCAL_PENDING ||
1721                     !(req->rq_state & RQ_POSTPONED))
1722                         continue;
1723                 if (expect(list_empty(&req->w.list))) {
1724                         req->w.mdev = mdev;
1725                         req->w.cb = w_restart_write;
1726                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1727                 }
1728         }
1729 }
1730
1731 /*
1732  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1733  */
1734 static int e_end_block(struct drbd_work *w, int cancel)
1735 {
1736         struct drbd_peer_request *peer_req =
1737                 container_of(w, struct drbd_peer_request, w);
1738         struct drbd_conf *mdev = w->mdev;
1739         sector_t sector = peer_req->i.sector;
1740         int err = 0, pcmd;
1741
1742         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1743                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1744                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1745                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1746                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1747                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1748                         err = drbd_send_ack(mdev, pcmd, peer_req);
1749                         if (pcmd == P_RS_WRITE_ACK)
1750                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1751                 } else {
1752                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1753                         /* we expect it to be marked out of sync anyways...
1754                          * maybe assert this?  */
1755                 }
1756                 dec_unacked(mdev);
1757         }
1758         /* we delete from the conflict detection hash _after_ we sent out the
1759          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1760         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1761                 spin_lock_irq(&mdev->tconn->req_lock);
1762                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1763                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1764                 if (peer_req->flags & EE_RESTART_REQUESTS)
1765                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1766                 spin_unlock_irq(&mdev->tconn->req_lock);
1767         } else
1768                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1769
1770         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1771
1772         return err;
1773 }
1774
1775 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1776 {
1777         struct drbd_conf *mdev = w->mdev;
1778         struct drbd_peer_request *peer_req =
1779                 container_of(w, struct drbd_peer_request, w);
1780         int err;
1781
1782         err = drbd_send_ack(mdev, ack, peer_req);
1783         dec_unacked(mdev);
1784
1785         return err;
1786 }
1787
1788 static int e_send_discard_write(struct drbd_work *w, int unused)
1789 {
1790         return e_send_ack(w, P_DISCARD_WRITE);
1791 }
1792
1793 static int e_send_retry_write(struct drbd_work *w, int unused)
1794 {
1795         struct drbd_tconn *tconn = w->mdev->tconn;
1796
1797         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1798                              P_RETRY_WRITE : P_DISCARD_WRITE);
1799 }
1800
1801 static bool seq_greater(u32 a, u32 b)
1802 {
1803         /*
1804          * We assume 32-bit wrap-around here.
1805          * For 24-bit wrap-around, we would have to shift:
1806          *  a <<= 8; b <<= 8;
1807          */
1808         return (s32)a - (s32)b > 0;
1809 }
1810
1811 static u32 seq_max(u32 a, u32 b)
1812 {
1813         return seq_greater(a, b) ? a : b;
1814 }
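/* For illustration: the signed difference in seq_greater() handles the 32-bit
 * wrap, e.g. seq_greater(2, 0xfffffffe) is true (2 lies just "after" the wrap)
 * while seq_greater(0xfffffffe, 2) is false, so seq_max(2, 0xfffffffe) == 2.
 */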
1815
1816 static bool need_peer_seq(struct drbd_conf *mdev)
1817 {
1818         struct drbd_tconn *tconn = mdev->tconn;
1819         int tp;
1820
1821         /*
1822          * We only need to keep track of the last packet_seq number of our peer
1823          * if we are in dual-primary mode and we have the discard flag set; see
1824          * handle_write_conflicts().
1825          */
1826
1827         rcu_read_lock();
1828         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1829         rcu_read_unlock();
1830
1831         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1832 }
1833
1834 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1835 {
1836         unsigned int newest_peer_seq;
1837
1838         if (need_peer_seq(mdev)) {
1839                 spin_lock(&mdev->peer_seq_lock);
1840                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1841                 mdev->peer_seq = newest_peer_seq;
1842                 spin_unlock(&mdev->peer_seq_lock);
1843                 /* wake up only if we actually changed mdev->peer_seq */
1844                 if (peer_seq == newest_peer_seq)
1845                         wake_up(&mdev->seq_wait);
1846         }
1847 }
1848
1849 /* Called from receive_Data.
1850  * Synchronize packets on sock with packets on msock.
1851  *
1852  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1853  * packet traveling on msock, they are still processed in the order they have
1854  * been sent.
1855  *
1856  * Note: we don't care for Ack packets overtaking P_DATA packets.
1857  *
1858  * In case packet_seq is larger than mdev->peer_seq number, there are
1859  * outstanding packets on the msock. We wait for them to arrive.
1860  * In case we are the logically next packet, we update mdev->peer_seq
1861  * ourselves. Correctly handles 32bit wrap around.
1862  *
1863  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1864  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1865  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1866  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1867  *
1868  * returns 0 if we may process the packet, -ERESTARTSYS if we were
1869  * interrupted (by disconnect signal), or -ETIMEDOUT if we timed out. */
1870 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1871 {
1872         DEFINE_WAIT(wait);
1873         long timeout;
1874         int ret;
1875
1876         if (!need_peer_seq(mdev))
1877                 return 0;
1878
1879         spin_lock(&mdev->peer_seq_lock);
1880         for (;;) {
1881                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1882                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1883                         ret = 0;
1884                         break;
1885                 }
1886                 if (signal_pending(current)) {
1887                         ret = -ERESTARTSYS;
1888                         break;
1889                 }
1890                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1891                 spin_unlock(&mdev->peer_seq_lock);
1892                 rcu_read_lock();
1893                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1894                 rcu_read_unlock();
1895                 timeout = schedule_timeout(timeout);
1896                 spin_lock(&mdev->peer_seq_lock);
1897                 if (!timeout) {
1898                         ret = -ETIMEDOUT;
1899                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1900                         break;
1901                 }
1902         }
1903         spin_unlock(&mdev->peer_seq_lock);
1904         finish_wait(&mdev->seq_wait, &wait);
1905         return ret;
1906 }
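/* Example: with mdev->peer_seq == 5, a P_DATA packet carrying peer_seq 7 makes
 * seq_greater(6, 5) true, so we sleep until mdev->peer_seq has advanced to 6
 * (the missing packet still in flight on the msock got processed), or until
 * the ping_timeo based timeout expires and we give up and disconnect.
 */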
1907
1908 /* see also bio_flags_to_wire():
1909  * we need to semantically map the bio flags (REQ_* / DRBD_REQ_*) to data
1910  * packet flags (DP_*) and back, since we may replicate to other kernel versions. */
1911 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1912 {
1913         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1914                 (dpf & DP_FUA ? REQ_FUA : 0) |
1915                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1916                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1917 }
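/* e.g. a P_DATA packet with DP_FUA | DP_FLUSH set is submitted locally as a
 * bio carrying REQ_FUA | REQ_FLUSH; bio_flags_to_wire() does the reverse
 * mapping on the sending side.
 */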
1918
1919 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1920                                     unsigned int size)
1921 {
1922         struct drbd_interval *i;
1923
1924     repeat:
1925         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1926                 struct drbd_request *req;
1927                 struct bio_and_error m;
1928
1929                 if (!i->local)
1930                         continue;
1931                 req = container_of(i, struct drbd_request, i);
1932                 if (!(req->rq_state & RQ_POSTPONED))
1933                         continue;
1934                 req->rq_state &= ~RQ_POSTPONED;
1935                 __req_mod(req, NEG_ACKED, &m);
1936                 spin_unlock_irq(&mdev->tconn->req_lock);
1937                 if (m.bio)
1938                         complete_master_bio(mdev, &m);
1939                 spin_lock_irq(&mdev->tconn->req_lock);
1940                 goto repeat;
1941         }
1942 }
1943
1944 static int handle_write_conflicts(struct drbd_conf *mdev,
1945                                   struct drbd_peer_request *peer_req)
1946 {
1947         struct drbd_tconn *tconn = mdev->tconn;
1948         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1949         sector_t sector = peer_req->i.sector;
1950         const unsigned int size = peer_req->i.size;
1951         struct drbd_interval *i;
1952         bool equal;
1953         int err;
1954
1955         /*
1956          * Inserting the peer request into the write_requests tree will prevent
1957          * new conflicting local requests from being added.
1958          */
1959         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1960
1961     repeat:
1962         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1963                 if (i == &peer_req->i)
1964                         continue;
1965
1966                 if (!i->local) {
1967                         /*
1968                          * Our peer has sent a conflicting remote request; this
1969                          * should not happen in a two-node setup.  Wait for the
1970                          * earlier peer request to complete.
1971                          */
1972                         err = drbd_wait_misc(mdev, i);
1973                         if (err)
1974                                 goto out;
1975                         goto repeat;
1976                 }
1977
1978                 equal = i->sector == sector && i->size == size;
1979                 if (resolve_conflicts) {
1980                         /*
1981                          * If the peer request is fully contained within the
1982                          * overlapping request, it can be discarded; otherwise,
1983                          * it will be retried once all overlapping requests
1984                          * have completed.
1985                          */
1986                         bool discard = i->sector <= sector && i->sector +
1987                                        (i->size >> 9) >= sector + (size >> 9);
1988
1989                         if (!equal)
1990                                 dev_alert(DEV, "Concurrent writes detected: "
1991                                                "local=%llus +%u, remote=%llus +%u, "
1992                                                "assuming %s came first\n",
1993                                           (unsigned long long)i->sector, i->size,
1994                                           (unsigned long long)sector, size,
1995                                           discard ? "local" : "remote");
1996
1997                         inc_unacked(mdev);
1998                         peer_req->w.cb = discard ? e_send_discard_write :
1999                                                    e_send_retry_write;
2000                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2001                         wake_asender(mdev->tconn);
2002
2003                         err = -ENOENT;
2004                         goto out;
2005                 } else {
2006                         struct drbd_request *req =
2007                                 container_of(i, struct drbd_request, i);
2008
2009                         if (!equal)
2010                                 dev_alert(DEV, "Concurrent writes detected: "
2011                                                "local=%llus +%u, remote=%llus +%u\n",
2012                                           (unsigned long long)i->sector, i->size,
2013                                           (unsigned long long)sector, size);
2014
2015                         if (req->rq_state & RQ_LOCAL_PENDING ||
2016                             !(req->rq_state & RQ_POSTPONED)) {
2017                                 /*
2018                                  * Wait for the node with the discard flag to
2019                                  * decide if this request will be discarded or
2020                                  * retried.  Requests that are discarded will
2021                                  * disappear from the write_requests tree.
2022                                  *
2023                                  * In addition, wait for the conflicting
2024                                  * request to finish locally before submitting
2025                                  * the conflicting peer request.
2026                                  */
2027                                 err = drbd_wait_misc(mdev, &req->i);
2028                                 if (err) {
2029                                         _conn_request_state(mdev->tconn,
2030                                                             NS(conn, C_TIMEOUT),
2031                                                             CS_HARD);
2032                                         fail_postponed_requests(mdev, sector, size);
2033                                         goto out;
2034                                 }
2035                                 goto repeat;
2036                         }
2037                         /*
2038                          * Remember to restart the conflicting requests after
2039                          * the new peer request has completed.
2040                          */
2041                         peer_req->flags |= EE_RESTART_REQUESTS;
2042                 }
2043         }
2044         err = 0;
2045
2046     out:
2047         if (err)
2048                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2049         return err;
2050 }
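/* In short: with two primaries, the node that has DISCARD_CONCURRENT set
 * resolves conflicts itself (it discards a peer write that is fully covered
 * by a local request, and asks the peer to retry otherwise), while the other
 * node waits for that decision before submitting its conflicting write.
 */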
2051
2052 /* mirrored write */
2053 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2054 {
2055         struct drbd_conf *mdev;
2056         sector_t sector;
2057         struct drbd_peer_request *peer_req;
2058         struct p_data *p = pi->data;
2059         u32 peer_seq = be32_to_cpu(p->seq_num);
2060         int rw = WRITE;
2061         u32 dp_flags;
2062         int err, tp;
2063
2064         mdev = vnr_to_mdev(tconn, pi->vnr);
2065         if (!mdev)
2066                 return -EIO;
2067
2068         if (!get_ldev(mdev)) {
2069                 int err2;
2070
2071                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2072                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2073                 atomic_inc(&mdev->current_epoch->epoch_size);
2074                 err2 = drbd_drain_block(mdev, pi->size);
2075                 if (!err)
2076                         err = err2;
2077                 return err;
2078         }
2079
2080         /*
2081          * Corresponding put_ldev done either below (on various errors), or in
2082          * drbd_peer_request_endio, if we successfully submit the data at the
2083          * end of this function.
2084          */
2085
2086         sector = be64_to_cpu(p->sector);
2087         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2088         if (!peer_req) {
2089                 put_ldev(mdev);
2090                 return -EIO;
2091         }
2092
2093         peer_req->w.cb = e_end_block;
2094
2095         dp_flags = be32_to_cpu(p->dp_flags);
2096         rw |= wire_flags_to_bio(mdev, dp_flags);
2097
2098         if (dp_flags & DP_MAY_SET_IN_SYNC)
2099                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2100
2101         spin_lock(&mdev->epoch_lock);
2102         peer_req->epoch = mdev->current_epoch;
2103         atomic_inc(&peer_req->epoch->epoch_size);
2104         atomic_inc(&peer_req->epoch->active);
2105         spin_unlock(&mdev->epoch_lock);
2106
2107         rcu_read_lock();
2108         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2109         rcu_read_unlock();
2110         if (tp) {
2111                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2112                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2113                 if (err)
2114                         goto out_interrupted;
2115                 spin_lock_irq(&mdev->tconn->req_lock);
2116                 err = handle_write_conflicts(mdev, peer_req);
2117                 if (err) {
2118                         spin_unlock_irq(&mdev->tconn->req_lock);
2119                         if (err == -ENOENT) {
2120                                 put_ldev(mdev);
2121                                 return 0;
2122                         }
2123                         goto out_interrupted;
2124                 }
2125         } else
2126                 spin_lock_irq(&mdev->tconn->req_lock);
2127         list_add(&peer_req->w.list, &mdev->active_ee);
2128         spin_unlock_irq(&mdev->tconn->req_lock);
2129
2130         if (mdev->tconn->agreed_pro_version < 100) {
2131                 rcu_read_lock();
2132                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2133                 case DRBD_PROT_C:
2134                         dp_flags |= DP_SEND_WRITE_ACK;
2135                         break;
2136                 case DRBD_PROT_B:
2137                         dp_flags |= DP_SEND_RECEIVE_ACK;
2138                         break;
2139                 }
2140                 rcu_read_unlock();
2141         }
2142
2143         if (dp_flags & DP_SEND_WRITE_ACK) {
2144                 peer_req->flags |= EE_SEND_WRITE_ACK;
2145                 inc_unacked(mdev);
2146                 /* corresponding dec_unacked() in e_end_block()
2147                  * respective _drbd_clear_done_ee */
2148         }
2149
2150         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2151                 /* I really don't like it that the receiver thread
2152                  * sends on the msock, but anyways */
2153                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2154         }
2155
2156         if (mdev->state.pdsk < D_INCONSISTENT) {
2157                 /* In case we have the only disk of the cluster: the peer cannot store this write, so track it as out of sync. */
2158                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2159                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2160                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2161                 drbd_al_begin_io(mdev, &peer_req->i);
2162         }
2163
2164         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2165         if (!err)
2166                 return 0;
2167
2168         /* don't care for the reason here */
2169         dev_err(DEV, "submit failed, triggering re-connect\n");
2170         spin_lock_irq(&mdev->tconn->req_lock);
2171         list_del(&peer_req->w.list);
2172         drbd_remove_epoch_entry_interval(mdev, peer_req);
2173         spin_unlock_irq(&mdev->tconn->req_lock);
2174         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2175                 drbd_al_complete_io(mdev, &peer_req->i);
2176
2177 out_interrupted:
2178         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2179         put_ldev(mdev);
2180         drbd_free_peer_req(mdev, peer_req);
2181         return err;
2182 }
2183
2184 /* We may throttle resync, if the lower device seems to be busy,
2185  * and current sync rate is above c_min_rate.
2186  *
2187  * To decide whether or not the lower device is busy, we use a scheme similar
2188  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2189  * activity (more than 64 sectors) that we cannot account for with our own
2190  * resync activity, the lower device obviously is "busy".
2191  *
2192  * The current sync rate used here uses only the most recent two step marks,
2193  * to have a short time average so we can react faster.
2194  */
2195 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2196 {
2197         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2198         unsigned long db, dt, dbdt;
2199         struct lc_element *tmp;
2200         int curr_events;
2201         int throttle = 0;
2202         unsigned int c_min_rate;
2203
2204         rcu_read_lock();
2205         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2206         rcu_read_unlock();
2207
2208         /* feature disabled? */
2209         if (c_min_rate == 0)
2210                 return 0;
2211
2212         spin_lock_irq(&mdev->al_lock);
2213         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2214         if (tmp) {
2215                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2216                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2217                         spin_unlock_irq(&mdev->al_lock);
2218                         return 0;
2219                 }
2220                 /* Do not slow down if app IO is already waiting for this extent */
2221         }
2222         spin_unlock_irq(&mdev->al_lock);
2223
2224         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2225                       (int)part_stat_read(&disk->part0, sectors[1]) -
2226                         atomic_read(&mdev->rs_sect_ev);
2227
2228         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2229                 unsigned long rs_left;
2230                 int i;
2231
2232                 mdev->rs_last_events = curr_events;
2233
2234                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2235                  * approx. */
2236                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2237
2238                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2239                         rs_left = mdev->ov_left;
2240                 else
2241                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2242
2243                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2244                 if (!dt)
2245                         dt++;
2246                 db = mdev->rs_mark_left[i] - rs_left;
2247                 dbdt = Bit2KB(db/dt);
2248
2249                 if (dbdt > c_min_rate)
2250                         throttle = 1;
2251         }
2252         return throttle;
2253 }
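/* Rough example: each bitmap bit covers one 4 KiB block, so if the number of
 * out-of-sync bits dropped by db during the last dt seconds, the recent sync
 * rate is Bit2KB(db/dt), roughly KiB/s.  We only throttle while that rate
 * already exceeds the configured c_min_rate and the backing device shows
 * activity we cannot account for.
 */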
2254
2255
2256 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2257 {
2258         struct drbd_conf *mdev;
2259         sector_t sector;
2260         sector_t capacity;
2261         struct drbd_peer_request *peer_req;
2262         struct digest_info *di = NULL;
2263         int size, verb;
2264         unsigned int fault_type;
2265         struct p_block_req *p = pi->data;
2266
2267         mdev = vnr_to_mdev(tconn, pi->vnr);
2268         if (!mdev)
2269                 return -EIO;
2270         capacity = drbd_get_capacity(mdev->this_bdev);
2271
2272         sector = be64_to_cpu(p->sector);
2273         size   = be32_to_cpu(p->blksize);
2274
2275         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2276                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2277                                 (unsigned long long)sector, size);
2278                 return -EINVAL;
2279         }
2280         if (sector + (size>>9) > capacity) {
2281                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2282                                 (unsigned long long)sector, size);
2283                 return -EINVAL;
2284         }
2285
2286         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2287                 verb = 1;
2288                 switch (pi->cmd) {
2289                 case P_DATA_REQUEST:
2290                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2291                         break;
2292                 case P_RS_DATA_REQUEST:
2293                 case P_CSUM_RS_REQUEST:
2294                 case P_OV_REQUEST:
2295                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2296                         break;
2297                 case P_OV_REPLY:
2298                         verb = 0;
2299                         dec_rs_pending(mdev);
2300                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2301                         break;
2302                 default:
2303                         BUG();
2304                 }
2305                 if (verb && __ratelimit(&drbd_ratelimit_state))
2306                         dev_err(DEV, "Can not satisfy peer's read request, "
2307                             "no local data.\n");
2308
2309                 /* drain the possible payload */
2310                 return drbd_drain_block(mdev, pi->size);
2311         }
2312
2313         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2314          * "criss-cross" setup, that might cause write-out on some other DRBD,
2315          * which in turn might block on the other node at this very place.  */
2316         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2317         if (!peer_req) {
2318                 put_ldev(mdev);
2319                 return -ENOMEM;
2320         }
2321
2322         switch (pi->cmd) {
2323         case P_DATA_REQUEST:
2324                 peer_req->w.cb = w_e_end_data_req;
2325                 fault_type = DRBD_FAULT_DT_RD;
2326                 /* application IO, don't drbd_rs_begin_io */
2327                 goto submit;
2328
2329         case P_RS_DATA_REQUEST:
2330                 peer_req->w.cb = w_e_end_rsdata_req;
2331                 fault_type = DRBD_FAULT_RS_RD;
2332                 /* used in the sector offset progress display */
2333                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2334                 break;
2335
2336         case P_OV_REPLY:
2337         case P_CSUM_RS_REQUEST:
2338                 fault_type = DRBD_FAULT_RS_RD;
2339                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2340                 if (!di)
2341                         goto out_free_e;
2342
2343                 di->digest_size = pi->size;
2344                 di->digest = (((char *)di)+sizeof(struct digest_info));
2345
2346                 peer_req->digest = di;
2347                 peer_req->flags |= EE_HAS_DIGEST;
2348
2349                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2350                         goto out_free_e;
2351
2352                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2353                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2354                         peer_req->w.cb = w_e_end_csum_rs_req;
2355                         /* used in the sector offset progress display */
2356                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2357                 } else if (pi->cmd == P_OV_REPLY) {
2358                         /* track progress, we may need to throttle */
2359                         atomic_add(size >> 9, &mdev->rs_sect_in);
2360                         peer_req->w.cb = w_e_end_ov_reply;
2361                         dec_rs_pending(mdev);
2362                         /* drbd_rs_begin_io done when we sent this request,
2363                          * but accounting still needs to be done. */
2364                         goto submit_for_resync;
2365                 }
2366                 break;
2367
2368         case P_OV_REQUEST:
2369                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2370                     mdev->tconn->agreed_pro_version >= 90) {
2371                         unsigned long now = jiffies;
2372                         int i;
2373                         mdev->ov_start_sector = sector;
2374                         mdev->ov_position = sector;
2375                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2376                         mdev->rs_total = mdev->ov_left;
2377                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2378                                 mdev->rs_mark_left[i] = mdev->ov_left;
2379                                 mdev->rs_mark_time[i] = now;
2380                         }
2381                         dev_info(DEV, "Online Verify start sector: %llu\n",
2382                                         (unsigned long long)sector);
2383                 }
2384                 peer_req->w.cb = w_e_end_ov_req;
2385                 fault_type = DRBD_FAULT_RS_RD;
2386                 break;
2387
2388         default:
2389                 BUG();
2390         }
2391
2392         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2393          * wrt the receiver, but it is not as straightforward as it may seem.
2394          * Various places in the resync start and stop logic assume resync
2395  * requests are processed in order; requeuing this on the worker thread
2396  * would introduce a bunch of new code for synchronization between threads.
2397          *
2398          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2399          * "forever", throttling after drbd_rs_begin_io will lock that extent
2400          * for application writes for the same time.  For now, just throttle
2401          * here, where the rest of the code expects the receiver to sleep for
2402          * a while, anyways.
2403          */
2404
2405         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2406          * this defers syncer requests for some time, before letting at least
2407  * one request through.  The resync controller on the receiving side
2408          * will adapt to the incoming rate accordingly.
2409          *
2410          * We cannot throttle here if remote is Primary/SyncTarget:
2411          * we would also throttle its application reads.
2412          * In that case, throttling is done on the SyncTarget only.
2413          */
2414         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2415                 schedule_timeout_uninterruptible(HZ/10);
2416         if (drbd_rs_begin_io(mdev, sector))
2417                 goto out_free_e;
2418
2419 submit_for_resync:
2420         atomic_add(size >> 9, &mdev->rs_sect_ev);
2421
2422 submit:
2423         inc_unacked(mdev);
2424         spin_lock_irq(&mdev->tconn->req_lock);
2425         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2426         spin_unlock_irq(&mdev->tconn->req_lock);
2427
2428         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2429                 return 0;
2430
2431         /* don't care for the reason here */
2432         dev_err(DEV, "submit failed, triggering re-connect\n");
2433         spin_lock_irq(&mdev->tconn->req_lock);
2434         list_del(&peer_req->w.list);
2435         spin_unlock_irq(&mdev->tconn->req_lock);
2436         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2437
2438 out_free_e:
2439         put_ldev(mdev);
2440         drbd_free_peer_req(mdev, peer_req);
2441         return -EIO;
2442 }
2443
2444 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2445 {
2446         int self, peer, rv = -100;
2447         unsigned long ch_self, ch_peer;
2448         enum drbd_after_sb_p after_sb_0p;
2449
2450         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2451         peer = mdev->p_uuid[UI_BITMAP] & 1;
2452
2453         ch_peer = mdev->p_uuid[UI_SIZE];
2454         ch_self = mdev->comm_bm_set;
2455
2456         rcu_read_lock();
2457         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2458         rcu_read_unlock();
2459         switch (after_sb_0p) {
2460         case ASB_CONSENSUS:
2461         case ASB_DISCARD_SECONDARY:
2462         case ASB_CALL_HELPER:
2463         case ASB_VIOLENTLY:
2464                 dev_err(DEV, "Configuration error.\n");
2465                 break;
2466         case ASB_DISCONNECT:
2467                 break;
2468         case ASB_DISCARD_YOUNGER_PRI:
2469                 if (self == 0 && peer == 1) {
2470                         rv = -1;
2471                         break;
2472                 }
2473                 if (self == 1 && peer == 0) {
2474                         rv =  1;
2475                         break;
2476                 }
2477                 /* Else fall through to one of the other strategies... */
2478         case ASB_DISCARD_OLDER_PRI:
2479                 if (self == 0 && peer == 1) {
2480                         rv = 1;
2481                         break;
2482                 }
2483                 if (self == 1 && peer == 0) {
2484                         rv = -1;
2485                         break;
2486                 }
2487                 /* Else fall through to one of the other strategies... */
2488                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2489                      "Using discard-least-changes instead\n");
2490         case ASB_DISCARD_ZERO_CHG:
2491                 if (ch_peer == 0 && ch_self == 0) {
2492                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2493                                 ? -1 : 1;
2494                         break;
2495                 } else {
2496                         if (ch_peer == 0) { rv =  1; break; }
2497                         if (ch_self == 0) { rv = -1; break; }
2498                 }
2499                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2500                         break;
2501         case ASB_DISCARD_LEAST_CHG:
2502                 if      (ch_self < ch_peer)
2503                         rv = -1;
2504                 else if (ch_self > ch_peer)
2505                         rv =  1;
2506                 else /* ( ch_self == ch_peer ) */
2507                      /* Well, then use something else. */
2508                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2509                                 ? -1 : 1;
2510                 break;
2511         case ASB_DISCARD_LOCAL:
2512                 rv = -1;
2513                 break;
2514         case ASB_DISCARD_REMOTE:
2515                 rv =  1;
2516         }
2517
2518         return rv;
2519 }
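/* Return convention of the drbd_asb_recover_*p() helpers: -1 means discard
 * the local data (become sync target), 1 means discard the peer's data
 * (become sync source), -100 means no automatic resolution was possible.
 */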
2520
2521 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2522 {
2523         int hg, rv = -100;
2524         enum drbd_after_sb_p after_sb_1p;
2525
2526         rcu_read_lock();
2527         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2528         rcu_read_unlock();
2529         switch (after_sb_1p) {
2530         case ASB_DISCARD_YOUNGER_PRI:
2531         case ASB_DISCARD_OLDER_PRI:
2532         case ASB_DISCARD_LEAST_CHG:
2533         case ASB_DISCARD_LOCAL:
2534         case ASB_DISCARD_REMOTE:
2535         case ASB_DISCARD_ZERO_CHG:
2536                 dev_err(DEV, "Configuration error.\n");
2537                 break;
2538         case ASB_DISCONNECT:
2539                 break;
2540         case ASB_CONSENSUS:
2541                 hg = drbd_asb_recover_0p(mdev);
2542                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2543                         rv = hg;
2544                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2545                         rv = hg;
2546                 break;
2547         case ASB_VIOLENTLY:
2548                 rv = drbd_asb_recover_0p(mdev);
2549                 break;
2550         case ASB_DISCARD_SECONDARY:
2551                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2552         case ASB_CALL_HELPER:
2553                 hg = drbd_asb_recover_0p(mdev);
2554                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2555                         enum drbd_state_rv rv2;
2556
2557                         drbd_set_role(mdev, R_SECONDARY, 0);
2558                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2559                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2560                           * we do not need to wait for the after state change work either. */
2561                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2562                         if (rv2 != SS_SUCCESS) {
2563                                 drbd_khelper(mdev, "pri-lost-after-sb");
2564                         } else {
2565                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2566                                 rv = hg;
2567                         }
2568                 } else
2569                         rv = hg;
2570         }
2571
2572         return rv;
2573 }
2574
2575 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2576 {
2577         int hg, rv = -100;
2578         enum drbd_after_sb_p after_sb_2p;
2579
2580         rcu_read_lock();
2581         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2582         rcu_read_unlock();
2583         switch (after_sb_2p) {
2584         case ASB_DISCARD_YOUNGER_PRI:
2585         case ASB_DISCARD_OLDER_PRI:
2586         case ASB_DISCARD_LEAST_CHG:
2587         case ASB_DISCARD_LOCAL:
2588         case ASB_DISCARD_REMOTE:
2589         case ASB_CONSENSUS:
2590         case ASB_DISCARD_SECONDARY:
2591         case ASB_DISCARD_ZERO_CHG:
2592                 dev_err(DEV, "Configuration error.\n");
2593                 break;
2594         case ASB_VIOLENTLY:
2595                 rv = drbd_asb_recover_0p(mdev);
2596                 break;
2597         case ASB_DISCONNECT:
2598                 break;
2599         case ASB_CALL_HELPER:
2600                 hg = drbd_asb_recover_0p(mdev);
2601                 if (hg == -1) {
2602                         enum drbd_state_rv rv2;
2603
2604                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2605                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2606                           * we do not need to wait for the after state change work either. */
2607                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2608                         if (rv2 != SS_SUCCESS) {
2609                                 drbd_khelper(mdev, "pri-lost-after-sb");
2610                         } else {
2611                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2612                                 rv = hg;
2613                         }
2614                 } else
2615                         rv = hg;
2616         }
2617
2618         return rv;
2619 }
2620
2621 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2622                            u64 bits, u64 flags)
2623 {
2624         if (!uuid) {
2625                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2626                 return;
2627         }
2628         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2629              text,
2630              (unsigned long long)uuid[UI_CURRENT],
2631              (unsigned long long)uuid[UI_BITMAP],
2632              (unsigned long long)uuid[UI_HISTORY_START],
2633              (unsigned long long)uuid[UI_HISTORY_END],
2634              (unsigned long long)bits,
2635              (unsigned long long)flags);
2636 }
2637
2638 /*
2639   100   after split brain, try auto recover
2640     2   C_SYNC_SOURCE set BitMap
2641     1   C_SYNC_SOURCE use BitMap
2642     0   no Sync
2643    -1   C_SYNC_TARGET use BitMap
2644    -2   C_SYNC_TARGET set BitMap
2645  -100   after split brain, disconnect
2646 -1000   unrelated data
2647 -1091   requires proto 91
2648 -1096   requires proto 96
2649  */
2650 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2651 {
2652         u64 self, peer;
2653         int i, j;
2654
2655         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2656         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2657
2658         *rule_nr = 10;
2659         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2660                 return 0;
2661
2662         *rule_nr = 20;
2663         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2664              peer != UUID_JUST_CREATED)
2665                 return -2;
2666
2667         *rule_nr = 30;
2668         if (self != UUID_JUST_CREATED &&
2669             (peer == UUID_JUST_CREATED || peer == (u64)0))
2670                 return 2;
2671
2672         if (self == peer) {
2673                 int rct, dc; /* roles at crash time */
2674
2675                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2676
2677                         if (mdev->tconn->agreed_pro_version < 91)
2678                                 return -1091;
2679
2680                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2681                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2682                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2683                                 drbd_uuid_set_bm(mdev, 0UL);
2684
2685                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2686                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2687                                 *rule_nr = 34;
2688                         } else {
2689                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2690                                 *rule_nr = 36;
2691                         }
2692
2693                         return 1;
2694                 }
2695
2696                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2697
2698                         if (mdev->tconn->agreed_pro_version < 91)
2699                                 return -1091;
2700
2701                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2702                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2703                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2704
2705                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2706                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2707                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2708
2709                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2710                                 *rule_nr = 35;
2711                         } else {
2712                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2713                                 *rule_nr = 37;
2714                         }
2715
2716                         return -1;
2717                 }
2718
2719                 /* Common power [off|failure] */
2720                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2721                         (mdev->p_uuid[UI_FLAGS] & 2);
2722                 /* lowest bit is set when we were primary,
2723                  * next bit (weight 2) is set when peer was primary */
2724                 *rule_nr = 40;
2725
2726                 switch (rct) {
2727                 case 0: /* !self_pri && !peer_pri */ return 0;
2728                 case 1: /*  self_pri && !peer_pri */ return 1;
2729                 case 2: /* !self_pri &&  peer_pri */ return -1;
2730                 case 3: /*  self_pri &&  peer_pri */
2731                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2732                         return dc ? -1 : 1;
2733                 }
2734         }
2735
2736         *rule_nr = 50;
2737         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2738         if (self == peer)
2739                 return -1;
2740
2741         *rule_nr = 51;
2742         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2743         if (self == peer) {
2744                 if (mdev->tconn->agreed_pro_version < 96 ?
2745                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2746                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2747                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2748                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2749                            modifications from the last start of resync as sync source. */
2750
2751                         if (mdev->tconn->agreed_pro_version < 91)
2752                                 return -1091;
2753
2754                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2755                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2756
2757                         dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2758                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2759
2760                         return -1;
2761                 }
2762         }
2763
2764         *rule_nr = 60;
2765         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2766         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2767                 peer = mdev->p_uuid[i] & ~((u64)1);
2768                 if (self == peer)
2769                         return -2;
2770         }
2771
2772         *rule_nr = 70;
2773         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2774         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2775         if (self == peer)
2776                 return 1;
2777
2778         *rule_nr = 71;
2779         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2780         if (self == peer) {
2781                 if (mdev->tconn->agreed_pro_version < 96 ?
2782                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2783                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2784                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2785                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2786                            modifications from the last start of resync as sync source. */
2787
2788                         if (mdev->tconn->agreed_pro_version < 91)
2789                                 return -1091;
2790
2791                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2792                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2793
2794                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2795                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2796                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2797
2798                         return 1;
2799                 }
2800         }
2801
2802
2803         *rule_nr = 80;
2804         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2805         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2806                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2807                 if (self == peer)
2808                         return 2;
2809         }
2810
2811         *rule_nr = 90;
2812         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2813         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2814         if (self == peer && self != ((u64)0))
2815                 return 100;
2816
2817         *rule_nr = 100;
2818         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2819                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2820                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2821                         peer = mdev->p_uuid[j] & ~((u64)1);
2822                         if (self == peer)
2823                                 return -100;
2824                 }
2825         }
2826
2827         return -1000;
2828 }
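     /* Rough legend for the return values above, as interpreted by
      * drbd_sync_handshake() below (covering only the rules visible here):
      *      0        in sync, no resync necessary
      *      1 / -1   become sync source / sync target (bitmap-based resync)
      *      2 / -2   become sync source / sync target (full resync)
      *    100 / -100 split brain detected, try the recovery policies
      *    -1000      unrelated data, refuse the connection
      *    -1091      both sides would need at least protocol 91 to resolve this
      */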
2829
2830 /* drbd_sync_handshake() returns the new conn state on success, or
2831    C_MASK on failure.
2832  */
2833 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2834                                            enum drbd_disk_state peer_disk) __must_hold(local)
2835 {
2836         enum drbd_conns rv = C_MASK;
2837         enum drbd_disk_state mydisk;
2838         struct net_conf *nc;
2839         int hg, rule_nr, rr_conflict, tentative;
2840
2841         mydisk = mdev->state.disk;
2842         if (mydisk == D_NEGOTIATING)
2843                 mydisk = mdev->new_state_tmp.disk;
2844
2845         dev_info(DEV, "drbd_sync_handshake:\n");
2846         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2847         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2848                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2849
2850         hg = drbd_uuid_compare(mdev, &rule_nr);
2851
2852         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2853
2854         if (hg == -1000) {
2855                 dev_alert(DEV, "Unrelated data, aborting!\n");
2856                 return C_MASK;
2857         }
2858         if (hg < -1000) {
2859                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2860                 return C_MASK;
2861         }
2862
2863         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2864             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2865                 int f = (hg == -100) || abs(hg) == 2;
2866                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2867                 if (f)
2868                         hg = hg*2;
2869                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2870                      hg > 0 ? "source" : "target");
2871         }
2872
2873         if (abs(hg) == 100)
2874                 drbd_khelper(mdev, "initial-split-brain");
2875
2876         rcu_read_lock();
2877         nc = rcu_dereference(mdev->tconn->net_conf);
2878
2879         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2880                 int pcount = (mdev->state.role == R_PRIMARY)
2881                            + (peer_role == R_PRIMARY);
2882                 int forced = (hg == -100);
2883
2884                 switch (pcount) {
2885                 case 0:
2886                         hg = drbd_asb_recover_0p(mdev);
2887                         break;
2888                 case 1:
2889                         hg = drbd_asb_recover_1p(mdev);
2890                         break;
2891                 case 2:
2892                         hg = drbd_asb_recover_2p(mdev);
2893                         break;
2894                 }
2895                 if (abs(hg) < 100) {
2896                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2897                              "automatically solved. Sync from %s node\n",
2898                              pcount, (hg < 0) ? "peer" : "this");
2899                         if (forced) {
2900                                 dev_warn(DEV, "Doing a full sync, since"
2901                                      " UUIDs were ambiguous.\n");
2902                                 hg = hg*2;
2903                         }
2904                 }
2905         }
2906
2907         if (hg == -100) {
2908                 if (nc->discard_my_data && !(mdev->p_uuid[UI_FLAGS]&1))
2909                         hg = -1;
2910                 if (!nc->discard_my_data && (mdev->p_uuid[UI_FLAGS]&1))
2911                         hg = 1;
2912
2913                 if (abs(hg) < 100)
2914                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2915                              "Sync from %s node\n",
2916                              (hg < 0) ? "peer" : "this");
2917         }
2918         rr_conflict = nc->rr_conflict;
2919         tentative = nc->tentative;
2920         rcu_read_unlock();
2921
2922         if (hg == -100) {
2923                 /* FIXME this log message is not correct if we end up here
2924                  * after an attempted attach on a diskless node.
2925                  * We just refuse to attach -- well, we drop the "connection"
2926                  * to that disk, in a way... */
2927                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2928                 drbd_khelper(mdev, "split-brain");
2929                 return C_MASK;
2930         }
2931
2932         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2933                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2934                 return C_MASK;
2935         }
2936
2937         if (hg < 0 && /* by intention we do not use mydisk here. */
2938             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2939                 switch (rr_conflict) {
2940                 case ASB_CALL_HELPER:
2941                         drbd_khelper(mdev, "pri-lost");
2942                         /* fall through */
2943                 case ASB_DISCONNECT:
2944                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2945                         return C_MASK;
2946                 case ASB_VIOLENTLY:
2947                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2948                              "assumption\n");
2949                 }
2950         }
2951
2952         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2953                 if (hg == 0)
2954                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2955                 else
2956                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2957                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2958                                  abs(hg) >= 2 ? "full" : "bit-map based");
2959                 return C_MASK;
2960         }
2961
2962         if (abs(hg) >= 2) {
2963                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2964                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2965                                         BM_LOCKED_SET_ALLOWED))
2966                         return C_MASK;
2967         }
2968
2969         if (hg > 0) { /* become sync source. */
2970                 rv = C_WF_BITMAP_S;
2971         } else if (hg < 0) { /* become sync target */
2972                 rv = C_WF_BITMAP_T;
2973         } else {
2974                 rv = C_CONNECTED;
2975                 if (drbd_bm_total_weight(mdev)) {
2976                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2977                              drbd_bm_total_weight(mdev));
2978                 }
2979         }
2980
2981         return rv;
2982 }
2983
2984 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
2985 {
2986         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2987         if (peer == ASB_DISCARD_REMOTE)
2988                 return ASB_DISCARD_LOCAL;
2989
2990         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2991         if (peer == ASB_DISCARD_LOCAL)
2992                 return ASB_DISCARD_REMOTE;
2993
2994         /* everything else is valid if they are equal on both sides. */
2995         return peer;
2996 }
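     /* Example: the peer's "discard remote data" is, from our point of view,
      * "discard local data", hence the swap above; all other policies only
      * match if both sides use the same value. */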
2997
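     /* receive_protocol(): a rough summary of the flow below: for
      * P_PROTOCOL_UPDATE we adopt the peer's settings into a fresh net_conf
      * (and switch the integrity hashes accordingly); for the initial
      * P_PROTOCOL packet we only verify that both sides were configured
      * compatibly and disconnect on any mismatch. */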
2998 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2999 {
3000         struct p_protocol *p = pi->data;
3001         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3002         int p_proto, p_discard_my_data, p_two_primaries, cf;
3003         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3004         char integrity_alg[SHARED_SECRET_MAX] = "";
3005         struct crypto_hash *peer_integrity_tfm = NULL, *integrity_tfm = NULL;
3006         void *int_dig_in = NULL, *int_dig_vv = NULL;
3007
3008         p_proto         = be32_to_cpu(p->protocol);
3009         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3010         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3011         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3012         p_two_primaries = be32_to_cpu(p->two_primaries);
3013         cf              = be32_to_cpu(p->conn_flags);
3014         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3015
3016         if (tconn->agreed_pro_version >= 87) {
3017                 int err;
3018
3019                 if (pi->size > sizeof(integrity_alg))
3020                         return -EIO;
3021                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3022                 if (err)
3023                         return err;
3024                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3025         }
3026
3027         if (pi->cmd == P_PROTOCOL_UPDATE) {
3028                 if (integrity_alg[0]) {
3029                         int hash_size;
3030
3031                         peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3032                         integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3033                         if (!(peer_integrity_tfm && integrity_tfm)) {
3034                                 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3035                                          integrity_alg);
3036                                 goto disconnect;
3037                         }
3038
3039                         hash_size = crypto_hash_digestsize(integrity_tfm);
3040                         int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3041                         int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3042                         if (!(int_dig_in && int_dig_vv)) {
3043                                 conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3044                                 goto disconnect;
3045                         }
3046                 }
3047
3048                 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3049                 if (!new_net_conf) {
3050                         conn_err(tconn, "Allocation of new net_conf failed\n");
3051                         goto disconnect;
3052                 }
3053
3054                 mutex_lock(&tconn->data.mutex);
3055                 mutex_lock(&tconn->conf_update);
3056                 old_net_conf = tconn->net_conf;
3057                 *new_net_conf = *old_net_conf;
3058
3059                 new_net_conf->wire_protocol = p_proto;
3060                 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3061                 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3062                 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3063                 new_net_conf->two_primaries = p_two_primaries;
3064                 strcpy(new_net_conf->integrity_alg, integrity_alg);
3065                 new_net_conf->integrity_alg_len = strlen(integrity_alg) + 1;
3066
3067                 crypto_free_hash(tconn->integrity_tfm);
3068                 tconn->integrity_tfm = integrity_tfm;
3069
3070                 rcu_assign_pointer(tconn->net_conf, new_net_conf);
3071                 mutex_unlock(&tconn->conf_update);
3072                 mutex_unlock(&tconn->data.mutex);
3073
3074                 crypto_free_hash(tconn->peer_integrity_tfm);
3075                 kfree(tconn->int_dig_in);
3076                 kfree(tconn->int_dig_vv);
3077                 tconn->peer_integrity_tfm = peer_integrity_tfm;
3078                 tconn->int_dig_in = int_dig_in;
3079                 tconn->int_dig_vv = int_dig_vv;
3080
3081                 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3082                         conn_info(tconn, "peer data-integrity-alg: %s\n", integrity_alg);
3083
3084                 synchronize_rcu();
3085                 kfree(old_net_conf);
3086         } else {
3087                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3088
3089                 if (cf & CF_DRY_RUN)
3090                         set_bit(CONN_DRY_RUN, &tconn->flags);
3091
3092                 rcu_read_lock();
3093                 nc = rcu_dereference(tconn->net_conf);
3094
3095                 if (p_proto != nc->wire_protocol) {
3096                         conn_err(tconn, "incompatible communication protocols\n");
3097                         goto disconnect_rcu_unlock;
3098                 }
3099
3100                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3101                         conn_err(tconn, "incompatible after-sb-0pri settings\n");
3102                         goto disconnect_rcu_unlock;
3103                 }
3104
3105                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3106                         conn_err(tconn, "incompatible after-sb-1pri settings\n");
3107                         goto disconnect_rcu_unlock;
3108                 }
3109
3110                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3111                         conn_err(tconn, "incompatible after-sb-2pri settings\n");
3112                         goto disconnect_rcu_unlock;
3113                 }
3114
3115                 if (p_discard_my_data && nc->discard_my_data) {
3116                         conn_err(tconn, "both sides have the 'discard_my_data' flag set\n");
3117                         goto disconnect_rcu_unlock;
3118                 }
3119
3120                 if (p_two_primaries != nc->two_primaries) {
3121                         conn_err(tconn, "incompatible setting of the two-primaries options\n");
3122                         goto disconnect_rcu_unlock;
3123                 }
3124
3125                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3126                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
3127                         goto disconnect_rcu_unlock;
3128                 }
3129
3130                 rcu_read_unlock();
3131         }
3132         return 0;
3133
3134 disconnect_rcu_unlock:
3135         rcu_read_unlock();
3136 disconnect:
3137         crypto_free_hash(peer_integrity_tfm);
3138         crypto_free_hash(integrity_tfm);
3139         kfree(int_dig_in);
3140         kfree(int_dig_vv);
3141         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3142         return -EIO;
3143 }
3144
3145 /* helper function
3146  * input: alg name, feature name
3147  * return: NULL (alg name was "")
3148  *         ERR_PTR(error) if something goes wrong
3149  *         or the crypto hash ptr, if it worked out ok. */
3150 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3151                 const char *alg, const char *name)
3152 {
3153         struct crypto_hash *tfm;
3154
3155         if (!alg[0])
3156                 return NULL;
3157
3158         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3159         if (IS_ERR(tfm)) {
3160                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3161                         alg, name, PTR_ERR(tfm));
3162                 return tfm;
3163         }
3164         return tfm;
3165 }
3166
3167 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3168 {
3169         void *buffer = tconn->data.rbuf;
3170         int size = pi->size;
3171
3172         while (size) {
3173                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3174                 s = drbd_recv(tconn, buffer, s);
3175                 if (s <= 0) {
3176                         if (s < 0)
3177                                 return s;
3178                         break;
3179                 }
3180                 size -= s;
3181         }
3182         if (size)
3183                 return -EIO;
3184         return 0;
3185 }
3186
3187 /*
3188  * config_unknown_volume  -  device configuration command for unknown volume
3189  *
3190  * When a device is added to an existing connection, the node on which the
3191  * device is added first will send configuration commands to its peer but the
3192  * peer will not know about the device yet.  It will warn and ignore these
3193  * commands.  Once the device is added on the second node, the second node will
3194  * send the same device configuration commands, but in the other direction.
3195  *
3196  * (We can also end up here if drbd is misconfigured.)
3197  */
3198 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3199 {
3200         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3201                   cmdname(pi->cmd), pi->vnr);
3202         return ignore_remaining_packet(tconn, pi);
3203 }
3204
3205 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3206 {
3207         struct drbd_conf *mdev;
3208         struct p_rs_param_95 *p;
3209         unsigned int header_size, data_size, exp_max_sz;
3210         struct crypto_hash *verify_tfm = NULL;
3211         struct crypto_hash *csums_tfm = NULL;
3212         struct net_conf *old_net_conf, *new_net_conf = NULL;
3213         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3214         const int apv = tconn->agreed_pro_version;
3215         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3216         int fifo_size = 0;
3217         int err;
3218
3219         mdev = vnr_to_mdev(tconn, pi->vnr);
3220         if (!mdev)
3221                 return config_unknown_volume(tconn, pi);
3222
3223         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3224                     : apv == 88 ? sizeof(struct p_rs_param)
3225                                         + SHARED_SECRET_MAX
3226                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3227                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3228
3229         if (pi->size > exp_max_sz) {
3230                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3231                     pi->size, exp_max_sz);
3232                 return -EIO;
3233         }
3234
3235         if (apv <= 88) {
3236                 header_size = sizeof(struct p_rs_param);
3237                 data_size = pi->size - header_size;
3238         } else if (apv <= 94) {
3239                 header_size = sizeof(struct p_rs_param_89);
3240                 data_size = pi->size - header_size;
3241                 D_ASSERT(data_size == 0);
3242         } else {
3243                 header_size = sizeof(struct p_rs_param_95);
3244                 data_size = pi->size - header_size;
3245                 D_ASSERT(data_size == 0);
3246         }
3247
3248         /* initialize verify_alg and csums_alg */
3249         p = pi->data;
3250         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
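             /* Note: this assumes verify_alg and csums_alg are two adjacent
              * SHARED_SECRET_MAX sized arrays in struct p_rs_param_95, so a
              * single memset clears both. */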
3251
3252         err = drbd_recv_all(mdev->tconn, p, header_size);
3253         if (err)
3254                 return err;
3255
3256         mutex_lock(&mdev->tconn->conf_update);
3257         old_net_conf = mdev->tconn->net_conf;
3258         if (get_ldev(mdev)) {
3259                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3260                 if (!new_disk_conf) {
3261                         put_ldev(mdev);
3262                         mutex_unlock(&mdev->tconn->conf_update);
3263                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3264                         return -ENOMEM;
3265                 }
3266
3267                 old_disk_conf = mdev->ldev->disk_conf;
3268                 *new_disk_conf = *old_disk_conf;
3269
3270                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3271         }
3272
3273         if (apv >= 88) {
3274                 if (apv == 88) {
3275                         if (data_size > SHARED_SECRET_MAX) {
3276                                 dev_err(DEV, "verify-alg too long, "
3277                                     "peer wants %u, accepting only %u bytes\n",
3278                                                 data_size, SHARED_SECRET_MAX);
3279                                 err = -EIO;
3280                                 goto reconnect;
3281                         }
3282
3283                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3284                         if (err)
3285                                 goto reconnect;
3286                         /* we expect NUL terminated string */
3287                         /* but just in case someone tries to be evil */
3288                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3289                         p->verify_alg[data_size-1] = 0;
3290
3291                 } else /* apv >= 89 */ {
3292                         /* we still expect NUL terminated strings */
3293                         /* but just in case someone tries to be evil */
3294                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3295                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3296                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3297                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3298                 }
3299
3300                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3301                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3302                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3303                                     old_net_conf->verify_alg, p->verify_alg);
3304                                 goto disconnect;
3305                         }
3306                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3307                                         p->verify_alg, "verify-alg");
3308                         if (IS_ERR(verify_tfm)) {
3309                                 verify_tfm = NULL;
3310                                 goto disconnect;
3311                         }
3312                 }
3313
3314                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3315                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3316                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3317                                     old_net_conf->csums_alg, p->csums_alg);
3318                                 goto disconnect;
3319                         }
3320                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3321                                         p->csums_alg, "csums-alg");
3322                         if (IS_ERR(csums_tfm)) {
3323                                 csums_tfm = NULL;
3324                                 goto disconnect;
3325                         }
3326                 }
3327
3328                 if (apv > 94 && new_disk_conf) {
3329                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3330                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3331                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3332                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3333
3334                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3335                         if (fifo_size != mdev->rs_plan_s->size) {
3336                                 new_plan = fifo_alloc(fifo_size);
3337                                 if (!new_plan) {
3338                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3339                                         put_ldev(mdev);
3340                                         goto disconnect;
3341                                 }
3342                         }
3343                 }
3344
3345                 if (verify_tfm || csums_tfm) {
3346                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3347                         if (!new_net_conf) {
3348                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3349                                 goto disconnect;
3350                         }
3351
3352                         *new_net_conf = *old_net_conf;
3353
3354                         if (verify_tfm) {
3355                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3356                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3357                                 crypto_free_hash(mdev->tconn->verify_tfm);
3358                                 mdev->tconn->verify_tfm = verify_tfm;
3359                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3360                         }
3361                         if (csums_tfm) {
3362                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3363                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3364                                 crypto_free_hash(mdev->tconn->csums_tfm);
3365                                 mdev->tconn->csums_tfm = csums_tfm;
3366                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3367                         }
3368                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3369                 }
3370         }
3371
3372         if (new_disk_conf) {
3373                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3374                 put_ldev(mdev);
3375         }
3376
3377         if (new_plan) {
3378                 old_plan = mdev->rs_plan_s;
3379                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3380         }
3381
3382         mutex_unlock(&mdev->tconn->conf_update);
3383         synchronize_rcu();
3384         if (new_net_conf)
3385                 kfree(old_net_conf);
3386         kfree(old_disk_conf);
3387         kfree(old_plan);
3388
3389         return 0;
3390
3391 reconnect:
3392         if (new_disk_conf) {
3393                 put_ldev(mdev);
3394                 kfree(new_disk_conf);
3395         }
3396         mutex_unlock(&mdev->tconn->conf_update);
3397         return -EIO;
3398
3399 disconnect:
3400         kfree(new_plan);
3401         if (new_disk_conf) {
3402                 put_ldev(mdev);
3403                 kfree(new_disk_conf);
3404         }
3405         mutex_unlock(&mdev->tconn->conf_update);
3406         /* just for completeness: actually not needed,
3407          * as this is not reached if csums_tfm was ok. */
3408         crypto_free_hash(csums_tfm);
3409         /* but free the verify_tfm again, if csums_tfm did not work out */
3410         crypto_free_hash(verify_tfm);
3411         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3412         return -EIO;
3413 }
3414
3415 /* warn if the arguments differ by more than 12.5% */
3416 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3417         const char *s, sector_t a, sector_t b)
3418 {
3419         sector_t d;
3420         if (a == 0 || b == 0)
3421                 return;
3422         d = (a > b) ? (a - b) : (b - a);
3423         if (d > (a>>3) || d > (b>>3))
3424                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3425                      (unsigned long long)a, (unsigned long long)b);
3426 }
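     /* Example: a = 1000 sectors, b = 800 sectors: d = 200, a >> 3 = 125,
      * so d > a/8 and the roughly 20% difference is reported as considerable. */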
3427
3428 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3429 {
3430         struct drbd_conf *mdev;
3431         struct p_sizes *p = pi->data;
3432         enum determine_dev_size dd = unchanged;
3433         sector_t p_size, p_usize, my_usize;
3434         int ldsc = 0; /* local disk size changed */
3435         enum dds_flags ddsf;
3436
3437         mdev = vnr_to_mdev(tconn, pi->vnr);
3438         if (!mdev)
3439                 return config_unknown_volume(tconn, pi);
3440
3441         p_size = be64_to_cpu(p->d_size);
3442         p_usize = be64_to_cpu(p->u_size);
3443
3444         /* just store the peer's disk size for now.
3445          * we still need to figure out whether we accept that. */
3446         mdev->p_size = p_size;
3447
3448         if (get_ldev(mdev)) {
3449                 rcu_read_lock();
3450                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3451                 rcu_read_unlock();
3452
3453                 warn_if_differ_considerably(mdev, "lower level device sizes",
3454                            p_size, drbd_get_max_capacity(mdev->ldev));
3455                 warn_if_differ_considerably(mdev, "user requested size",
3456                                             p_usize, my_usize);
3457
3458                 /* if this is the first connect, or an otherwise expected
3459                  * param exchange, choose the minimum */
3460                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3461                         p_usize = min_not_zero(my_usize, p_usize);
3462
3463                 /* Never shrink a device with usable data during connect.
3464                    But allow online shrinking if we are connected. */
3465                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3466                     drbd_get_capacity(mdev->this_bdev) &&
3467                     mdev->state.disk >= D_OUTDATED &&
3468                     mdev->state.conn < C_CONNECTED) {
3469                         dev_err(DEV, "The peer's disk size is too small!\n");
3470                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3471                         put_ldev(mdev);
3472                         return -EIO;
3473                 }
3474
3475                 if (my_usize != p_usize) {
3476                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3477
3478                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3479                         if (!new_disk_conf) {
3480                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3481                                 put_ldev(mdev);
3482                                 return -ENOMEM;
3483                         }
3484
3485                         mutex_lock(&mdev->tconn->conf_update);
3486                         old_disk_conf = mdev->ldev->disk_conf;
3487                         *new_disk_conf = *old_disk_conf;
3488                         new_disk_conf->disk_size = p_usize;
3489
3490                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3491                         mutex_unlock(&mdev->tconn->conf_update);
3492                         synchronize_rcu();
3493                         kfree(old_disk_conf);
3494
3495                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3496                                  (unsigned long)p_usize);
3497                 }
3498
3499                 put_ldev(mdev);
3500         }
3501
3502         ddsf = be16_to_cpu(p->dds_flags);
3503         if (get_ldev(mdev)) {
3504                 dd = drbd_determine_dev_size(mdev, ddsf);
3505                 put_ldev(mdev);
3506                 if (dd == dev_size_error)
3507                         return -EIO;
3508                 drbd_md_sync(mdev);
3509         } else {
3510                 /* I am diskless, need to accept the peer's size. */
3511                 drbd_set_my_capacity(mdev, p_size);
3512         }
3513
3514         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3515         drbd_reconsider_max_bio_size(mdev);
3516
3517         if (get_ldev(mdev)) {
3518                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3519                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3520                         ldsc = 1;
3521                 }
3522
3523                 put_ldev(mdev);
3524         }
3525
3526         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3527                 if (be64_to_cpu(p->c_size) !=
3528                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3529                         /* we have different sizes, probably peer
3530                          * needs to know my new size... */
3531                         drbd_send_sizes(mdev, 0, ddsf);
3532                 }
3533                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3534                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3535                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3536                             mdev->state.disk >= D_INCONSISTENT) {
3537                                 if (ddsf & DDSF_NO_RESYNC)
3538                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3539                                 else
3540                                         resync_after_online_grow(mdev);
3541                         } else
3542                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3543                 }
3544         }
3545
3546         return 0;
3547 }
3548
3549 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3550 {
3551         struct drbd_conf *mdev;
3552         struct p_uuids *p = pi->data;
3553         u64 *p_uuid;
3554         int i, updated_uuids = 0;
3555
3556         mdev = vnr_to_mdev(tconn, pi->vnr);
3557         if (!mdev)
3558                 return config_unknown_volume(tconn, pi);
3559
3560         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return -ENOMEM;
             }
3561
3562         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3563                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3564
3565         kfree(mdev->p_uuid);
3566         mdev->p_uuid = p_uuid;
3567
3568         if (mdev->state.conn < C_CONNECTED &&
3569             mdev->state.disk < D_INCONSISTENT &&
3570             mdev->state.role == R_PRIMARY &&
3571             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3572                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3573                     (unsigned long long)mdev->ed_uuid);
3574                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3575                 return -EIO;
3576         }
3577
3578         if (get_ldev(mdev)) {
3579                 int skip_initial_sync =
3580                         mdev->state.conn == C_CONNECTED &&
3581                         mdev->tconn->agreed_pro_version >= 90 &&
3582                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3583                         (p_uuid[UI_FLAGS] & 8);
3584                 if (skip_initial_sync) {
3585                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3586                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3587                                         "clear_n_write from receive_uuids",
3588                                         BM_LOCKED_TEST_ALLOWED);
3589                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3590                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3591                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3592                                         CS_VERBOSE, NULL);
3593                         drbd_md_sync(mdev);
3594                         updated_uuids = 1;
3595                 }
3596                 put_ldev(mdev);
3597         } else if (mdev->state.disk < D_INCONSISTENT &&
3598                    mdev->state.role == R_PRIMARY) {
3599                 /* I am a diskless primary, the peer just created a new current UUID
3600                    for me. */
3601                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3602         }
3603
3604         /* Before we test the disk state, we should wait until any ongoing
3605            cluster-wide state change has finished. That is important if we
3606            are primary and are detaching from our disk. We need to see the
3607            new disk state... */
3608         mutex_lock(mdev->state_mutex);
3609         mutex_unlock(mdev->state_mutex);
3610         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3611                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3612
3613         if (updated_uuids)
3614                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3615
3616         return 0;
3617 }
3618
3619 /**
3620  * convert_state() - Converts the peer's view of the cluster state to our point of view
3621  * @ps:         The state as seen by the peer.
3622  */
3623 static union drbd_state convert_state(union drbd_state ps)
3624 {
3625         union drbd_state ms;
3626
3627         static enum drbd_conns c_tab[] = {
3628                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3629                 [C_CONNECTED] = C_CONNECTED,
3630
3631                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3632                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3633                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3634                 [C_VERIFY_S]       = C_VERIFY_T,
3635                 [C_MASK]   = C_MASK,
3636         };
3637
3638         ms.i = ps.i;
3639
3640         ms.conn = c_tab[ps.conn];
3641         ms.peer = ps.role;
3642         ms.role = ps.peer;
3643         ms.pdsk = ps.disk;
3644         ms.disk = ps.pdsk;
3645         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3646
3647         return ms;
3648 }
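     /* Example: the peer reports { .role = R_PRIMARY, .peer = R_SECONDARY,
      * .disk = D_UP_TO_DATE, .pdsk = D_INCONSISTENT, .conn = C_STARTING_SYNC_S };
      * from our point of view that becomes { .peer = R_PRIMARY, .role = R_SECONDARY,
      * .pdsk = D_UP_TO_DATE, .disk = D_INCONSISTENT, .conn = C_STARTING_SYNC_T }. */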
3649
3650 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3651 {
3652         struct drbd_conf *mdev;
3653         struct p_req_state *p = pi->data;
3654         union drbd_state mask, val;
3655         enum drbd_state_rv rv;
3656
3657         mdev = vnr_to_mdev(tconn, pi->vnr);
3658         if (!mdev)
3659                 return -EIO;
3660
3661         mask.i = be32_to_cpu(p->mask);
3662         val.i = be32_to_cpu(p->val);
3663
3664         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3665             mutex_is_locked(mdev->state_mutex)) {
3666                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3667                 return 0;
3668         }
3669
3670         mask = convert_state(mask);
3671         val = convert_state(val);
3672
3673         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3674         drbd_send_sr_reply(mdev, rv);
3675
3676         drbd_md_sync(mdev);
3677
3678         return 0;
3679 }
3680
3681 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3682 {
3683         struct p_req_state *p = pi->data;
3684         union drbd_state mask, val;
3685         enum drbd_state_rv rv;
3686
3687         mask.i = be32_to_cpu(p->mask);
3688         val.i = be32_to_cpu(p->val);
3689
3690         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3691             mutex_is_locked(&tconn->cstate_mutex)) {
3692                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3693                 return 0;
3694         }
3695
3696         mask = convert_state(mask);
3697         val = convert_state(val);
3698
3699         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3700         conn_send_sr_reply(tconn, rv);
3701
3702         return 0;
3703 }
3704
3705 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3706 {
3707         struct drbd_conf *mdev;
3708         struct p_state *p = pi->data;
3709         union drbd_state os, ns, peer_state;
3710         enum drbd_disk_state real_peer_disk;
3711         enum chg_state_flags cs_flags;
3712         int rv;
3713
3714         mdev = vnr_to_mdev(tconn, pi->vnr);
3715         if (!mdev)
3716                 return config_unknown_volume(tconn, pi);
3717
3718         peer_state.i = be32_to_cpu(p->state);
3719
3720         real_peer_disk = peer_state.disk;
3721         if (peer_state.disk == D_NEGOTIATING) {
3722                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3723                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3724         }
3725
3726         spin_lock_irq(&mdev->tconn->req_lock);
3727  retry:
3728         os = ns = drbd_read_state(mdev);
3729         spin_unlock_irq(&mdev->tconn->req_lock);
3730
3731         /* peer says his disk is uptodate, while we think it is inconsistent,
3732          * and this happens while we think we have a sync going on. */
3733         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3734             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3735                 /* If we are (becoming) SyncSource, but peer is still in sync
3736                  * preparation, ignore its uptodate-ness to avoid flapping, it
3737                  * will change to inconsistent once the peer reaches active
3738                  * syncing states.
3739                  * It may have changed syncer-paused flags, however, so we
3740                  * cannot ignore this completely. */
3741                 if (peer_state.conn > C_CONNECTED &&
3742                     peer_state.conn < C_SYNC_SOURCE)
3743                         real_peer_disk = D_INCONSISTENT;
3744
3745                 /* if peer_state changes to connected at the same time,
3746                  * it explicitly notifies us that it finished resync.
3747                  * Maybe we should finish it up, too? */
3748                 else if (os.conn >= C_SYNC_SOURCE &&
3749                          peer_state.conn == C_CONNECTED) {
3750                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3751                                 drbd_resync_finished(mdev);
3752                         return 0;
3753                 }
3754         }
3755
3756         /* peer says his disk is inconsistent, while we think it is uptodate,
3757          * and this happens while the peer still thinks we have a sync going on,
3758          * but we think we are already done with the sync.
3759          * We ignore this to avoid flapping pdsk.
3760          * This should not happen, if the peer is a recent version of drbd. */
3761         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3762             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3763                 real_peer_disk = D_UP_TO_DATE;
3764
3765         if (ns.conn == C_WF_REPORT_PARAMS)
3766                 ns.conn = C_CONNECTED;
3767
3768         if (peer_state.conn == C_AHEAD)
3769                 ns.conn = C_BEHIND;
3770
3771         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3772             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3773                 int cr; /* consider resync */
3774
3775                 /* if we established a new connection */
3776                 cr  = (os.conn < C_CONNECTED);
3777                 /* if we had an established connection
3778                  * and one of the nodes newly attaches a disk */
3779                 cr |= (os.conn == C_CONNECTED &&
3780                        (peer_state.disk == D_NEGOTIATING ||
3781                         os.disk == D_NEGOTIATING));
3782                 /* if we have both been inconsistent, and the peer has been
3783                  * forced to be UpToDate with --overwrite-data */
3784                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3785                 /* if we had been plain connected, and the admin requested to
3786                  * start a sync by "invalidate" or "invalidate-remote" */
3787                 cr |= (os.conn == C_CONNECTED &&
3788                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3789                                  peer_state.conn <= C_WF_BITMAP_T));
3790
3791                 if (cr)
3792                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3793
3794                 put_ldev(mdev);
3795                 if (ns.conn == C_MASK) {
3796                         ns.conn = C_CONNECTED;
3797                         if (mdev->state.disk == D_NEGOTIATING) {
3798                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3799                         } else if (peer_state.disk == D_NEGOTIATING) {
3800                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3801                                 peer_state.disk = D_DISKLESS;
3802                                 real_peer_disk = D_DISKLESS;
3803                         } else {
3804                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3805                                         return -EIO;
3806                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3807                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3808                                 return -EIO;
3809                         }
3810                 }
3811         }
3812
3813         spin_lock_irq(&mdev->tconn->req_lock);
3814         if (os.i != drbd_read_state(mdev).i)
3815                 goto retry;
3816         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3817         ns.peer = peer_state.role;
3818         ns.pdsk = real_peer_disk;
3819         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3820         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3821                 ns.disk = mdev->new_state_tmp.disk;
3822         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3823         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3824             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3825                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3826                    for temporary network outages! */
3827                 spin_unlock_irq(&mdev->tconn->req_lock);
3828                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3829                 tl_clear(mdev->tconn);
3830                 drbd_uuid_new_current(mdev);
3831                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3832                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3833                 return -EIO;
3834         }
3835         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3836         ns = drbd_read_state(mdev);
3837         spin_unlock_irq(&mdev->tconn->req_lock);
3838
3839         if (rv < SS_SUCCESS) {
3840                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3841                 return -EIO;
3842         }
3843
3844         if (os.conn > C_WF_REPORT_PARAMS) {
3845                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3846                     peer_state.disk != D_NEGOTIATING ) {
3847                         /* we want resync, peer has not yet decided to sync... */
3848                         /* Nowadays only used when forcing a node into primary role and
3849                            setting its disk to UpToDate with that */
3850                         drbd_send_uuids(mdev);
3851                         drbd_send_state(mdev);
3852                 }
3853         }
3854
3855         mutex_lock(&mdev->tconn->conf_update);
3856         mdev->tconn->net_conf->discard_my_data = 0; /* without copy; single bit op is atomic */
3857         mutex_unlock(&mdev->tconn->conf_update);
3858
3859         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3860
3861         return 0;
3862 }
3863
3864 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3865 {
3866         struct drbd_conf *mdev;
3867         struct p_rs_uuid *p = pi->data;
3868
3869         mdev = vnr_to_mdev(tconn, pi->vnr);
3870         if (!mdev)
3871                 return -EIO;
3872
3873         wait_event(mdev->misc_wait,
3874                    mdev->state.conn == C_WF_SYNC_UUID ||
3875                    mdev->state.conn == C_BEHIND ||
3876                    mdev->state.conn < C_CONNECTED ||
3877                    mdev->state.disk < D_NEGOTIATING);
3878
3879         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3880
3881         /* Here the _drbd_uuid_ functions are right, current should
3882            _not_ be rotated into the history */
3883         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3884                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3885                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3886
3887                 drbd_print_uuids(mdev, "updated sync uuid");
3888                 drbd_start_resync(mdev, C_SYNC_TARGET);
3889
3890                 put_ldev(mdev);
3891         } else
3892                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3893
3894         return 0;
3895 }
3896
3897 /**
3898  * receive_bitmap_plain
3899  *
3900  * Return 0 when done, 1 when another iteration is needed, and a negative error
3901  * code upon failure.
3902  */
3903 static int
3904 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3905                      unsigned long *p, struct bm_xfer_ctx *c)
3906 {
3907         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3908                                  drbd_header_size(mdev->tconn);
3909         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3910                                        c->bm_words - c->word_offset);
3911         unsigned int want = num_words * sizeof(*p);
3912         int err;
3913
3914         if (want != size) {
3915                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3916                 return -EIO;
3917         }
3918         if (want == 0)
3919                 return 0;
3920         err = drbd_recv_all(mdev->tconn, p, want);
3921         if (err)
3922                 return err;
3923
3924         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3925
3926         c->word_offset += num_words;
3927         c->bit_offset = c->word_offset * BITS_PER_LONG;
3928         if (c->bit_offset > c->bm_bits)
3929                 c->bit_offset = c->bm_bits;
3930
3931         return 1;
3932 }
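     /* Example with hypothetical sizes: if data_size were 4096 bytes and
      * sizeof(long) == 8, one full packet carries 512 words, i.e. 32768 bits;
      * word_offset advances by num_words per call until bm_words is reached. */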
3933
3934 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3935 {
3936         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3937 }
3938
3939 static int dcbp_get_start(struct p_compressed_bm *p)
3940 {
3941         return (p->encoding & 0x80) != 0;
3942 }
3943
3944 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3945 {
3946         return (p->encoding >> 4) & 0x7;
3947 }
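     /* The encoding byte packs three fields, as read from the accessors above:
      *   bit  7     initial toggle value of the first run (dcbp_get_start)
      *   bits 6..4  number of trailing pad bits (dcbp_get_pad_bits)
      *   bits 3..0  the bitmap code, e.g. RLE_VLI_Bits (dcbp_get_code)
      */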
3948
3949 /**
3950  * recv_bm_rle_bits
3951  *
3952  * Return 0 when done, 1 when another iteration is needed, and a negative error
3953  * code upon failure.
3954  */
3955 static int
3956 recv_bm_rle_bits(struct drbd_conf *mdev,
3957                 struct p_compressed_bm *p,
3958                  struct bm_xfer_ctx *c,
3959                  unsigned int len)
3960 {
3961         struct bitstream bs;
3962         u64 look_ahead;
3963         u64 rl;
3964         u64 tmp;
3965         unsigned long s = c->bit_offset;
3966         unsigned long e;
3967         int toggle = dcbp_get_start(p);
3968         int have;
3969         int bits;
3970
3971         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3972
3973         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3974         if (bits < 0)
3975                 return -EIO;
3976
3977         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3978                 bits = vli_decode_bits(&rl, look_ahead);
3979                 if (bits <= 0)
3980                         return -EIO;
3981
3982                 if (toggle) {
3983                         e = s + rl -1;
3984                         if (e >= c->bm_bits) {
3985                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3986                                 return -EIO;
3987                         }
3988                         _drbd_bm_set_bits(mdev, s, e);
3989                 }
3990
3991                 if (have < bits) {
3992                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3993                                 have, bits, look_ahead,
3994                                 (unsigned int)(bs.cur.b - p->code),
3995                                 (unsigned int)bs.buf_len);
3996                         return -EIO;
3997                 }
3998                 look_ahead >>= bits;
3999                 have -= bits;
4000
4001                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4002                 if (bits < 0)
4003                         return -EIO;
4004                 look_ahead |= tmp << have;
4005                 have += bits;
4006         }
4007
4008         c->bit_offset = s;
4009         bm_xfer_ctx_bit_to_word_offset(c);
4010
4011         return (s != c->bm_bits);
4012 }
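     /* Decoding sketch: the stream is a sequence of VLI-encoded run lengths
      * that alternate between "clear" and "set" runs, starting with the value
      * given by dcbp_get_start().  E.g. start=0 with runs 5, 3, 7 leaves bits
      * [s, s+4] untouched, sets bits [s+5, s+7], then skips another 7 bits. */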
4013
4014 /**
4015  * decode_bitmap_c
4016  *
4017  * Return 0 when done, 1 when another iteration is needed, and a negative error
4018  * code upon failure.
4019  */
4020 static int
4021 decode_bitmap_c(struct drbd_conf *mdev,
4022                 struct p_compressed_bm *p,
4023                 struct bm_xfer_ctx *c,
4024                 unsigned int len)
4025 {
4026         if (dcbp_get_code(p) == RLE_VLI_Bits)
4027                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4028
4029         /* other variants had been implemented for evaluation,
4030          * but have been dropped as this one turned out to be "best"
4031          * during all our tests. */
4032
4033         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4034         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4035         return -EIO;
4036 }
4037
4038 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4039                 const char *direction, struct bm_xfer_ctx *c)
4040 {
4041         /* what would it take to transfer it "plaintext" */
4042         unsigned int header_size = drbd_header_size(mdev->tconn);
4043         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4044         unsigned int plain =
4045                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4046                 c->bm_words * sizeof(unsigned long);
4047         unsigned int total = c->bytes[0] + c->bytes[1];
4048         unsigned int r;
4049
4050         /* total cannot be zero, but just in case: */
4051         if (total == 0)
4052                 return;
4053
4054         /* don't report if not compressed */
4055         if (total >= plain)
4056                 return;
4057
4058         /* total < plain. check for overflow, still */
4059         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4060                                     : (1000 * total / plain);
4061
4062         if (r > 1000)
4063                 r = 1000;
4064
4065         r = 1000 - r;
4066         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4067              "total %u; compression: %u.%u%%\n",
4068                         direction,
4069                         c->bytes[1], c->packets[1],
4070                         c->bytes[0], c->packets[0],
4071                         total, r/10, r % 10);
4072 }
4073
4074 /* Since we are processing the bitfield from lower addresses to higher,
4075    it does not matter whether we process it in 32 bit chunks or 64 bit
4076    chunks as long as it is little endian. (Understand it as a byte stream,
4077    beginning with the lowest byte...) If we used big endian,
4078    we would need to process it from the highest address to the lowest,
4079    in order to be agnostic to the 32 vs 64 bit issue.
4080
4081    Returns 0 on success, a negative error code otherwise. */
4082 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4083 {
4084         struct drbd_conf *mdev;
4085         struct bm_xfer_ctx c;
4086         int err;
4087
4088         mdev = vnr_to_mdev(tconn, pi->vnr);
4089         if (!mdev)
4090                 return -EIO;
4091
4092         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4093         /* you are supposed to send additional out-of-sync information
4094          * if you actually set bits during this phase */
4095
4096         c = (struct bm_xfer_ctx) {
4097                 .bm_bits = drbd_bm_bits(mdev),
4098                 .bm_words = drbd_bm_words(mdev),
4099         };
4100
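        /* Each iteration consumes one bitmap packet.  A positive return from
         * the per-packet receive function means more packets are expected,
         * 0 means the bitmap transfer is complete, negative is an error. */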
4101         for (;;) {
4102                 if (pi->cmd == P_BITMAP)
4103                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4104                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4105                         /* MAYBE: sanity check that we speak proto >= 90,
4106                          * and the feature is enabled! */
4107                         struct p_compressed_bm *p = pi->data;
4108
4109                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4110                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4111                                 err = -EIO;
4112                                 goto out;
4113                         }
4114                         if (pi->size <= sizeof(*p)) {
4115                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4116                                 err = -EIO;
4117                                 goto out;
4118                         }
4119                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4120                         if (err)
4121                                goto out;
4122                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4123                 } else {
4124                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4125                         err = -EIO;
4126                         goto out;
4127                 }
4128
4129                 c.packets[pi->cmd == P_BITMAP]++;
4130                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4131
4132                 if (err <= 0) {
4133                         if (err < 0)
4134                                 goto out;
4135                         break;
4136                 }
4137                 err = drbd_recv_header(mdev->tconn, pi);
4138                 if (err)
4139                         goto out;
4140         }
4141
4142         INFO_bm_xfer_stats(mdev, "receive", &c);
4143
4144         if (mdev->state.conn == C_WF_BITMAP_T) {
4145                 enum drbd_state_rv rv;
4146
4147                 err = drbd_send_bitmap(mdev);
4148                 if (err)
4149                         goto out;
4150                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4151                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4152                 D_ASSERT(rv == SS_SUCCESS);
4153         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4154                 /* admin may have requested C_DISCONNECTING,
4155                  * other threads may have noticed network errors */
4156                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4157                     drbd_conn_str(mdev->state.conn));
4158         }
4159         err = 0;
4160
4161  out:
4162         drbd_bm_unlock(mdev);
4163         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4164                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4165         return err;
4166 }
4167
4168 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4169 {
4170         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4171                  pi->cmd, pi->size);
4172
4173         return ignore_remaining_packet(tconn, pi);
4174 }
4175
4176 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4177 {
4178         /* Make sure we've acked all the TCP data associated
4179          * with the data requests being unplugged */
4180         drbd_tcp_quickack(tconn->data.socket);
4181
4182         return 0;
4183 }
4184
4185 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4186 {
4187         struct drbd_conf *mdev;
4188         struct p_block_desc *p = pi->data;
4189
4190         mdev = vnr_to_mdev(tconn, pi->vnr);
4191         if (!mdev)
4192                 return -EIO;
4193
4194         switch (mdev->state.conn) {
4195         case C_WF_SYNC_UUID:
4196         case C_WF_BITMAP_T:
4197         case C_BEHIND:
4198                 break;
4199         default:
4200                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4201                                 drbd_conn_str(mdev->state.conn));
4202         }
4203
4204         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4205
4206         return 0;
4207 }
4208
4209 struct data_cmd {
4210         int expect_payload;
4211         size_t pkt_size;
4212         int (*fn)(struct drbd_tconn *, struct packet_info *);
4213 };
4214
4215 static struct data_cmd drbd_cmd_handler[] = {
4216         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4217         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4218         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4219         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4220         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4221         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4222         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4223         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4224         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4225         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4226         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4227         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4228         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4229         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4230         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4231         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4232         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4233         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4234         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4235         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4236         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4237         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4238         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4239         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4240 };
4241
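/* Receiver main loop: read a packet header, look up the handler for the
 * packet type in drbd_cmd_handler[], receive the fixed-size sub header
 * (pkt_size bytes) into pi.data, and dispatch to cmd->fn().
 * Any failure escalates the connection to C_PROTOCOL_ERROR. */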
4242 static void drbdd(struct drbd_tconn *tconn)
4243 {
4244         struct packet_info pi;
4245         size_t shs; /* sub header size */
4246         int err;
4247
4248         while (get_t_state(&tconn->receiver) == RUNNING) {
4249                 struct data_cmd *cmd;
4250
4251                 drbd_thread_current_set_cpu(&tconn->receiver);
4252                 if (drbd_recv_header(tconn, &pi))
4253                         goto err_out;
4254
4255                 cmd = &drbd_cmd_handler[pi.cmd];
4256                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4257                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4258                                  cmdname(pi.cmd), pi.cmd);
4259                         goto err_out;
4260                 }
4261
4262                 shs = cmd->pkt_size;
4263                 if (pi.size > shs && !cmd->expect_payload) {
4264                         conn_err(tconn, "No payload expected %s l:%d\n",
4265                                  cmdname(pi.cmd), pi.size);
4266                         goto err_out;
4267                 }
4268
4269                 if (shs) {
4270                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4271                         if (err)
4272                                 goto err_out;
4273                         pi.size -= shs;
4274                 }
4275
4276                 err = cmd->fn(tconn, &pi);
4277                 if (err) {
4278                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4279                                  cmdname(pi.cmd), err, pi.size);
4280                         goto err_out;
4281                 }
4282         }
4283         return;
4284
4285     err_out:
4286         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4287 }
4288
4289 void conn_flush_workqueue(struct drbd_tconn *tconn)
4290 {
4291         struct drbd_wq_barrier barr;
4292
4293         barr.w.cb = w_prev_work_done;
4294         barr.w.tconn = tconn;
4295         init_completion(&barr.done);
4296         drbd_queue_work(&tconn->data.work, &barr.w);
4297         wait_for_completion(&barr.done);
4298 }
4299
4300 static void conn_disconnect(struct drbd_tconn *tconn)
4301 {
4302         struct drbd_conf *mdev;
4303         enum drbd_conns oc;
4304         int vnr, rv = SS_UNKNOWN_ERROR;
4305
4306         if (tconn->cstate == C_STANDALONE)
4307                 return;
4308
4309         /* The asender does not clean up anything; it must not interfere, either. */
4310         drbd_thread_stop(&tconn->asender);
4311         drbd_free_sock(tconn);
4312
4313         rcu_read_lock();
4314         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4315                 kref_get(&mdev->kref);
4316                 rcu_read_unlock();
4317                 drbd_disconnected(mdev);
4318                 kref_put(&mdev->kref, &drbd_minor_destroy);
4319                 rcu_read_lock();
4320         }
4321         rcu_read_unlock();
4322
4323         conn_info(tconn, "Connection closed\n");
4324
4325         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4326                 conn_try_outdate_peer_async(tconn);
4327
4328         spin_lock_irq(&tconn->req_lock);
4329         oc = tconn->cstate;
4330         if (oc >= C_UNCONNECTED)
4331                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4332
4333         spin_unlock_irq(&tconn->req_lock);
4334
4335         if (oc == C_DISCONNECTING)
4336                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4337 }
4338
4339 static int drbd_disconnected(struct drbd_conf *mdev)
4340 {
4341         enum drbd_fencing_p fp;
4342         unsigned int i;
4343
4344         /* wait for current activity to cease. */
4345         spin_lock_irq(&mdev->tconn->req_lock);
4346         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4347         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4348         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4349         spin_unlock_irq(&mdev->tconn->req_lock);
4350
4351         /* We do not have data structures that would allow us to
4352          * get the rs_pending_cnt down to 0 again.
4353          *  * On C_SYNC_TARGET we do not have any data structures describing
4354  *    the pending RSDataRequests we have sent.
4355          *  * On C_SYNC_SOURCE there is no data structure that tracks
4356          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4357          *  And no, it is not the sum of the reference counts in the
4358          *  resync_LRU. The resync_LRU tracks the whole operation including
4359          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4360          *  on the fly. */
4361         drbd_rs_cancel_all(mdev);
4362         mdev->rs_total = 0;
4363         mdev->rs_failed = 0;
4364         atomic_set(&mdev->rs_pending_cnt, 0);
4365         wake_up(&mdev->misc_wait);
4366
4367         del_timer_sync(&mdev->resync_timer);
4368         resync_timer_fn((unsigned long)mdev);
4369
4370         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4371          * w_make_resync_request etc. which may still be on the worker queue
4372          * to be "canceled" */
4373         drbd_flush_workqueue(mdev);
4374
4375         drbd_finish_peer_reqs(mdev);
4376
4377         kfree(mdev->p_uuid);
4378         mdev->p_uuid = NULL;
4379
4380         if (!drbd_suspended(mdev))
4381                 tl_clear(mdev->tconn);
4382
4383         drbd_md_sync(mdev);
4384
4385         fp = FP_DONT_CARE;
4386         if (get_ldev(mdev)) {
4387                 rcu_read_lock();
4388                 fp = rcu_dereference(mdev->ldev->disk_conf)->fencing;
4389                 rcu_read_unlock();
4390                 put_ldev(mdev);
4391         }
4392
4393         /* serialize with bitmap writeout triggered by the state change,
4394          * if any. */
4395         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4396
4397         /* tcp_close and release of sendpage pages can be deferred.  I don't
4398          * want to use SO_LINGER, because apparently it can be deferred for
4399          * more than 20 seconds (longest time I checked).
4400          *
4401          * Actually we don't care for exactly when the network stack does its
4402          * put_page(), but release our reference on these pages right here.
4403          */
4404         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4405         if (i)
4406                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4407         i = atomic_read(&mdev->pp_in_use_by_net);
4408         if (i)
4409                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4410         i = atomic_read(&mdev->pp_in_use);
4411         if (i)
4412                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4413
4414         D_ASSERT(list_empty(&mdev->read_ee));
4415         D_ASSERT(list_empty(&mdev->active_ee));
4416         D_ASSERT(list_empty(&mdev->sync_ee));
4417         D_ASSERT(list_empty(&mdev->done_ee));
4418
4419         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4420         atomic_set(&mdev->current_epoch->epoch_size, 0);
4421         D_ASSERT(list_empty(&mdev->current_epoch->list));
4422
4423         return 0;
4424 }
4425
4426 /*
4427  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4428  * we can agree on is stored in agreed_pro_version.
4429  *
4430  * feature flags and the reserved array should be enough room for future
4431  * enhancements of the handshake protocol, and possible plugins...
4432  *
4433  * For now they are expected to be zero, but they are ignored either way.
4434  */
4435 static int drbd_send_features(struct drbd_tconn *tconn)
4436 {
4437         struct drbd_socket *sock;
4438         struct p_connection_features *p;
4439
4440         sock = &tconn->data;
4441         p = conn_prepare_command(tconn, sock);
4442         if (!p)
4443                 return -EIO;
4444         memset(p, 0, sizeof(*p));
4445         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4446         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4447         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4448 }
4449
4450 /*
4451  * return values:
4452  *   1 yes, we have a valid connection
4453  *   0 oops, did not work out, please try again
4454  *  -1 peer talks different language,
4455  *     no point in trying again, please go standalone.
4456  */
4457 static int drbd_do_features(struct drbd_tconn *tconn)
4458 {
4459         /* ASSERT current == tconn->receiver ... */
4460         struct p_connection_features *p;
4461         const int expect = sizeof(struct p_connection_features);
4462         struct packet_info pi;
4463         int err;
4464
4465         err = drbd_send_features(tconn);
4466         if (err)
4467                 return 0;
4468
4469         err = drbd_recv_header(tconn, &pi);
4470         if (err)
4471                 return 0;
4472
4473         if (pi.cmd != P_CONNECTION_FEATURES) {
4474                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4475                          cmdname(pi.cmd), pi.cmd);
4476                 return -1;
4477         }
4478
4479         if (pi.size != expect) {
4480                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4481                      expect, pi.size);
4482                 return -1;
4483         }
4484
4485         p = pi.data;
4486         err = drbd_recv_all_warn(tconn, p, expect);
4487         if (err)
4488                 return 0;
4489
4490         p->protocol_min = be32_to_cpu(p->protocol_min);
4491         p->protocol_max = be32_to_cpu(p->protocol_max);
4492         if (p->protocol_max == 0)
4493                 p->protocol_max = p->protocol_min;
4494
4495         if (PRO_VERSION_MAX < p->protocol_min ||
4496             PRO_VERSION_MIN > p->protocol_max)
4497                 goto incompat;
4498
4499         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4500
4501         conn_info(tconn, "Handshake successful: "
4502              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4503
4504         return 1;
4505
4506  incompat:
4507         conn_err(tconn, "incompatible DRBD dialects: "
4508             "I support %d-%d, peer supports %d-%d\n",
4509             PRO_VERSION_MIN, PRO_VERSION_MAX,
4510             p->protocol_min, p->protocol_max);
4511         return -1;
4512 }
4513
4514 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4515 static int drbd_do_auth(struct drbd_tconn *tconn)
4516 {
4517         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4518         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4519         return -1;
4520 }
4521 #else
4522 #define CHALLENGE_LEN 64
4523
4524 /* Return value:
4525         1 - auth succeeded,
4526         0 - failed, try again (network error),
4527         -1 - auth failed, don't try again.
4528 */
4529
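/* CRAM-HMAC style challenge/response, symmetric on both sides:
 * send a random challenge, receive the peer's challenge, reply with
 * HMAC(shared secret, peer's challenge), then receive the peer's response
 * and compare it against HMAC(shared secret, my_challenge) computed locally. */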
4530 static int drbd_do_auth(struct drbd_tconn *tconn)
4531 {
4532         struct drbd_socket *sock;
4533         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4534         struct scatterlist sg;
4535         char *response = NULL;
4536         char *right_response = NULL;
4537         char *peers_ch = NULL;
4538         unsigned int key_len;
4539         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4540         unsigned int resp_size;
4541         struct hash_desc desc;
4542         struct packet_info pi;
4543         struct net_conf *nc;
4544         int err, rv;
4545
4546         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4547
4548         rcu_read_lock();
4549         nc = rcu_dereference(tconn->net_conf);
4550         key_len = strlen(nc->shared_secret);
4551         memcpy(secret, nc->shared_secret, key_len);
4552         rcu_read_unlock();
4553
4554         desc.tfm = tconn->cram_hmac_tfm;
4555         desc.flags = 0;
4556
4557         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4558         if (rv) {
4559                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4560                 rv = -1;
4561                 goto fail;
4562         }
4563
4564         get_random_bytes(my_challenge, CHALLENGE_LEN);
4565
4566         sock = &tconn->data;
4567         if (!conn_prepare_command(tconn, sock)) {
4568                 rv = 0;
4569                 goto fail;
4570         }
4571         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4572                                 my_challenge, CHALLENGE_LEN);
4573         if (!rv)
4574                 goto fail;
4575
4576         err = drbd_recv_header(tconn, &pi);
4577         if (err) {
4578                 rv = 0;
4579                 goto fail;
4580         }
4581
4582         if (pi.cmd != P_AUTH_CHALLENGE) {
4583                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4584                          cmdname(pi.cmd), pi.cmd);
4585                 rv = 0;
4586                 goto fail;
4587         }
4588
4589         if (pi.size > CHALLENGE_LEN * 2) {
4590                 conn_err(tconn, "AuthChallenge payload too big.\n");
4591                 rv = -1;
4592                 goto fail;
4593         }
4594
4595         peers_ch = kmalloc(pi.size, GFP_NOIO);
4596         if (peers_ch == NULL) {
4597                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4598                 rv = -1;
4599                 goto fail;
4600         }
4601
4602         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4603         if (err) {
4604                 rv = 0;
4605                 goto fail;
4606         }
4607
4608         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4609         response = kmalloc(resp_size, GFP_NOIO);
4610         if (response == NULL) {
4611                 conn_err(tconn, "kmalloc of response failed\n");
4612                 rv = -1;
4613                 goto fail;
4614         }
4615
4616         sg_init_table(&sg, 1);
4617         sg_set_buf(&sg, peers_ch, pi.size);
4618
4619         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4620         if (rv) {
4621                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4622                 rv = -1;
4623                 goto fail;
4624         }
4625
4626         if (!conn_prepare_command(tconn, sock)) {
4627                 rv = 0;
4628                 goto fail;
4629         }
4630         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4631                                 response, resp_size);
4632         if (!rv)
4633                 goto fail;
4634
4635         err = drbd_recv_header(tconn, &pi);
4636         if (err) {
4637                 rv = 0;
4638                 goto fail;
4639         }
4640
4641         if (pi.cmd != P_AUTH_RESPONSE) {
4642                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4643                          cmdname(pi.cmd), pi.cmd);
4644                 rv = 0;
4645                 goto fail;
4646         }
4647
4648         if (pi.size != resp_size) {
4649                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4650                 rv = 0;
4651                 goto fail;
4652         }
4653
4654         err = drbd_recv_all_warn(tconn, response, resp_size);
4655         if (err) {
4656                 rv = 0;
4657                 goto fail;
4658         }
4659
4660         right_response = kmalloc(resp_size, GFP_NOIO);
4661         if (right_response == NULL) {
4662                 conn_err(tconn, "kmalloc of right_response failed\n");
4663                 rv = -1;
4664                 goto fail;
4665         }
4666
4667         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4668
4669         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4670         if (rv) {
4671                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4672                 rv = -1;
4673                 goto fail;
4674         }
4675
4676         rv = !memcmp(response, right_response, resp_size);
4677
4678         if (rv)
4679                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4680                      resp_size);
4681         else
4682                 rv = -1;
4683
4684  fail:
4685         kfree(peers_ch);
4686         kfree(response);
4687         kfree(right_response);
4688
4689         return rv;
4690 }
4691 #endif
4692
4693 int drbdd_init(struct drbd_thread *thi)
4694 {
4695         struct drbd_tconn *tconn = thi->tconn;
4696         int h;
4697
4698         conn_info(tconn, "receiver (re)started\n");
4699
4700         do {
4701                 h = conn_connect(tconn);
4702                 if (h == 0) {
4703                         conn_disconnect(tconn);
4704                         schedule_timeout_interruptible(HZ);
4705                 }
4706                 if (h == -1) {
4707                         conn_warn(tconn, "Discarding network configuration.\n");
4708                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4709                 }
4710         } while (h == 0);
4711
4712         if (h > 0)
4713                 drbdd(tconn);
4714
4715         conn_disconnect(tconn);
4716
4717         conn_info(tconn, "receiver terminated\n");
4718         return 0;
4719 }
4720
4721 /* ********* acknowledge sender ******** */
4722
4723 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4724 {
4725         struct p_req_state_reply *p = pi->data;
4726         int retcode = be32_to_cpu(p->retcode);
4727
4728         if (retcode >= SS_SUCCESS) {
4729                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4730         } else {
4731                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4732                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4733                          drbd_set_st_err_str(retcode), retcode);
4734         }
4735         wake_up(&tconn->ping_wait);
4736
4737         return 0;
4738 }
4739
4740 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4741 {
4742         struct drbd_conf *mdev;
4743         struct p_req_state_reply *p = pi->data;
4744         int retcode = be32_to_cpu(p->retcode);
4745
4746         mdev = vnr_to_mdev(tconn, pi->vnr);
4747         if (!mdev)
4748                 return -EIO;
4749
4750         if (retcode >= SS_SUCCESS) {
4751                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4752         } else {
4753                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4754                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4755                         drbd_set_st_err_str(retcode), retcode);
4756         }
4757         wake_up(&mdev->state_wait);
4758
4759         return 0;
4760 }
4761
4762 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4763 {
4764         return drbd_send_ping_ack(tconn);
4765
4766 }
4767
4768 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4769 {
4770         /* restore idle timeout */
4771         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4772         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4773                 wake_up(&tconn->ping_wait);
4774
4775         return 0;
4776 }
4777
4778 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4779 {
4780         struct drbd_conf *mdev;
4781         struct p_block_ack *p = pi->data;
4782         sector_t sector = be64_to_cpu(p->sector);
4783         int blksize = be32_to_cpu(p->blksize);
4784
4785         mdev = vnr_to_mdev(tconn, pi->vnr);
4786         if (!mdev)
4787                 return -EIO;
4788
4789         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4790
4791         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4792
4793         if (get_ldev(mdev)) {
4794                 drbd_rs_complete_io(mdev, sector);
4795                 drbd_set_in_sync(mdev, sector, blksize);
4796                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4797                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4798                 put_ldev(mdev);
4799         }
4800         dec_rs_pending(mdev);
4801         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4802
4803         return 0;
4804 }
4805
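/* Look up the request identified by (id, sector) in the given request tree,
 * apply the request state change event 'what' to it, and complete the
 * master bio if that transition finished it. */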
4806 static int
4807 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4808                               struct rb_root *root, const char *func,
4809                               enum drbd_req_event what, bool missing_ok)
4810 {
4811         struct drbd_request *req;
4812         struct bio_and_error m;
4813
4814         spin_lock_irq(&mdev->tconn->req_lock);
4815         req = find_request(mdev, root, id, sector, missing_ok, func);
4816         if (unlikely(!req)) {
4817                 spin_unlock_irq(&mdev->tconn->req_lock);
4818                 return -EIO;
4819         }
4820         __req_mod(req, what, &m);
4821         spin_unlock_irq(&mdev->tconn->req_lock);
4822
4823         if (m.bio)
4824                 complete_master_bio(mdev, &m);
4825         return 0;
4826 }
4827
4828 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4829 {
4830         struct drbd_conf *mdev;
4831         struct p_block_ack *p = pi->data;
4832         sector_t sector = be64_to_cpu(p->sector);
4833         int blksize = be32_to_cpu(p->blksize);
4834         enum drbd_req_event what;
4835
4836         mdev = vnr_to_mdev(tconn, pi->vnr);
4837         if (!mdev)
4838                 return -EIO;
4839
4840         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4841
4842         if (p->block_id == ID_SYNCER) {
4843                 drbd_set_in_sync(mdev, sector, blksize);
4844                 dec_rs_pending(mdev);
4845                 return 0;
4846         }
4847         switch (pi->cmd) {
4848         case P_RS_WRITE_ACK:
4849                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4850                 break;
4851         case P_WRITE_ACK:
4852                 what = WRITE_ACKED_BY_PEER;
4853                 break;
4854         case P_RECV_ACK:
4855                 what = RECV_ACKED_BY_PEER;
4856                 break;
4857         case P_DISCARD_WRITE:
4858                 what = DISCARD_WRITE;
4859                 break;
4860         case P_RETRY_WRITE:
4861                 what = POSTPONE_WRITE;
4862                 break;
4863         default:
4864                 BUG();
4865         }
4866
4867         return validate_req_change_req_state(mdev, p->block_id, sector,
4868                                              &mdev->write_requests, __func__,
4869                                              what, false);
4870 }
4871
4872 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4873 {
4874         struct drbd_conf *mdev;
4875         struct p_block_ack *p = pi->data;
4876         sector_t sector = be64_to_cpu(p->sector);
4877         int size = be32_to_cpu(p->blksize);
4878         int err;
4879
4880         mdev = vnr_to_mdev(tconn, pi->vnr);
4881         if (!mdev)
4882                 return -EIO;
4883
4884         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4885
4886         if (p->block_id == ID_SYNCER) {
4887                 dec_rs_pending(mdev);
4888                 drbd_rs_failed_io(mdev, sector, size);
4889                 return 0;
4890         }
4891
4892         err = validate_req_change_req_state(mdev, p->block_id, sector,
4893                                             &mdev->write_requests, __func__,
4894                                             NEG_ACKED, true);
4895         if (err) {
4896                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4897                    The master bio might already be completed, therefore the
4898                    request is no longer in the collision hash. */
4899                 /* In Protocol B we might already have got a P_RECV_ACK
4900                    but then get a P_NEG_ACK afterwards. */
4901                 drbd_set_out_of_sync(mdev, sector, size);
4902         }
4903         return 0;
4904 }
4905
4906 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4907 {
4908         struct drbd_conf *mdev;
4909         struct p_block_ack *p = pi->data;
4910         sector_t sector = be64_to_cpu(p->sector);
4911
4912         mdev = vnr_to_mdev(tconn, pi->vnr);
4913         if (!mdev)
4914                 return -EIO;
4915
4916         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4917
4918         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4919             (unsigned long long)sector, be32_to_cpu(p->blksize));
4920
4921         return validate_req_change_req_state(mdev, p->block_id, sector,
4922                                              &mdev->read_requests, __func__,
4923                                              NEG_ACKED, false);
4924 }
4925
4926 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4927 {
4928         struct drbd_conf *mdev;
4929         sector_t sector;
4930         int size;
4931         struct p_block_ack *p = pi->data;
4932
4933         mdev = vnr_to_mdev(tconn, pi->vnr);
4934         if (!mdev)
4935                 return -EIO;
4936
4937         sector = be64_to_cpu(p->sector);
4938         size = be32_to_cpu(p->blksize);
4939
4940         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4941
4942         dec_rs_pending(mdev);
4943
4944         if (get_ldev_if_state(mdev, D_FAILED)) {
4945                 drbd_rs_complete_io(mdev, sector);
4946                 switch (pi->cmd) {
4947                 case P_NEG_RS_DREPLY:
4948                         drbd_rs_failed_io(mdev, sector, size);
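                        /* fall through */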
4949                 case P_RS_CANCEL:
4950                         break;
4951                 default:
4952                         BUG();
4953                 }
4954                 put_ldev(mdev);
4955         }
4956
4957         return 0;
4958 }
4959
4960 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4961 {
4962         struct drbd_conf *mdev;
4963         struct p_barrier_ack *p = pi->data;
4964
4965         mdev = vnr_to_mdev(tconn, pi->vnr);
4966         if (!mdev)
4967                 return -EIO;
4968
4969         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4970
4971         if (mdev->state.conn == C_AHEAD &&
4972             atomic_read(&mdev->ap_in_flight) == 0 &&
4973             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4974                 mdev->start_resync_timer.expires = jiffies + HZ;
4975                 add_timer(&mdev->start_resync_timer);
4976         }
4977
4978         return 0;
4979 }
4980
4981 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4982 {
4983         struct drbd_conf *mdev;
4984         struct p_block_ack *p = pi->data;
4985         struct drbd_work *w;
4986         sector_t sector;
4987         int size;
4988
4989         mdev = vnr_to_mdev(tconn, pi->vnr);
4990         if (!mdev)
4991                 return -EIO;
4992
4993         sector = be64_to_cpu(p->sector);
4994         size = be32_to_cpu(p->blksize);
4995
4996         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4997
4998         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4999                 drbd_ov_out_of_sync_found(mdev, sector, size);
5000         else
5001                 ov_out_of_sync_print(mdev);
5002
5003         if (!get_ldev(mdev))
5004                 return 0;
5005
5006         drbd_rs_complete_io(mdev, sector);
5007         dec_rs_pending(mdev);
5008
5009         --mdev->ov_left;
5010
5011         /* let's advance progress step marks only for every other megabyte */
5012         if ((mdev->ov_left & 0x200) == 0x200)
5013                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5014
5015         if (mdev->ov_left == 0) {
5016                 w = kmalloc(sizeof(*w), GFP_NOIO);
5017                 if (w) {
5018                         w->cb = w_ov_finished;
5019                         w->mdev = mdev;
5020                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5021                 } else {
5022                         dev_err(DEV, "kmalloc(w) failed.");
5023                         ov_out_of_sync_print(mdev);
5024                         drbd_resync_finished(mdev);
5025                 }
5026         }
5027         put_ldev(mdev);
5028         return 0;
5029 }
5030
5031 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5032 {
5033         return 0;
5034 }
5035
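/* Drain the done_ee lists of all volumes of this connection.  Repeat until
 * none of them has peer requests left to acknowledge; returns nonzero if
 * drbd_finish_peer_reqs() failed for any volume. */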
5036 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5037 {
5038         struct drbd_conf *mdev;
5039         int vnr, not_empty = 0;
5040
5041         do {
5042                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5043                 flush_signals(current);
5044
5045                 rcu_read_lock();
5046                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5047                         kref_get(&mdev->kref);
5048                         rcu_read_unlock();
5049                         if (drbd_finish_peer_reqs(mdev)) {
5050                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5051                                 return 1;
5052                         }
5053                         kref_put(&mdev->kref, &drbd_minor_destroy);
5054                         rcu_read_lock();
5055                 }
5056                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5057
5058                 spin_lock_irq(&tconn->req_lock);
5059                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5060                         not_empty = !list_empty(&mdev->done_ee);
5061                         if (not_empty)
5062                                 break;
5063                 }
5064                 spin_unlock_irq(&tconn->req_lock);
5065                 rcu_read_unlock();
5066         } while (not_empty);
5067
5068         return 0;
5069 }
5070
5071 struct asender_cmd {
5072         size_t pkt_size;
5073         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5074 };
5075
5076 static struct asender_cmd asender_tbl[] = {
5077         [P_PING]            = { 0, got_Ping },
5078         [P_PING_ACK]        = { 0, got_PingAck },
5079         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5080         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5081         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5082         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5083         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5084         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5085         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5086         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5087         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5088         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5089         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5090         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5091         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5092         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5093         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5094 };
5095
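/* The asender thread: sends pings when requested, flushes acknowledgements
 * for completed peer requests, and receives/dispatches packets on the meta
 * socket according to asender_tbl[].  On errors it requests
 * C_NETWORK_FAILURE (reconnect) or C_DISCONNECTING, as appropriate. */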
5096 int drbd_asender(struct drbd_thread *thi)
5097 {
5098         struct drbd_tconn *tconn = thi->tconn;
5099         struct asender_cmd *cmd = NULL;
5100         struct packet_info pi;
5101         int rv;
5102         void *buf    = tconn->meta.rbuf;
5103         int received = 0;
5104         unsigned int header_size = drbd_header_size(tconn);
5105         int expect   = header_size;
5106         bool ping_timeout_active = false;
5107         struct net_conf *nc;
5108         int ping_timeo, tcp_cork, ping_int;
5109
5110         current->policy = SCHED_RR;  /* Make this a realtime task! */
5111         current->rt_priority = 2;    /* more important than all other tasks */
5112
5113         while (get_t_state(thi) == RUNNING) {
5114                 drbd_thread_current_set_cpu(thi);
5115
5116                 rcu_read_lock();
5117                 nc = rcu_dereference(tconn->net_conf);
5118                 ping_timeo = nc->ping_timeo;
5119                 tcp_cork = nc->tcp_cork;
5120                 ping_int = nc->ping_int;
5121                 rcu_read_unlock();
5122
5123                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5124                         if (drbd_send_ping(tconn)) {
5125                                 conn_err(tconn, "drbd_send_ping has failed\n");
5126                                 goto reconnect;
5127                         }
5128                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5129                         ping_timeout_active = true;
5130                 }
5131
5132                 /* TODO: conditionally cork; it may hurt latency if we cork without
5133                    much to send */
5134                 if (tcp_cork)
5135                         drbd_tcp_cork(tconn->meta.socket);
5136                 if (tconn_finish_peer_reqs(tconn)) {
5137                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5138                         goto reconnect;
5139                 }
5140                 /* but unconditionally uncork unless disabled */
5141                 if (tcp_cork)
5142                         drbd_tcp_uncork(tconn->meta.socket);
5143
5144                 /* short circuit, recv_msg would return EINTR anyway. */
5145                 if (signal_pending(current))
5146                         continue;
5147
5148                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5149                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5150
5151                 flush_signals(current);
5152
5153                 /* Note:
5154                  * -EINTR        (on meta) we got a signal
5155                  * -EAGAIN       (on meta) rcvtimeo expired
5156                  * -ECONNRESET   other side closed the connection
5157                  * -ERESTARTSYS  (on data) we got a signal
5158                  * rv <  0       other than above: unexpected error!
5159                  * rv == expected: full header or command
5160                  * rv <  expected: "woken" by signal during receive
5161                  * rv == 0       : "connection shut down by peer"
5162                  */
5163                 if (likely(rv > 0)) {
5164                         received += rv;
5165                         buf      += rv;
5166                 } else if (rv == 0) {
5167                         conn_err(tconn, "meta connection shut down by peer.\n");
5168                         goto reconnect;
5169                 } else if (rv == -EAGAIN) {
5170                         /* If the data socket received something meanwhile,
5171                          * that is good enough: peer is still alive. */
5172                         if (time_after(tconn->last_received,
5173                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5174                                 continue;
5175                         if (ping_timeout_active) {
5176                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5177                                 goto reconnect;
5178                         }
5179                         set_bit(SEND_PING, &tconn->flags);
5180                         continue;
5181                 } else if (rv == -EINTR) {
5182                         continue;
5183                 } else {
5184                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5185                         goto reconnect;
5186                 }
5187
5188                 if (received == expect && cmd == NULL) {
5189                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5190                                 goto reconnect;
5191                         cmd = &asender_tbl[pi.cmd];
5192                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5193                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5194                                          cmdname(pi.cmd), pi.cmd);
5195                                 goto disconnect;
5196                         }
5197                         expect = header_size + cmd->pkt_size;
5198                         if (pi.size != expect - header_size) {
5199                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5200                                         pi.cmd, pi.size);
5201                                 goto reconnect;
5202                         }
5203                 }
5204                 if (received == expect) {
5205                         bool err;
5206
5207                         err = cmd->fn(tconn, &pi);
5208                         if (err) {
5209                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5210                                 goto reconnect;
5211                         }
5212
5213                         tconn->last_received = jiffies;
5214
5215                         if (cmd == &asender_tbl[P_PING_ACK]) {
5216                                 /* restore idle timeout */
5217                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5218                                 ping_timeout_active = false;
5219                         }
5220
5221                         buf      = tconn->meta.rbuf;
5222                         received = 0;
5223                         expect   = header_size;
5224                         cmd      = NULL;
5225                 }
5226         }
5227
5228         if (0) {
5229 reconnect:
5230                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5231         }
5232         if (0) {
5233 disconnect:
5234                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5235         }
5236         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5237
5238         conn_info(tconn, "asender terminated\n");
5239
5240         return 0;
5241 }