1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with singly linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
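
/*
 * Illustration only, kept out of the build: a minimal sketch of how such a
 * page chain is walked, using page->private as the "next" pointer via
 * page_chain_next().  The helper name below is hypothetical and not part
 * of this driver.
 */
#if 0
static int page_chain_count(struct page *head)
{
        struct page *page;
        int n = 0;

        /* the last page of a chain has page->private == 0, ending the walk */
        for (page = head; page; page = page_chain_next(page))
                n++;
        return n;
}
#endif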
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one, we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retries until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyway. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
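
/*
 * Usage sketch, kept out of the build: a hypothetical caller that grabs a
 * short page chain, touches each page, and returns the pages to the pool.
 * drbd_free_pages() is defined just below; error handling is reduced to
 * the bare minimum.
 */
#if 0
static int example_use_page_pool(struct drbd_conf *mdev)
{
        struct page *chain, *page;

        /* may block and retry until enough pages are free, or a signal arrives */
        chain = drbd_alloc_pages(mdev, 4, true);
        if (!chain)
                return -ENOMEM;

        page = chain;
        page_chain_for_each(page) {
                void *data = kmap(page);
                memset(data, 0, PAGE_SIZE);
                kunmap(page);
        }

        drbd_free_pages(mdev, chain, 0); /* links the chain back or frees it */
        return 0;
}
#endif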
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
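
/*
 * Sketch, kept out of the build: the req_lock discipline listed above in
 * practice.  The plain drbd_wait_ee_list_empty() takes the lock itself,
 * while the _drbd_wait_ee_list_empty() variant expects it to be held.
 * The caller below is hypothetical, for illustration only.
 */
#if 0
static void example_wait_done_ee(struct drbd_conf *mdev)
{
        /* must NOT hold req_lock here */
        drbd_wait_ee_list_empty(mdev, &mdev->done_ee);

        /* ... whereas the underscore variant is called with req_lock held */
        spin_lock_irq(&mdev->tconn->req_lock);
        _drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
        spin_unlock_irq(&mdev->tconn->req_lock);
}
#endif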
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write;
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept(), which is only present since 2.6.18.
465  * We also want to log exactly which part of it failed. */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490         __module_get((*newsock)->ops->owner);
491
492 out:
493         return err;
494 }
495
496 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
497 {
498         mm_segment_t oldfs;
499         struct kvec iov = {
500                 .iov_base = buf,
501                 .iov_len = size,
502         };
503         struct msghdr msg = {
504                 .msg_iovlen = 1,
505                 .msg_iov = (struct iovec *)&iov,
506                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
507         };
508         int rv;
509
510         oldfs = get_fs();
511         set_fs(KERNEL_DS);
512         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
513         set_fs(oldfs);
514
515         return rv;
516 }
517
518 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
519 {
520         mm_segment_t oldfs;
521         struct kvec iov = {
522                 .iov_base = buf,
523                 .iov_len = size,
524         };
525         struct msghdr msg = {
526                 .msg_iovlen = 1,
527                 .msg_iov = (struct iovec *)&iov,
528                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
529         };
530         int rv;
531
532         oldfs = get_fs();
533         set_fs(KERNEL_DS);
534
535         for (;;) {
536                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
537                 if (rv == size)
538                         break;
539
540                 /* Note:
541                  * ECONNRESET   other side closed the connection
542                  * ERESTARTSYS  (on  sock) we got a signal
543                  */
544
545                 if (rv < 0) {
546                         if (rv == -ECONNRESET)
547                                 conn_info(tconn, "sock was reset by peer\n");
548                         else if (rv != -ERESTARTSYS)
549                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
550                         break;
551                 } else if (rv == 0) {
552                         conn_info(tconn, "sock was shut down by peer\n");
553                         break;
554                 } else  {
555                         /* signal came in, or peer/link went down,
556                          * after we read a partial message
557                          */
558                         /* D_ASSERT(signal_pending(current)); */
559                         break;
560                 }
561         }
562
563         set_fs(oldfs);
564
565         if (rv != size)
566                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
567
568         return rv;
569 }
570
571 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
572 {
573         int err;
574
575         err = drbd_recv(tconn, buf, size);
576         if (err != size) {
577                 if (err >= 0)
578                         err = -EIO;
579         } else
580                 err = 0;
581         return err;
582 }
583
584 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
585 {
586         int err;
587
588         err = drbd_recv_all(tconn, buf, size);
589         if (err && !signal_pending(current))
590                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
591         return err;
592 }
593
594 /* quoting tcp(7):
595  *   On individual connections, the socket buffer size must be set prior to the
596  *   listen(2) or connect(2) calls in order to have it take effect.
597  * This is our wrapper to do so.
598  */
599 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
600                 unsigned int rcv)
601 {
602         /* open coded SO_SNDBUF, SO_RCVBUF */
603         if (snd) {
604                 sock->sk->sk_sndbuf = snd;
605                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
606         }
607         if (rcv) {
608                 sock->sk->sk_rcvbuf = rcv;
609                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
610         }
611 }
612
613 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
614 {
615         const char *what;
616         struct socket *sock;
617         struct sockaddr_in6 src_in6;
618         struct sockaddr_in6 peer_in6;
619         struct net_conf *nc;
620         int err, peer_addr_len, my_addr_len;
621         int sndbuf_size, rcvbuf_size, connect_int;
622         int disconnect_on_error = 1;
623
624         rcu_read_lock();
625         nc = rcu_dereference(tconn->net_conf);
626         if (!nc) {
627                 rcu_read_unlock();
628                 return NULL;
629         }
630         sndbuf_size = nc->sndbuf_size;
631         rcvbuf_size = nc->rcvbuf_size;
632         connect_int = nc->connect_int;
633         rcu_read_unlock();
634
635         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
636         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
637
638         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
639                 src_in6.sin6_port = 0;
640         else
641                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
642
643         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
644         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
645
646         what = "sock_create_kern";
647         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
648                                SOCK_STREAM, IPPROTO_TCP, &sock);
649         if (err < 0) {
650                 sock = NULL;
651                 goto out;
652         }
653
654         sock->sk->sk_rcvtimeo =
655         sock->sk->sk_sndtimeo = connect_int * HZ;
656         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
657
658        /* explicitly bind to the configured IP as source IP
659         *  for the outgoing connections.
660         *  This is needed for multihomed hosts and to be
661         *  able to use lo: interfaces for drbd.
662         * Make sure to use 0 as port number, so linux selects
663         *  a free one dynamically.
664         */
665         what = "bind before connect";
666         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
667         if (err < 0)
668                 goto out;
669
670         /* connect may fail, peer not yet available.
671          * stay C_WF_CONNECTION, don't go Disconnecting! */
672         disconnect_on_error = 0;
673         what = "connect";
674         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
675
676 out:
677         if (err < 0) {
678                 if (sock) {
679                         sock_release(sock);
680                         sock = NULL;
681                 }
682                 switch (-err) {
683                         /* timeout, busy, signal pending */
684                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
685                 case EINTR: case ERESTARTSYS:
686                         /* peer not (yet) available, network problem */
687                 case ECONNREFUSED: case ENETUNREACH:
688                 case EHOSTDOWN:    case EHOSTUNREACH:
689                         disconnect_on_error = 0;
690                         break;
691                 default:
692                         conn_err(tconn, "%s failed, err = %d\n", what, err);
693                 }
694                 if (disconnect_on_error)
695                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
696         }
697
698         return sock;
699 }
700
701 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
702 {
703         int timeo, err, my_addr_len;
704         int sndbuf_size, rcvbuf_size, connect_int;
705         struct socket *s_estab = NULL, *s_listen;
706         struct sockaddr_in6 my_addr;
707         struct net_conf *nc;
708         const char *what;
709
710         rcu_read_lock();
711         nc = rcu_dereference(tconn->net_conf);
712         if (!nc) {
713                 rcu_read_unlock();
714                 return NULL;
715         }
716         sndbuf_size = nc->sndbuf_size;
717         rcvbuf_size = nc->rcvbuf_size;
718         connect_int = nc->connect_int;
719         rcu_read_unlock();
720
721         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
722         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
723
724         what = "sock_create_kern";
725         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
726                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
727         if (err) {
728                 s_listen = NULL;
729                 goto out;
730         }
731
732         timeo = connect_int * HZ;
733         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
734
735         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
736         s_listen->sk->sk_rcvtimeo = timeo;
737         s_listen->sk->sk_sndtimeo = timeo;
738         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
739
740         what = "bind before listen";
741         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
742         if (err < 0)
743                 goto out;
744
745         err = drbd_accept(&what, s_listen, &s_estab);
746
747 out:
748         if (s_listen)
749                 sock_release(s_listen);
750         if (err < 0) {
751                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
752                         conn_err(tconn, "%s failed, err = %d\n", what, err);
753                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
754                 }
755         }
756
757         return s_estab;
758 }
759
760 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
761
762 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
763                              enum drbd_packet cmd)
764 {
765         if (!conn_prepare_command(tconn, sock))
766                 return -EIO;
767         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
768 }
769
770 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
771 {
772         unsigned int header_size = drbd_header_size(tconn);
773         struct packet_info pi;
774         int err;
775
776         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
777         if (err != header_size) {
778                 if (err >= 0)
779                         err = -EIO;
780                 return err;
781         }
782         err = decode_header(tconn, tconn->data.rbuf, &pi);
783         if (err)
784                 return err;
785         return pi.cmd;
786 }
787
788 /**
789  * drbd_socket_okay() - Free the socket if its connection is not okay
790  * @sock:       pointer to the pointer to the socket.
791  */
792 static int drbd_socket_okay(struct socket **sock)
793 {
794         int rr;
795         char tb[4];
796
797         if (!*sock)
798                 return false;
799
800         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
801
802         if (rr > 0 || rr == -EAGAIN) {
803                 return true;
804         } else {
805                 sock_release(*sock);
806                 *sock = NULL;
807                 return false;
808         }
809 }
810 /* Gets called if a connection is established, or if a new minor gets created
811    in a connection */
812 int drbd_connected(struct drbd_conf *mdev)
813 {
814         int err;
815
816         atomic_set(&mdev->packet_seq, 0);
817         mdev->peer_seq = 0;
818
819         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
820                 &mdev->tconn->cstate_mutex :
821                 &mdev->own_state_mutex;
822
823         err = drbd_send_sync_param(mdev);
824         if (!err)
825                 err = drbd_send_sizes(mdev, 0, 0);
826         if (!err)
827                 err = drbd_send_uuids(mdev);
828         if (!err)
829                 err = drbd_send_current_state(mdev);
830         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
831         clear_bit(RESIZE_PENDING, &mdev->flags);
832         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
833         return err;
834 }
835
836 /*
837  * return values:
838  *   1 yes, we have a valid connection
839  *   0 oops, did not work out, please try again
840  *  -1 peer talks different language,
841  *     no point in trying again, please go standalone.
842  *  -2 We do not have a network config...
843  */
844 static int conn_connect(struct drbd_tconn *tconn)
845 {
846         struct drbd_socket sock, msock;
847         struct drbd_conf *mdev;
848         struct net_conf *nc;
849         int vnr, timeout, try, h, ok;
850         bool discard_my_data;
851
852         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
853                 return -2;
854
855         mutex_init(&sock.mutex);
856         sock.sbuf = tconn->data.sbuf;
857         sock.rbuf = tconn->data.rbuf;
858         sock.socket = NULL;
859         mutex_init(&msock.mutex);
860         msock.sbuf = tconn->meta.sbuf;
861         msock.rbuf = tconn->meta.rbuf;
862         msock.socket = NULL;
863
864         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
865
866         /* Assume that the peer only understands protocol 80 until we know better.  */
867         tconn->agreed_pro_version = 80;
868
869         do {
870                 struct socket *s;
871
872                 for (try = 0;;) {
873                         /* 3 tries, this should take less than a second! */
874                         s = drbd_try_connect(tconn);
875                         if (s || ++try >= 3)
876                                 break;
877                         /* give the other side time to call bind() & listen() */
878                         schedule_timeout_interruptible(HZ / 10);
879                 }
880
881                 if (s) {
882                         if (!sock.socket) {
883                                 sock.socket = s;
884                                 send_first_packet(tconn, &sock, P_INITIAL_DATA);
885                         } else if (!msock.socket) {
886                                 msock.socket = s;
887                                 send_first_packet(tconn, &msock, P_INITIAL_META);
888                         } else {
889                                 conn_err(tconn, "Logic error in conn_connect()\n");
890                                 goto out_release_sockets;
891                         }
892                 }
893
894                 if (sock.socket && msock.socket) {
895                         rcu_read_lock();
896                         nc = rcu_dereference(tconn->net_conf);
897                         timeout = nc->ping_timeo * HZ / 10;
898                         rcu_read_unlock();
899                         schedule_timeout_interruptible(timeout);
900                         ok = drbd_socket_okay(&sock.socket);
901                         ok = drbd_socket_okay(&msock.socket) && ok;
902                         if (ok)
903                                 break;
904                 }
905
906 retry:
907                 s = drbd_wait_for_connect(tconn);
908                 if (s) {
909                         try = receive_first_packet(tconn, s);
910                         drbd_socket_okay(&sock.socket);
911                         drbd_socket_okay(&msock.socket);
912                         switch (try) {
913                         case P_INITIAL_DATA:
914                                 if (sock.socket) {
915                                         conn_warn(tconn, "initial packet S crossed\n");
916                                         sock_release(sock.socket);
917                                 }
918                                 sock.socket = s;
919                                 break;
920                         case P_INITIAL_META:
921                                 if (msock.socket) {
922                                         conn_warn(tconn, "initial packet M crossed\n");
923                                         sock_release(msock.socket);
924                                 }
925                                 msock.socket = s;
926                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
927                                 break;
928                         default:
929                                 conn_warn(tconn, "Error receiving initial packet\n");
930                                 sock_release(s);
931                                 if (random32() & 1)
932                                         goto retry;
933                         }
934                 }
935
936                 if (tconn->cstate <= C_DISCONNECTING)
937                         goto out_release_sockets;
938                 if (signal_pending(current)) {
939                         flush_signals(current);
940                         smp_rmb();
941                         if (get_t_state(&tconn->receiver) == EXITING)
942                                 goto out_release_sockets;
943                 }
944
945         if (sock.socket && msock.socket) {
946                         ok = drbd_socket_okay(&sock.socket);
947                         ok = drbd_socket_okay(&msock.socket) && ok;
948                         if (ok)
949                                 break;
950                 }
951         } while (1);
952
953         sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
954         msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
955
956         sock.socket->sk->sk_allocation = GFP_NOIO;
957         msock.socket->sk->sk_allocation = GFP_NOIO;
958
959         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
960         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
961
962         /* NOT YET ...
963          * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
964          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
965          * first set it to the P_CONNECTION_FEATURES timeout,
966          * which we set to 4x the configured ping_timeout. */
967         rcu_read_lock();
968         nc = rcu_dereference(tconn->net_conf);
969
970         sock.socket->sk->sk_sndtimeo =
971         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
972
973         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
974         timeout = nc->timeout * HZ / 10;
975         discard_my_data = nc->discard_my_data;
976         rcu_read_unlock();
977
978         msock.socket->sk->sk_sndtimeo = timeout;
979
980         /* we don't want delays.
981          * we use TCP_CORK where appropriate, though */
982         drbd_tcp_nodelay(sock.socket);
983         drbd_tcp_nodelay(msock.socket);
984
985         tconn->data.socket = sock.socket;
986         tconn->meta.socket = msock.socket;
987         tconn->last_received = jiffies;
988
989         h = drbd_do_features(tconn);
990         if (h <= 0)
991                 return h;
992
993         if (tconn->cram_hmac_tfm) {
994                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
995                 switch (drbd_do_auth(tconn)) {
996                 case -1:
997                         conn_err(tconn, "Authentication of peer failed\n");
998                         return -1;
999                 case 0:
1000                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
1001                         return 0;
1002                 }
1003         }
1004
1005         tconn->data.socket->sk->sk_sndtimeo = timeout;
1006         tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1007
1008         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1009                 return -1;
1010
1011         rcu_read_lock();
1012         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1013                 kref_get(&mdev->kref);
1014                 rcu_read_unlock();
1015
1016                 if (discard_my_data)
1017                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1018                 else
1019                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1020
1021                 drbd_connected(mdev);
1022                 kref_put(&mdev->kref, &drbd_minor_destroy);
1023                 rcu_read_lock();
1024         }
1025         rcu_read_unlock();
1026
1027         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
1028                 return 0;
1029
1030         drbd_thread_start(&tconn->asender);
1031
1032         mutex_lock(&tconn->conf_update);
1033         /* The discard_my_data flag is a single-shot modifier to the next
1034          * connection attempt, the handshake of which is now well underway.
1035          * No need for rcu style copying of the whole struct
1036          * just to clear a single value. */
1037         tconn->net_conf->discard_my_data = 0;
1038         mutex_unlock(&tconn->conf_update);
1039
1040         return h;
1041
1042 out_release_sockets:
1043         if (sock.socket)
1044                 sock_release(sock.socket);
1045         if (msock.socket)
1046                 sock_release(msock.socket);
1047         return -1;
1048 }
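
/*
 * Sketch, kept out of the build: how a caller might act on conn_connect()'s
 * return codes documented above.  This is a simplified illustration, not
 * the actual receiver thread code.
 */
#if 0
static void example_connect_loop(struct drbd_tconn *tconn)
{
        int h;

        do {
                h = conn_connect(tconn);
                if (h == 0)     /* did not work out, try again */
                        schedule_timeout_interruptible(HZ);
        } while (h == 0);

        if (h < 0) {
                /* -1: incompatible peer, -2: no net config; go standalone */
                return;
        }
        /* h == 1: valid connection, start processing packets */
}
#endif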
1049
1050 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1051 {
1052         unsigned int header_size = drbd_header_size(tconn);
1053
1054         if (header_size == sizeof(struct p_header100) &&
1055             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1056                 struct p_header100 *h = header;
1057                 if (h->pad != 0) {
1058                         conn_err(tconn, "Header padding is not zero\n");
1059                         return -EINVAL;
1060                 }
1061                 pi->vnr = be16_to_cpu(h->volume);
1062                 pi->cmd = be16_to_cpu(h->command);
1063                 pi->size = be32_to_cpu(h->length);
1064         } else if (header_size == sizeof(struct p_header95) &&
1065                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1066                 struct p_header95 *h = header;
1067                 pi->cmd = be16_to_cpu(h->command);
1068                 pi->size = be32_to_cpu(h->length);
1069                 pi->vnr = 0;
1070         } else if (header_size == sizeof(struct p_header80) &&
1071                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1072                 struct p_header80 *h = header;
1073                 pi->cmd = be16_to_cpu(h->command);
1074                 pi->size = be16_to_cpu(h->length);
1075                 pi->vnr = 0;
1076         } else {
1077                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1078                          be32_to_cpu(*(__be32 *)header),
1079                          tconn->agreed_pro_version);
1080                 return -EINVAL;
1081         }
1082         pi->data = header + header_size;
1083         return 0;
1084 }
1085
1086 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1087 {
1088         void *buffer = tconn->data.rbuf;
1089         int err;
1090
1091         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1092         if (err)
1093                 return err;
1094
1095         err = decode_header(tconn, buffer, pi);
1096         tconn->last_received = jiffies;
1097
1098         return err;
1099 }
1100
1101 static void drbd_flush(struct drbd_tconn *tconn)
1102 {
1103         int rv;
1104         struct drbd_conf *mdev;
1105         int vnr;
1106
1107         if (tconn->write_ordering >= WO_bdev_flush) {
1108                 rcu_read_lock();
1109                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1110                         if (!get_ldev(mdev))
1111                                 continue;
1112                         kref_get(&mdev->kref);
1113                         rcu_read_unlock();
1114
1115                         rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1116                                         GFP_NOIO, NULL);
1117                         if (rv) {
1118                                 dev_info(DEV, "local disk flush failed with status %d\n", rv);
1119                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1120                                  * don't try again for ANY return value != 0
1121                                  * if (rv == -EOPNOTSUPP) */
1122                                 drbd_bump_write_ordering(tconn, WO_drain_io);
1123                         }
1124                         put_ldev(mdev);
1125                         kref_put(&mdev->kref, &drbd_minor_destroy);
1126
1127                         rcu_read_lock();
1128                         if (rv)
1129                                 break;
1130                 }
1131                 rcu_read_unlock();
1132         }
1133 }
1134
1135 /**
1136  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1137  * @mdev:       DRBD device.
1138  * @epoch:      Epoch object.
1139  * @ev:         Epoch event.
1140  */
1141 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1142                                                struct drbd_epoch *epoch,
1143                                                enum epoch_event ev)
1144 {
1145         int epoch_size;
1146         struct drbd_epoch *next_epoch;
1147         enum finish_epoch rv = FE_STILL_LIVE;
1148
1149         spin_lock(&tconn->epoch_lock);
1150         do {
1151                 next_epoch = NULL;
1152
1153                 epoch_size = atomic_read(&epoch->epoch_size);
1154
1155                 switch (ev & ~EV_CLEANUP) {
1156                 case EV_PUT:
1157                         atomic_dec(&epoch->active);
1158                         break;
1159                 case EV_GOT_BARRIER_NR:
1160                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1161                         break;
1162                 case EV_BECAME_LAST:
1163                         /* nothing to do*/
1164                         break;
1165                 }
1166
1167                 if (epoch_size != 0 &&
1168                     atomic_read(&epoch->active) == 0 &&
1169                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1170                         if (!(ev & EV_CLEANUP)) {
1171                                 spin_unlock(&tconn->epoch_lock);
1172                                 drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
1173                                 spin_lock(&tconn->epoch_lock);
1174                         }
1175 #if 0
1176                         /* FIXME: dec unacked on connection, once we have
1177                          * something to count pending connection packets in. */
1178                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1179                                 dec_unacked(epoch->tconn);
1180 #endif
1181
1182                         if (tconn->current_epoch != epoch) {
1183                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1184                                 list_del(&epoch->list);
1185                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1186                                 tconn->epochs--;
1187                                 kfree(epoch);
1188
1189                                 if (rv == FE_STILL_LIVE)
1190                                         rv = FE_DESTROYED;
1191                         } else {
1192                                 epoch->flags = 0;
1193                                 atomic_set(&epoch->epoch_size, 0);
1194                                 /* atomic_set(&epoch->active, 0); is already zero */
1195                                 if (rv == FE_STILL_LIVE)
1196                                         rv = FE_RECYCLED;
1197                         }
1198                 }
1199
1200                 if (!next_epoch)
1201                         break;
1202
1203                 epoch = next_epoch;
1204         } while (1);
1205
1206         spin_unlock(&tconn->epoch_lock);
1207
1208         return rv;
1209 }
1210
1211 /**
1212  * drbd_bump_write_ordering() - Fall back to another write ordering method
1213  * @tconn:      DRBD connection.
1214  * @wo:         Write ordering method to try.
1215  */
1216 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1217 {
1218         struct disk_conf *dc;
1219         struct drbd_conf *mdev;
1220         enum write_ordering_e pwo;
1221         int vnr;
1222         static char *write_ordering_str[] = {
1223                 [WO_none] = "none",
1224                 [WO_drain_io] = "drain",
1225                 [WO_bdev_flush] = "flush",
1226         };
1227
1228         pwo = tconn->write_ordering;
1229         wo = min(pwo, wo);
1230         rcu_read_lock();
1231         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1232                 if (!get_ldev_if_state(mdev, D_ATTACHING))
1233                         continue;
1234                 dc = rcu_dereference(mdev->ldev->disk_conf);
1235
1236                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1237                         wo = WO_drain_io;
1238                 if (wo == WO_drain_io && !dc->disk_drain)
1239                         wo = WO_none;
1240                 put_ldev(mdev);
1241         }
1242         rcu_read_unlock();
1243         tconn->write_ordering = wo;
1244         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1245                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1246 }
1247
1248 /**
1249  * drbd_submit_peer_request() - submit the peer request's pages in one or more bios
1250  * @mdev:       DRBD device.
1251  * @peer_req:   peer request
1252  * @rw:         flag field, see bio->bi_rw
1253  *
1254  * May spread the pages to multiple bios,
1255  * depending on bio_add_page restrictions.
1256  *
1257  * Returns 0 if all bios have been submitted,
1258  * -ENOMEM if we could not allocate enough bios,
1259  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1260  *  single page to an empty bio (which should never happen and likely indicates
1261  *  that the lower level IO stack is in some way broken). This has been observed
1262  *  on certain Xen deployments.
1263  */
1264 /* TODO allocate from our own bio_set. */
1265 int drbd_submit_peer_request(struct drbd_conf *mdev,
1266                              struct drbd_peer_request *peer_req,
1267                              const unsigned rw, const int fault_type)
1268 {
1269         struct bio *bios = NULL;
1270         struct bio *bio;
1271         struct page *page = peer_req->pages;
1272         sector_t sector = peer_req->i.sector;
1273         unsigned ds = peer_req->i.size;
1274         unsigned n_bios = 0;
1275         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1276         int err = -ENOMEM;
1277
1278         /* In most cases, we will only need one bio.  But in case the lower
1279          * level restrictions happen to be different at this offset on this
1280          * side than those of the sending peer, we may need to submit the
1281          * request in more than one bio.
1282          *
1283          * Plain bio_alloc is good enough here, this is no DRBD internally
1284          * generated bio, but a bio allocated on behalf of the peer.
1285          */
1286 next_bio:
1287         bio = bio_alloc(GFP_NOIO, nr_pages);
1288         if (!bio) {
1289                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1290                 goto fail;
1291         }
1292         /* > peer_req->i.sector, unless this is the first bio */
1293         bio->bi_sector = sector;
1294         bio->bi_bdev = mdev->ldev->backing_bdev;
1295         bio->bi_rw = rw;
1296         bio->bi_private = peer_req;
1297         bio->bi_end_io = drbd_peer_request_endio;
1298
1299         bio->bi_next = bios;
1300         bios = bio;
1301         ++n_bios;
1302
1303         page_chain_for_each(page) {
1304                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1305                 if (!bio_add_page(bio, page, len, 0)) {
1306                         /* A single page must always be possible!
1307          * But in case it fails anyway,
1308                          * we deal with it, and complain (below). */
1309                         if (bio->bi_vcnt == 0) {
1310                                 dev_err(DEV,
1311                                         "bio_add_page failed for len=%u, "
1312                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1313                                         len, (unsigned long long)bio->bi_sector);
1314                                 err = -ENOSPC;
1315                                 goto fail;
1316                         }
1317                         goto next_bio;
1318                 }
1319                 ds -= len;
1320                 sector += len >> 9;
1321                 --nr_pages;
1322         }
1323         D_ASSERT(page == NULL);
1324         D_ASSERT(ds == 0);
1325
1326         atomic_set(&peer_req->pending_bios, n_bios);
1327         do {
1328                 bio = bios;
1329                 bios = bios->bi_next;
1330                 bio->bi_next = NULL;
1331
1332                 drbd_generic_make_request(mdev, fault_type, bio);
1333         } while (bios);
1334         return 0;
1335
1336 fail:
1337         while (bios) {
1338                 bio = bios;
1339                 bios = bios->bi_next;
1340                 bio_put(bio);
1341         }
1342         return err;
1343 }
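
/*
 * Sketch, kept out of the build: a hypothetical caller of
 * drbd_submit_peer_request(), showing how the return codes documented
 * above might be handled.  The fault type chosen here is only an example.
 */
#if 0
static int example_submit_write(struct drbd_conf *mdev,
                                struct drbd_peer_request *peer_req)
{
        int err;

        err = drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_DT_WR);
        if (err == -ENOMEM)
                return err;     /* could not allocate enough bios */
        if (err == -ENOSPC)
                return err;     /* bio_add_page failed on an empty bio */
        return err;             /* 0: all bios submitted */
}
#endif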
1344
1345 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1346                                              struct drbd_peer_request *peer_req)
1347 {
1348         struct drbd_interval *i = &peer_req->i;
1349
1350         drbd_remove_interval(&mdev->write_requests, i);
1351         drbd_clear_interval(i);
1352
1353         /* Wake up any processes waiting for this peer request to complete.  */
1354         if (i->waiting)
1355                 wake_up(&mdev->misc_wait);
1356 }
1357
1358 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1359 {
1360         struct drbd_conf *mdev;
1361         int vnr;
1362
1363         rcu_read_lock();
1364         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1365                 kref_get(&mdev->kref);
1366                 rcu_read_unlock();
1367                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1368                 kref_put(&mdev->kref, &drbd_minor_destroy);
1369                 rcu_read_lock();
1370         }
1371         rcu_read_unlock();
1372 }
1373
1374 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1375 {
1376         int rv;
1377         struct p_barrier *p = pi->data;
1378         struct drbd_epoch *epoch;
1379
1380         /* FIXME these are unacked on connection,
1381          * not a specific (peer)device.
1382          */
1383         tconn->current_epoch->barrier_nr = p->barrier;
1384         tconn->current_epoch->tconn = tconn;
1385         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1386
1387         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1388          * the activity log, which means it would not be resynced in case the
1389          * R_PRIMARY crashes now.
1390          * Therefore we must send the barrier_ack after the barrier request was
1391          * completed. */
1392         switch (tconn->write_ordering) {
1393         case WO_none:
1394                 if (rv == FE_RECYCLED)
1395                         return 0;
1396
1397                 /* receiver context, in the writeout path of the other node.
1398                  * avoid potential distributed deadlock */
1399                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1400                 if (epoch)
1401                         break;
1402                 else
1403                         conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
1404                         /* Fall through */
1405
1406         case WO_bdev_flush:
1407         case WO_drain_io:
1408                 conn_wait_active_ee_empty(tconn);
1409                 drbd_flush(tconn);
1410
1411                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1412                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1413                         if (epoch)
1414                                 break;
1415                 }
1416
1417                 return 0;
1418         default:
1419                 conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1420                 return -EIO;
1421         }
1422
1423         epoch->flags = 0;
1424         atomic_set(&epoch->epoch_size, 0);
1425         atomic_set(&epoch->active, 0);
1426
1427         spin_lock(&tconn->epoch_lock);
1428         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1429                 list_add(&epoch->list, &tconn->current_epoch->list);
1430                 tconn->current_epoch = epoch;
1431                 tconn->epochs++;
1432         } else {
1433                 /* The current_epoch got recycled while we allocated this one... */
1434                 kfree(epoch);
1435         }
1436         spin_unlock(&tconn->epoch_lock);
1437
1438         return 0;
1439 }
1440
1441 /* used from receive_RSDataReply (recv_resync_read)
1442  * and from receive_Data */
1443 static struct drbd_peer_request *
1444 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1445               int data_size) __must_hold(local)
1446 {
1447         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1448         struct drbd_peer_request *peer_req;
1449         struct page *page;
1450         int dgs, ds, err;
1451         void *dig_in = mdev->tconn->int_dig_in;
1452         void *dig_vv = mdev->tconn->int_dig_vv;
1453         unsigned long *data;
1454
1455         dgs = 0;
1456         if (mdev->tconn->peer_integrity_tfm) {
1457                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1458                 /*
1459                  * FIXME: Receive the incoming digest into the receive buffer
1460                  *        here, together with its struct p_data?
1461                  */
1462                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1463                 if (err)
1464                         return NULL;
1465                 data_size -= dgs;
1466         }
1467
1468         if (!expect(data_size != 0))
1469                 return NULL;
1470         if (!expect(IS_ALIGNED(data_size, 512)))
1471                 return NULL;
1472         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1473                 return NULL;
1474
1475         /* even though we trust our peer,
1476          * we sometimes have to double check. */
1477         if (sector + (data_size>>9) > capacity) {
1478                 dev_err(DEV, "request from peer beyond end of local disk: "
1479                         "capacity: %llus < sector: %llus + size: %u\n",
1480                         (unsigned long long)capacity,
1481                         (unsigned long long)sector, data_size);
1482                 return NULL;
1483         }
1484
1485         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1486          * "criss-cross" setup, that might cause write-out on some other DRBD,
1487          * which in turn might block on the other node at this very place.  */
1488         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1489         if (!peer_req)
1490                 return NULL;
1491
1492         ds = data_size;
1493         page = peer_req->pages;
1494         page_chain_for_each(page) {
1495                 unsigned len = min_t(int, ds, PAGE_SIZE);
1496                 data = kmap(page);
1497                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1498                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1499                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1500                         data[0] = data[0] ^ (unsigned long)-1;
1501                 }
1502                 kunmap(page);
1503                 if (err) {
1504                         drbd_free_peer_req(mdev, peer_req);
1505                         return NULL;
1506                 }
1507                 ds -= len;
1508         }
1509
1510         if (dgs) {
1511                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1512                 if (memcmp(dig_in, dig_vv, dgs)) {
1513                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1514                                 (unsigned long long)sector, data_size);
1515                         drbd_free_peer_req(mdev, peer_req);
1516                         return NULL;
1517                 }
1518         }
1519         mdev->recv_cnt += data_size>>9;
1520         return peer_req;
1521 }
1522
1523 /* drbd_drain_block() just takes a data block
1524  * out of the socket input buffer, and discards it.
1525  */
1526 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1527 {
1528         struct page *page;
1529         int err = 0;
1530         void *data;
1531
1532         if (!data_size)
1533                 return 0;
1534
1535         page = drbd_alloc_pages(mdev, 1, 1);
1536
1537         data = kmap(page);
1538         while (data_size) {
1539                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1540
1541                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1542                 if (err)
1543                         break;
1544                 data_size -= len;
1545         }
1546         kunmap(page);
1547         drbd_free_pages(mdev, page, 0);
1548         return err;
1549 }
1550
1551 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1552                            sector_t sector, int data_size)
1553 {
1554         struct bio_vec *bvec;
1555         struct bio *bio;
1556         int dgs, err, i, expect;
1557         void *dig_in = mdev->tconn->int_dig_in;
1558         void *dig_vv = mdev->tconn->int_dig_vv;
1559
1560         dgs = 0;
1561         if (mdev->tconn->peer_integrity_tfm) {
1562                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1563                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1564                 if (err)
1565                         return err;
1566                 data_size -= dgs;
1567         }
1568
1569         /* optimistically update recv_cnt.  if receiving fails below,
1570          * we disconnect anyways, and counters will be reset. */
1571         mdev->recv_cnt += data_size>>9;
1572
1573         bio = req->master_bio;
1574         D_ASSERT(sector == bio->bi_sector);
1575
1576         bio_for_each_segment(bvec, bio, i) {
1577                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1578                 expect = min_t(int, data_size, bvec->bv_len);
1579                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1580                 kunmap(bvec->bv_page);
1581                 if (err)
1582                         return err;
1583                 data_size -= expect;
1584         }
1585
1586         if (dgs) {
1587                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1588                 if (memcmp(dig_in, dig_vv, dgs)) {
1589                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1590                         return -EINVAL;
1591                 }
1592         }
1593
1594         D_ASSERT(data_size == 0);
1595         return 0;
1596 }
1597
1598 /*
1599  * e_end_resync_block() is called in asender context via
1600  * drbd_finish_peer_reqs().
1601  */
1602 static int e_end_resync_block(struct drbd_work *w, int unused)
1603 {
1604         struct drbd_peer_request *peer_req =
1605                 container_of(w, struct drbd_peer_request, w);
1606         struct drbd_conf *mdev = w->mdev;
1607         sector_t sector = peer_req->i.sector;
1608         int err;
1609
1610         D_ASSERT(drbd_interval_empty(&peer_req->i));
1611
1612         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1613                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1614                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1615         } else {
1616                 /* Record failure to sync */
1617                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1618
1619                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1620         }
1621         dec_unacked(mdev);
1622
1623         return err;
1624 }
1625
1626 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1627 {
1628         struct drbd_peer_request *peer_req;
1629
1630         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1631         if (!peer_req)
1632                 goto fail;
1633
1634         dec_rs_pending(mdev);
1635
1636         inc_unacked(mdev);
1637         /* corresponding dec_unacked() in e_end_resync_block()
1638          * or in _drbd_clear_done_ee(), respectively */
1639
1640         peer_req->w.cb = e_end_resync_block;
1641
1642         spin_lock_irq(&mdev->tconn->req_lock);
1643         list_add(&peer_req->w.list, &mdev->sync_ee);
1644         spin_unlock_irq(&mdev->tconn->req_lock);
1645
1646         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1647         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1648                 return 0;
1649
1650         /* don't care for the reason here */
1651         dev_err(DEV, "submit failed, triggering re-connect\n");
1652         spin_lock_irq(&mdev->tconn->req_lock);
1653         list_del(&peer_req->w.list);
1654         spin_unlock_irq(&mdev->tconn->req_lock);
1655
1656         drbd_free_peer_req(mdev, peer_req);
1657 fail:
1658         put_ldev(mdev);
1659         return -EIO;
1660 }
1661
1662 static struct drbd_request *
1663 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1664              sector_t sector, bool missing_ok, const char *func)
1665 {
1666         struct drbd_request *req;
1667
1668         /* Request object according to our peer */
1669         req = (struct drbd_request *)(unsigned long)id;
1670         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1671                 return req;
1672         if (!missing_ok) {
1673                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1674                         (unsigned long)id, (unsigned long long)sector);
1675         }
1676         return NULL;
1677 }
1678
1679 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1680 {
1681         struct drbd_conf *mdev;
1682         struct drbd_request *req;
1683         sector_t sector;
1684         int err;
1685         struct p_data *p = pi->data;
1686
1687         mdev = vnr_to_mdev(tconn, pi->vnr);
1688         if (!mdev)
1689                 return -EIO;
1690
1691         sector = be64_to_cpu(p->sector);
1692
1693         spin_lock_irq(&mdev->tconn->req_lock);
1694         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1695         spin_unlock_irq(&mdev->tconn->req_lock);
1696         if (unlikely(!req))
1697                 return -EIO;
1698
1699         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1700          * special casing it there for the various failure cases.
1701          * still no race with drbd_fail_pending_reads */
1702         err = recv_dless_read(mdev, req, sector, pi->size);
1703         if (!err)
1704                 req_mod(req, DATA_RECEIVED);
1705         /* else: nothing. handled from drbd_disconnect...
1706          * I don't think we may complete this just yet
1707          * in case we are "on-disconnect: freeze" */
1708
1709         return err;
1710 }
1711
1712 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1713 {
1714         struct drbd_conf *mdev;
1715         sector_t sector;
1716         int err;
1717         struct p_data *p = pi->data;
1718
1719         mdev = vnr_to_mdev(tconn, pi->vnr);
1720         if (!mdev)
1721                 return -EIO;
1722
1723         sector = be64_to_cpu(p->sector);
1724         D_ASSERT(p->block_id == ID_SYNCER);
1725
1726         if (get_ldev(mdev)) {
1727                 /* data is submitted to disk within recv_resync_read.
1728                  * corresponding put_ldev done below on error,
1729                  * or in drbd_peer_request_endio. */
1730                 err = recv_resync_read(mdev, sector, pi->size);
1731         } else {
1732                 if (__ratelimit(&drbd_ratelimit_state))
1733                         dev_err(DEV, "Can not write resync data to local disk.\n");
1734
1735                 err = drbd_drain_block(mdev, pi->size);
1736
1737                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1738         }
1739
1740         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1741
1742         return err;
1743 }
1744
1745 static void restart_conflicting_writes(struct drbd_conf *mdev,
1746                                        sector_t sector, int size)
1747 {
1748         struct drbd_interval *i;
1749         struct drbd_request *req;
1750
1751         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1752                 if (!i->local)
1753                         continue;
1754                 req = container_of(i, struct drbd_request, i);
1755                 if (req->rq_state & RQ_LOCAL_PENDING ||
1756                     !(req->rq_state & RQ_POSTPONED))
1757                         continue;
1758                 /* as it is RQ_POSTPONED, this will cause it to
1759                  * be queued on the retry workqueue. */
1760                 __req_mod(req, DISCARD_WRITE, NULL);
1761         }
1762 }
1763
1764 /*
1765  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1766  */
1767 static int e_end_block(struct drbd_work *w, int cancel)
1768 {
1769         struct drbd_peer_request *peer_req =
1770                 container_of(w, struct drbd_peer_request, w);
1771         struct drbd_conf *mdev = w->mdev;
1772         sector_t sector = peer_req->i.sector;
1773         int err = 0, pcmd;
1774
1775         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1776                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1777                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1778                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1779                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1780                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1781                         err = drbd_send_ack(mdev, pcmd, peer_req);
1782                         if (pcmd == P_RS_WRITE_ACK)
1783                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1784                 } else {
1785                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1786                         /* we expect it to be marked out of sync anyways...
1787                          * maybe assert this?  */
1788                 }
1789                 dec_unacked(mdev);
1790         }
1791         /* we delete from the conflict detection hash _after_ we sent out the
1792          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1793         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1794                 spin_lock_irq(&mdev->tconn->req_lock);
1795                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1796                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1797                 if (peer_req->flags & EE_RESTART_REQUESTS)
1798                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1799                 spin_unlock_irq(&mdev->tconn->req_lock);
1800         } else
1801                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1802
1803         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1804
1805         return err;
1806 }
1807
1808 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1809 {
1810         struct drbd_conf *mdev = w->mdev;
1811         struct drbd_peer_request *peer_req =
1812                 container_of(w, struct drbd_peer_request, w);
1813         int err;
1814
1815         err = drbd_send_ack(mdev, ack, peer_req);
1816         dec_unacked(mdev);
1817
1818         return err;
1819 }
1820
1821 static int e_send_discard_write(struct drbd_work *w, int unused)
1822 {
1823         return e_send_ack(w, P_DISCARD_WRITE);
1824 }
1825
1826 static int e_send_retry_write(struct drbd_work *w, int unused)
1827 {
1828         struct drbd_tconn *tconn = w->mdev->tconn;
1829
1830         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1831                              P_RETRY_WRITE : P_DISCARD_WRITE);
1832 }
1833
1834 static bool seq_greater(u32 a, u32 b)
1835 {
1836         /*
1837          * We assume 32-bit wrap-around here.
1838          * For 24-bit wrap-around, we would have to shift:
1839          *  a <<= 8; b <<= 8;
1840          */
1841         return (s32)a - (s32)b > 0;
1842 }
1843
1844 static u32 seq_max(u32 a, u32 b)
1845 {
1846         return seq_greater(a, b) ? a : b;
1847 }
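
/*
 * Worked example of the signed-difference trick above (hypothetical
 * values, for illustration only):
 *
 *   seq_greater(5, 3):           (s32)5 - (s32)3          ==  2 > 0  -> true
 *   seq_greater(2, 0xfffffffe):  (s32)2 - (s32)0xfffffffe ==  4 > 0  -> true
 *   seq_greater(0xfffffffe, 2):  (s32)0xfffffffe - (s32)2 == -4 < 0  -> false
 *
 * A sequence number that has just wrapped past zero still compares as
 * "newer" than one taken shortly before the wrap, as long as the two
 * values are less than 2^31 apart.
 */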
1848
1849 static bool need_peer_seq(struct drbd_conf *mdev)
1850 {
1851         struct drbd_tconn *tconn = mdev->tconn;
1852         int tp;
1853
1854         /*
1855          * We only need to keep track of the last packet_seq number of our peer
1856          * if we are in dual-primary mode and we have the discard flag set; see
1857          * handle_write_conflicts().
1858          */
1859
1860         rcu_read_lock();
1861         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1862         rcu_read_unlock();
1863
1864         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1865 }
1866
1867 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1868 {
1869         unsigned int newest_peer_seq;
1870
1871         if (need_peer_seq(mdev)) {
1872                 spin_lock(&mdev->peer_seq_lock);
1873                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1874                 mdev->peer_seq = newest_peer_seq;
1875                 spin_unlock(&mdev->peer_seq_lock);
1876                 /* wake up only if we actually changed mdev->peer_seq */
1877                 if (peer_seq == newest_peer_seq)
1878                         wake_up(&mdev->seq_wait);
1879         }
1880 }
1881
1882 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1883 {
1884         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1885 }
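
/*
 * Example (hypothetical values, for illustration only): s1 == 8, l1 == 4096
 * covers sectors [8, 16); s2 == 12, l2 == 1024 covers sectors [12, 14);
 * overlaps(8, 4096, 12, 1024) therefore returns true.  Note that l1 and l2
 * are byte lengths, hence the >>9 conversion to 512-byte sectors.
 */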
1886
1887 /* maybe change sync_ee into interval trees as well? */
1888 static bool overlaping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1889 {
1890         struct drbd_peer_request *rs_req;
1891         bool rv = 0;
1892
1893         spin_lock_irq(&mdev->tconn->req_lock);
1894         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1895                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1896                              rs_req->i.sector, rs_req->i.size)) {
1897                         rv = 1;
1898                         break;
1899                 }
1900         }
1901         spin_unlock_irq(&mdev->tconn->req_lock);
1902
1903         if (rv)
1904                 dev_warn(DEV, "WARN: Avoiding concurrent data/resync write to single sector.\n");
1905
1906         return rv;
1907 }
1908
1909 /* Called from receive_Data.
1910  * Synchronize packets on sock with packets on msock.
1911  *
1912  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1913  * packet traveling on msock, they are still processed in the order they have
1914  * been sent.
1915  *
1916  * Note: we don't care for Ack packets overtaking P_DATA packets.
1917  *
1918  * If packet_seq is larger than mdev->peer_seq, there are still
1919  * outstanding packets on the msock; we wait for them to arrive.
1920  * In case we are the logically next packet, we update mdev->peer_seq
1921  * ourselves. Correctly handles 32bit wrap around.
1922  *
1923  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1924  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1925  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1926  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1927  *
1928  * returns 0 if we may process the packet,
1929  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1930 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1931 {
1932         DEFINE_WAIT(wait);
1933         long timeout;
1934         int ret;
1935
1936         if (!need_peer_seq(mdev))
1937                 return 0;
1938
1939         spin_lock(&mdev->peer_seq_lock);
1940         for (;;) {
1941                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1942                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1943                         ret = 0;
1944                         break;
1945                 }
1946                 if (signal_pending(current)) {
1947                         ret = -ERESTARTSYS;
1948                         break;
1949                 }
1950                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1951                 spin_unlock(&mdev->peer_seq_lock);
1952                 rcu_read_lock();
1953                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1954                 rcu_read_unlock();
1955                 timeout = schedule_timeout(timeout);
1956                 spin_lock(&mdev->peer_seq_lock);
1957                 if (!timeout) {
1958                         ret = -ETIMEDOUT;
1959                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1960                         break;
1961                 }
1962         }
1963         spin_unlock(&mdev->peer_seq_lock);
1964         finish_wait(&mdev->seq_wait, &wait);
1965         return ret;
1966 }
1967
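/*
 * Illustrative trace of wait_for_and_update_peer_seq() (hypothetical
 * numbers, assuming two_primaries and DISCARD_CONCURRENT are set):
 *
 *   mdev->peer_seq == 7, and a P_DATA with seq_num 9 arrives on the data
 *   socket.  seq_greater(9 - 1, 7) is true, so a packet with seq_num 8 is
 *   still outstanding on the msock; we sleep on seq_wait.  Once that packet
 *   has been processed and update_peer_seq() has advanced mdev->peer_seq
 *   to 8, we are woken up, seq_greater(8, 8) is false, mdev->peer_seq
 *   becomes seq_max(8, 9) == 9, and the write may be processed.
 */
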
1968 /* see also bio_flags_to_wire():
1969  * we need to semantically map DRBD_REQ_* flags to data packet flags and
1970  * back, since we may be replicating to a peer running a different kernel version. */
1971 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1972 {
1973         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1974                 (dpf & DP_FUA ? REQ_FUA : 0) |
1975                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1976                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1977 }
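
/*
 * Example (illustrative only): a peer write sent with DP_FUA | DP_FLUSH is
 * mapped to REQ_FUA | REQ_FLUSH on the local bio, so the ordering/durability
 * semantics the peer requested are preserved on this node's backing device.
 */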
1978
1979 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1980                                     unsigned int size)
1981 {
1982         struct drbd_interval *i;
1983
1984     repeat:
1985         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1986                 struct drbd_request *req;
1987                 struct bio_and_error m;
1988
1989                 if (!i->local)
1990                         continue;
1991                 req = container_of(i, struct drbd_request, i);
1992                 if (!(req->rq_state & RQ_POSTPONED))
1993                         continue;
1994                 req->rq_state &= ~RQ_POSTPONED;
1995                 __req_mod(req, NEG_ACKED, &m);
1996                 spin_unlock_irq(&mdev->tconn->req_lock);
1997                 if (m.bio)
1998                         complete_master_bio(mdev, &m);
1999                 spin_lock_irq(&mdev->tconn->req_lock);
2000                 goto repeat;
2001         }
2002 }
2003
2004 static int handle_write_conflicts(struct drbd_conf *mdev,
2005                                   struct drbd_peer_request *peer_req)
2006 {
2007         struct drbd_tconn *tconn = mdev->tconn;
2008         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
2009         sector_t sector = peer_req->i.sector;
2010         const unsigned int size = peer_req->i.size;
2011         struct drbd_interval *i;
2012         bool equal;
2013         int err;
2014
2015         /*
2016          * Inserting the peer request into the write_requests tree will prevent
2017          * new conflicting local requests from being added.
2018          */
2019         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2020
2021     repeat:
2022         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2023                 if (i == &peer_req->i)
2024                         continue;
2025
2026                 if (!i->local) {
2027                         /*
2028                          * Our peer has sent a conflicting remote request; this
2029                          * should not happen in a two-node setup.  Wait for the
2030                          * earlier peer request to complete.
2031                          */
2032                         err = drbd_wait_misc(mdev, i);
2033                         if (err)
2034                                 goto out;
2035                         goto repeat;
2036                 }
2037
2038                 equal = i->sector == sector && i->size == size;
2039                 if (resolve_conflicts) {
2040                         /*
2041                          * If the peer request is fully contained within the
2042                          * overlapping request, it can be discarded; otherwise,
2043                          * it will be retried once all overlapping requests
2044                          * have completed.
2045                          */
2046                         bool discard = i->sector <= sector && i->sector +
2047                                        (i->size >> 9) >= sector + (size >> 9);
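                        /*
                         * Example (hypothetical sectors/sizes): an existing
                         * request i covering sectors [100, 108) (i->size ==
                         * 4096) fully contains a peer request for sectors
                         * [102, 104) (size == 1024), so discard is true and
                         * the peer write is answered with P_DISCARD_WRITE
                         * below.
                         */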
2048
2049                         if (!equal)
2050                                 dev_alert(DEV, "Concurrent writes detected: "
2051                                                "local=%llus +%u, remote=%llus +%u, "
2052                                                "assuming %s came first\n",
2053                                           (unsigned long long)i->sector, i->size,
2054                                           (unsigned long long)sector, size,
2055                                           discard ? "local" : "remote");
2056
2057                         inc_unacked(mdev);
2058                         peer_req->w.cb = discard ? e_send_discard_write :
2059                                                    e_send_retry_write;
2060                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2061                         wake_asender(mdev->tconn);
2062
2063                         err = -ENOENT;
2064                         goto out;
2065                 } else {
2066                         struct drbd_request *req =
2067                                 container_of(i, struct drbd_request, i);
2068
2069                         if (!equal)
2070                                 dev_alert(DEV, "Concurrent writes detected: "
2071                                                "local=%llus +%u, remote=%llus +%u\n",
2072                                           (unsigned long long)i->sector, i->size,
2073                                           (unsigned long long)sector, size);
2074
2075                         if (req->rq_state & RQ_LOCAL_PENDING ||
2076                             !(req->rq_state & RQ_POSTPONED)) {
2077                                 /*
2078                                  * Wait for the node with the discard flag to
2079                                  * decide if this request will be discarded or
2080                                  * retried.  Requests that are discarded will
2081                                  * disappear from the write_requests tree.
2082                                  *
2083                                  * In addition, wait for the conflicting
2084                                  * request to finish locally before submitting
2085                                  * the conflicting peer request.
2086                                  */
2087                                 err = drbd_wait_misc(mdev, &req->i);
2088                                 if (err) {
2089                                         _conn_request_state(mdev->tconn,
2090                                                             NS(conn, C_TIMEOUT),
2091                                                             CS_HARD);
2092                                         fail_postponed_requests(mdev, sector, size);
2093                                         goto out;
2094                                 }
2095                                 goto repeat;
2096                         }
2097                         /*
2098                          * Remember to restart the conflicting requests after
2099                          * the new peer request has completed.
2100                          */
2101                         peer_req->flags |= EE_RESTART_REQUESTS;
2102                 }
2103         }
2104         err = 0;
2105
2106     out:
2107         if (err)
2108                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2109         return err;
2110 }
2111
2112 /* mirrored write */
2113 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2114 {
2115         struct drbd_conf *mdev;
2116         sector_t sector;
2117         struct drbd_peer_request *peer_req;
2118         struct p_data *p = pi->data;
2119         u32 peer_seq = be32_to_cpu(p->seq_num);
2120         int rw = WRITE;
2121         u32 dp_flags;
2122         int err, tp;
2123
2124         mdev = vnr_to_mdev(tconn, pi->vnr);
2125         if (!mdev)
2126                 return -EIO;
2127
2128         if (!get_ldev(mdev)) {
2129                 int err2;
2130
2131                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2132                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2133                 atomic_inc(&tconn->current_epoch->epoch_size);
2134                 err2 = drbd_drain_block(mdev, pi->size);
2135                 if (!err)
2136                         err = err2;
2137                 return err;
2138         }
2139
2140         /*
2141          * Corresponding put_ldev done either below (on various errors), or in
2142          * drbd_peer_request_endio, if we successfully submit the data at the
2143          * end of this function.
2144          */
2145
2146         sector = be64_to_cpu(p->sector);
2147         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2148         if (!peer_req) {
2149                 put_ldev(mdev);
2150                 return -EIO;
2151         }
2152
2153         peer_req->w.cb = e_end_block;
2154
2155         dp_flags = be32_to_cpu(p->dp_flags);
2156         rw |= wire_flags_to_bio(mdev, dp_flags);
2157
2158         if (dp_flags & DP_MAY_SET_IN_SYNC)
2159                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2160
2161         spin_lock(&tconn->epoch_lock);
2162         peer_req->epoch = tconn->current_epoch;
2163         atomic_inc(&peer_req->epoch->epoch_size);
2164         atomic_inc(&peer_req->epoch->active);
2165         spin_unlock(&tconn->epoch_lock);
2166
2167         rcu_read_lock();
2168         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2169         rcu_read_unlock();
2170         if (tp) {
2171                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2172                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2173                 if (err)
2174                         goto out_interrupted;
2175                 spin_lock_irq(&mdev->tconn->req_lock);
2176                 err = handle_write_conflicts(mdev, peer_req);
2177                 if (err) {
2178                         spin_unlock_irq(&mdev->tconn->req_lock);
2179                         if (err == -ENOENT) {
2180                                 put_ldev(mdev);
2181                                 return 0;
2182                         }
2183                         goto out_interrupted;
2184                 }
2185         } else
2186                 spin_lock_irq(&mdev->tconn->req_lock);
2187         list_add(&peer_req->w.list, &mdev->active_ee);
2188         spin_unlock_irq(&mdev->tconn->req_lock);
2189
2190         if (mdev->state.conn == C_SYNC_TARGET)
2191                 wait_event(mdev->ee_wait, !overlaping_resync_write(mdev, peer_req));
2192
2193         if (mdev->tconn->agreed_pro_version < 100) {
2194                 rcu_read_lock();
2195                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2196                 case DRBD_PROT_C:
2197                         dp_flags |= DP_SEND_WRITE_ACK;
2198                         break;
2199                 case DRBD_PROT_B:
2200                         dp_flags |= DP_SEND_RECEIVE_ACK;
2201                         break;
2202                 }
2203                 rcu_read_unlock();
2204         }
2205
2206         if (dp_flags & DP_SEND_WRITE_ACK) {
2207                 peer_req->flags |= EE_SEND_WRITE_ACK;
2208                 inc_unacked(mdev);
2209                 /* corresponding dec_unacked() in e_end_block()
2210                  * or in _drbd_clear_done_ee(), respectively */
2211         }
2212
2213         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2214                 /* I really don't like it that the receiver thread
2215                  * sends on the msock, but anyways */
2216                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2217         }
2218
2219         if (mdev->state.pdsk < D_INCONSISTENT) {
2220                 /* In case we have the only disk of the cluster, */
2221                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2222                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2223                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2224                 drbd_al_begin_io(mdev, &peer_req->i);
2225         }
2226
2227         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2228         if (!err)
2229                 return 0;
2230
2231         /* don't care for the reason here */
2232         dev_err(DEV, "submit failed, triggering re-connect\n");
2233         spin_lock_irq(&mdev->tconn->req_lock);
2234         list_del(&peer_req->w.list);
2235         drbd_remove_epoch_entry_interval(mdev, peer_req);
2236         spin_unlock_irq(&mdev->tconn->req_lock);
2237         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2238                 drbd_al_complete_io(mdev, &peer_req->i);
2239
2240 out_interrupted:
2241         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2242         put_ldev(mdev);
2243         drbd_free_peer_req(mdev, peer_req);
2244         return err;
2245 }
2246
2247 /* We may throttle resync, if the lower device seems to be busy,
2248  * and current sync rate is above c_min_rate.
2249  *
2250  * To decide whether or not the lower device is busy, we use a scheme similar
2251  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2252  * amount of activity (more than 64 sectors) that we cannot account for with
2253  * our own resync activity, the device obviously is "busy".
2254  *
2255  * The current sync rate used here uses only the most recent two step marks,
2256  * to have a short time average so we can react faster.
2257  */
2258 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2259 {
2260         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2261         unsigned long db, dt, dbdt;
2262         struct lc_element *tmp;
2263         int curr_events;
2264         int throttle = 0;
2265         unsigned int c_min_rate;
2266
2267         rcu_read_lock();
2268         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2269         rcu_read_unlock();
2270
2271         /* feature disabled? */
2272         if (c_min_rate == 0)
2273                 return 0;
2274
2275         spin_lock_irq(&mdev->al_lock);
2276         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2277         if (tmp) {
2278                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2279                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2280                         spin_unlock_irq(&mdev->al_lock);
2281                         return 0;
2282                 }
2283                 /* Do not slow down if app IO is already waiting for this extent */
2284         }
2285         spin_unlock_irq(&mdev->al_lock);
2286
2287         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2288                       (int)part_stat_read(&disk->part0, sectors[1]) -
2289                         atomic_read(&mdev->rs_sect_ev);
2290
2291         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2292                 unsigned long rs_left;
2293                 int i;
2294
2295                 mdev->rs_last_events = curr_events;
2296
2297                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2298                  * approx. */
2299                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2300
2301                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2302                         rs_left = mdev->ov_left;
2303                 else
2304                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2305
2306                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2307                 if (!dt)
2308                         dt++;
2309                 db = mdev->rs_mark_left[i] - rs_left;
2310                 dbdt = Bit2KB(db/dt);
2311
2312                 if (dbdt > c_min_rate)
2313                         throttle = 1;
2314         }
2315         return throttle;
2316 }
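
/*
 * Illustrative throttle calculation (hypothetical numbers, assuming the
 * usual 4 KiB per bitmap bit, i.e. Bit2KB(x) == x * 4):
 *
 *   db == rs_mark_left[i] - rs_left == 25600 bits cleared since mark i,
 *   dt == 10 seconds since that mark,
 *   db/dt == 2560 bits/s  ->  dbdt == 10240 KiB/s.
 *
 * With a configured c_min_rate of, say, 4096 KiB/s, dbdt > c_min_rate and
 * the caller is told to throttle; with c_min_rate == 0 the check is
 * disabled entirely (see above).
 */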
2317
2318
2319 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2320 {
2321         struct drbd_conf *mdev;
2322         sector_t sector;
2323         sector_t capacity;
2324         struct drbd_peer_request *peer_req;
2325         struct digest_info *di = NULL;
2326         int size, verb;
2327         unsigned int fault_type;
2328         struct p_block_req *p = pi->data;
2329
2330         mdev = vnr_to_mdev(tconn, pi->vnr);
2331         if (!mdev)
2332                 return -EIO;
2333         capacity = drbd_get_capacity(mdev->this_bdev);
2334
2335         sector = be64_to_cpu(p->sector);
2336         size   = be32_to_cpu(p->blksize);
2337
2338         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2339                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2340                                 (unsigned long long)sector, size);
2341                 return -EINVAL;
2342         }
2343         if (sector + (size>>9) > capacity) {
2344                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2345                                 (unsigned long long)sector, size);
2346                 return -EINVAL;
2347         }
2348
2349         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2350                 verb = 1;
2351                 switch (pi->cmd) {
2352                 case P_DATA_REQUEST:
2353                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2354                         break;
2355                 case P_RS_DATA_REQUEST:
2356                 case P_CSUM_RS_REQUEST:
2357                 case P_OV_REQUEST:
2358                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2359                         break;
2360                 case P_OV_REPLY:
2361                         verb = 0;
2362                         dec_rs_pending(mdev);
2363                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2364                         break;
2365                 default:
2366                         BUG();
2367                 }
2368                 if (verb && __ratelimit(&drbd_ratelimit_state))
2369                         dev_err(DEV, "Can not satisfy peer's read request, "
2370                             "no local data.\n");
2371
2372                 /* drain the payload, if any */
2373                 return drbd_drain_block(mdev, pi->size);
2374         }
2375
2376         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2377          * "criss-cross" setup, that might cause write-out on some other DRBD,
2378          * which in turn might block on the other node at this very place.  */
2379         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2380         if (!peer_req) {
2381                 put_ldev(mdev);
2382                 return -ENOMEM;
2383         }
2384
2385         switch (pi->cmd) {
2386         case P_DATA_REQUEST:
2387                 peer_req->w.cb = w_e_end_data_req;
2388                 fault_type = DRBD_FAULT_DT_RD;
2389                 /* application IO, don't drbd_rs_begin_io */
2390                 goto submit;
2391
2392         case P_RS_DATA_REQUEST:
2393                 peer_req->w.cb = w_e_end_rsdata_req;
2394                 fault_type = DRBD_FAULT_RS_RD;
2395                 /* used in the sector offset progress display */
2396                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2397                 break;
2398
2399         case P_OV_REPLY:
2400         case P_CSUM_RS_REQUEST:
2401                 fault_type = DRBD_FAULT_RS_RD;
2402                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2403                 if (!di)
2404                         goto out_free_e;
2405
2406                 di->digest_size = pi->size;
2407                 di->digest = (((char *)di)+sizeof(struct digest_info));
2408
2409                 peer_req->digest = di;
2410                 peer_req->flags |= EE_HAS_DIGEST;
2411
2412                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2413                         goto out_free_e;
2414
2415                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2416                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2417                         peer_req->w.cb = w_e_end_csum_rs_req;
2418                         /* used in the sector offset progress display */
2419                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2420                 } else if (pi->cmd == P_OV_REPLY) {
2421                         /* track progress, we may need to throttle */
2422                         atomic_add(size >> 9, &mdev->rs_sect_in);
2423                         peer_req->w.cb = w_e_end_ov_reply;
2424                         dec_rs_pending(mdev);
2425                         /* drbd_rs_begin_io done when we sent this request,
2426                          * but accounting still needs to be done. */
2427                         goto submit_for_resync;
2428                 }
2429                 break;
2430
2431         case P_OV_REQUEST:
2432                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2433                     mdev->tconn->agreed_pro_version >= 90) {
2434                         unsigned long now = jiffies;
2435                         int i;
2436                         mdev->ov_start_sector = sector;
2437                         mdev->ov_position = sector;
2438                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2439                         mdev->rs_total = mdev->ov_left;
2440                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2441                                 mdev->rs_mark_left[i] = mdev->ov_left;
2442                                 mdev->rs_mark_time[i] = now;
2443                         }
2444                         dev_info(DEV, "Online Verify start sector: %llu\n",
2445                                         (unsigned long long)sector);
2446                 }
2447                 peer_req->w.cb = w_e_end_ov_req;
2448                 fault_type = DRBD_FAULT_RS_RD;
2449                 break;
2450
2451         default:
2452                 BUG();
2453         }
2454
2455         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2456          * wrt the receiver, but it is not as straightforward as it may seem.
2457          * Various places in the resync start and stop logic assume resync
2458          * requests are processed in order, requeuing this on the worker thread
2459          * introduces a bunch of new code for synchronization between threads.
2460          *
2461          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2462          * "forever", throttling after drbd_rs_begin_io will lock that extent
2463          * for application writes for the same time.  For now, just throttle
2464          * here, where the rest of the code expects the receiver to sleep for
2465          * a while, anyways.
2466          */
2467
2468         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2469          * this defers syncer requests for some time, before letting at least
2470  * one request through.  The resync controller on the receiving side
2471          * will adapt to the incoming rate accordingly.
2472          *
2473          * We cannot throttle here if remote is Primary/SyncTarget:
2474          * we would also throttle its application reads.
2475          * In that case, throttling is done on the SyncTarget only.
2476          */
2477         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2478                 schedule_timeout_uninterruptible(HZ/10);
2479         if (drbd_rs_begin_io(mdev, sector))
2480                 goto out_free_e;
2481
2482 submit_for_resync:
2483         atomic_add(size >> 9, &mdev->rs_sect_ev);
2484
2485 submit:
2486         inc_unacked(mdev);
2487         spin_lock_irq(&mdev->tconn->req_lock);
2488         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2489         spin_unlock_irq(&mdev->tconn->req_lock);
2490
2491         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2492                 return 0;
2493
2494         /* don't care for the reason here */
2495         dev_err(DEV, "submit failed, triggering re-connect\n");
2496         spin_lock_irq(&mdev->tconn->req_lock);
2497         list_del(&peer_req->w.list);
2498         spin_unlock_irq(&mdev->tconn->req_lock);
2499         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2500
2501 out_free_e:
2502         put_ldev(mdev);
2503         drbd_free_peer_req(mdev, peer_req);
2504         return -EIO;
2505 }
2506
2507 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2508 {
2509         int self, peer, rv = -100;
2510         unsigned long ch_self, ch_peer;
2511         enum drbd_after_sb_p after_sb_0p;
2512
2513         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2514         peer = mdev->p_uuid[UI_BITMAP] & 1;
2515
2516         ch_peer = mdev->p_uuid[UI_SIZE];
2517         ch_self = mdev->comm_bm_set;
2518
2519         rcu_read_lock();
2520         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2521         rcu_read_unlock();
2522         switch (after_sb_0p) {
2523         case ASB_CONSENSUS:
2524         case ASB_DISCARD_SECONDARY:
2525         case ASB_CALL_HELPER:
2526         case ASB_VIOLENTLY:
2527                 dev_err(DEV, "Configuration error.\n");
2528                 break;
2529         case ASB_DISCONNECT:
2530                 break;
2531         case ASB_DISCARD_YOUNGER_PRI:
2532                 if (self == 0 && peer == 1) {
2533                         rv = -1;
2534                         break;
2535                 }
2536                 if (self == 1 && peer == 0) {
2537                         rv =  1;
2538                         break;
2539                 }
2540                 /* Else fall through to one of the other strategies... */
2541         case ASB_DISCARD_OLDER_PRI:
2542                 if (self == 0 && peer == 1) {
2543                         rv = 1;
2544                         break;
2545                 }
2546                 if (self == 1 && peer == 0) {
2547                         rv = -1;
2548                         break;
2549                 }
2550                 /* Else fall through to one of the other strategies... */
2551                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2552                      "Using discard-least-changes instead\n");
2553         case ASB_DISCARD_ZERO_CHG:
2554                 if (ch_peer == 0 && ch_self == 0) {
2555                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2556                                 ? -1 : 1;
2557                         break;
2558                 } else {
2559                         if (ch_peer == 0) { rv =  1; break; }
2560                         if (ch_self == 0) { rv = -1; break; }
2561                 }
2562                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2563                         break;
2564         case ASB_DISCARD_LEAST_CHG:
2565                 if      (ch_self < ch_peer)
2566                         rv = -1;
2567                 else if (ch_self > ch_peer)
2568                         rv =  1;
2569                 else /* ( ch_self == ch_peer ) */
2570                      /* Well, then use something else. */
2571                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2572                                 ? -1 : 1;
2573                 break;
2574         case ASB_DISCARD_LOCAL:
2575                 rv = -1;
2576                 break;
2577         case ASB_DISCARD_REMOTE:
2578                 rv =  1;
2579         }
2580
2581         return rv;
2582 }
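
/*
 * Example (hypothetical counts): with after-sb-0pri set to
 * discard-least-changes and fewer locally changed blocks (ch_self == 12)
 * than the peer reports (ch_peer == 3000), rv becomes -1, i.e. the local
 * changes are discarded and this node becomes sync target; a positive
 * return value would discard the peer's changes instead.
 */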
2583
2584 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2585 {
2586         int hg, rv = -100;
2587         enum drbd_after_sb_p after_sb_1p;
2588
2589         rcu_read_lock();
2590         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2591         rcu_read_unlock();
2592         switch (after_sb_1p) {
2593         case ASB_DISCARD_YOUNGER_PRI:
2594         case ASB_DISCARD_OLDER_PRI:
2595         case ASB_DISCARD_LEAST_CHG:
2596         case ASB_DISCARD_LOCAL:
2597         case ASB_DISCARD_REMOTE:
2598         case ASB_DISCARD_ZERO_CHG:
2599                 dev_err(DEV, "Configuration error.\n");
2600                 break;
2601         case ASB_DISCONNECT:
2602                 break;
2603         case ASB_CONSENSUS:
2604                 hg = drbd_asb_recover_0p(mdev);
2605                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2606                         rv = hg;
2607                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2608                         rv = hg;
2609                 break;
2610         case ASB_VIOLENTLY:
2611                 rv = drbd_asb_recover_0p(mdev);
2612                 break;
2613         case ASB_DISCARD_SECONDARY:
2614                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2615         case ASB_CALL_HELPER:
2616                 hg = drbd_asb_recover_0p(mdev);
2617                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2618                         enum drbd_state_rv rv2;
2619
2620                         drbd_set_role(mdev, R_SECONDARY, 0);
2621                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2622                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2623                           * we do not need to wait for the after state change work either. */
2624                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2625                         if (rv2 != SS_SUCCESS) {
2626                                 drbd_khelper(mdev, "pri-lost-after-sb");
2627                         } else {
2628                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2629                                 rv = hg;
2630                         }
2631                 } else
2632                         rv = hg;
2633         }
2634
2635         return rv;
2636 }
2637
2638 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2639 {
2640         int hg, rv = -100;
2641         enum drbd_after_sb_p after_sb_2p;
2642
2643         rcu_read_lock();
2644         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2645         rcu_read_unlock();
2646         switch (after_sb_2p) {
2647         case ASB_DISCARD_YOUNGER_PRI:
2648         case ASB_DISCARD_OLDER_PRI:
2649         case ASB_DISCARD_LEAST_CHG:
2650         case ASB_DISCARD_LOCAL:
2651         case ASB_DISCARD_REMOTE:
2652         case ASB_CONSENSUS:
2653         case ASB_DISCARD_SECONDARY:
2654         case ASB_DISCARD_ZERO_CHG:
2655                 dev_err(DEV, "Configuration error.\n");
2656                 break;
2657         case ASB_VIOLENTLY:
2658                 rv = drbd_asb_recover_0p(mdev);
2659                 break;
2660         case ASB_DISCONNECT:
2661                 break;
2662         case ASB_CALL_HELPER:
2663                 hg = drbd_asb_recover_0p(mdev);
2664                 if (hg == -1) {
2665                         enum drbd_state_rv rv2;
2666
2667                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2668                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2669                           * we do not need to wait for the after state change work either. */
2670                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2671                         if (rv2 != SS_SUCCESS) {
2672                                 drbd_khelper(mdev, "pri-lost-after-sb");
2673                         } else {
2674                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2675                                 rv = hg;
2676                         }
2677                 } else
2678                         rv = hg;
2679         }
2680
2681         return rv;
2682 }
2683
2684 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2685                            u64 bits, u64 flags)
2686 {
2687         if (!uuid) {
2688                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2689                 return;
2690         }
2691         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2692              text,
2693              (unsigned long long)uuid[UI_CURRENT],
2694              (unsigned long long)uuid[UI_BITMAP],
2695              (unsigned long long)uuid[UI_HISTORY_START],
2696              (unsigned long long)uuid[UI_HISTORY_END],
2697              (unsigned long long)bits,
2698              (unsigned long long)flags);
2699 }
2700
2701 /*
2702   100   after split brain try auto recover
2703     2   C_SYNC_SOURCE set BitMap
2704     1   C_SYNC_SOURCE use BitMap
2705     0   no Sync
2706    -1   C_SYNC_TARGET use BitMap
2707    -2   C_SYNC_TARGET set BitMap
2708  -100   after split brain, disconnect
2709 -1000   unrelated data
2710 -1091   requires proto 91
2711 -1096   requires proto 96
2712  */
2713 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2714 {
2715         u64 self, peer;
2716         int i, j;
2717
2718         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2719         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2720
2721         *rule_nr = 10;
2722         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2723                 return 0;
2724
2725         *rule_nr = 20;
2726         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2727              peer != UUID_JUST_CREATED)
2728                 return -2;
2729
2730         *rule_nr = 30;
2731         if (self != UUID_JUST_CREATED &&
2732             (peer == UUID_JUST_CREATED || peer == (u64)0))
2733                 return 2;
2734
2735         if (self == peer) {
2736                 int rct, dc; /* roles at crash time */
2737
2738                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2739
2740                         if (mdev->tconn->agreed_pro_version < 91)
2741                                 return -1091;
2742
2743                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2744                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2745                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2746                                 drbd_uuid_set_bm(mdev, 0UL);
2747
2748                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2749                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2750                                 *rule_nr = 34;
2751                         } else {
2752                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2753                                 *rule_nr = 36;
2754                         }
2755
2756                         return 1;
2757                 }
2758
2759                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2760
2761                         if (mdev->tconn->agreed_pro_version < 91)
2762                                 return -1091;
2763
2764                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2765                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2766                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2767
2768                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2769                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2770                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2771
2772                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2773                                 *rule_nr = 35;
2774                         } else {
2775                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2776                                 *rule_nr = 37;
2777                         }
2778
2779                         return -1;
2780                 }
2781
2782                 /* Common power [off|failure] */
2783                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2784                         (mdev->p_uuid[UI_FLAGS] & 2);
2785                 /* lowest bit is set when we were primary,
2786                  * next bit (weight 2) is set when peer was primary */
2787                 *rule_nr = 40;
2788
2789                 switch (rct) {
2790                 case 0: /* !self_pri && !peer_pri */ return 0;
2791                 case 1: /*  self_pri && !peer_pri */ return 1;
2792                 case 2: /* !self_pri &&  peer_pri */ return -1;
2793                 case 3: /*  self_pri &&  peer_pri */
2794                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2795                         return dc ? -1 : 1;
2796                 }
2797         }
2798
2799         *rule_nr = 50;
2800         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2801         if (self == peer)
2802                 return -1;
2803
2804         *rule_nr = 51;
2805         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2806         if (self == peer) {
2807                 if (mdev->tconn->agreed_pro_version < 96 ?
2808                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2809                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2810                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2811                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2812                            modifications from its last start of a resync as sync source. */
2813
2814                         if (mdev->tconn->agreed_pro_version < 91)
2815                                 return -1091;
2816
2817                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2818                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2819
2820                         dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2821                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2822
2823                         return -1;
2824                 }
2825         }
2826
2827         *rule_nr = 60;
2828         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2829         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2830                 peer = mdev->p_uuid[i] & ~((u64)1);
2831                 if (self == peer)
2832                         return -2;
2833         }
2834
2835         *rule_nr = 70;
2836         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2837         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2838         if (self == peer)
2839                 return 1;
2840
2841         *rule_nr = 71;
2842         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2843         if (self == peer) {
2844                 if (mdev->tconn->agreed_pro_version < 96 ?
2845                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2846                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2847                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2848                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2849                            modifications from the last start of a resync as sync source. */
2850
2851                         if (mdev->tconn->agreed_pro_version < 91)
2852                                 return -1091;
2853
2854                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2855                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2856
2857                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2858                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2859                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2860
2861                         return 1;
2862                 }
2863         }
2864
2865
2866         *rule_nr = 80;
2867         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2868         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2869                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2870                 if (self == peer)
2871                         return 2;
2872         }
2873
2874         *rule_nr = 90;
2875         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2876         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2877         if (self == peer && self != ((u64)0))
2878                 return 100;
2879
2880         *rule_nr = 100;
2881         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2882                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2883                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2884                         peer = mdev->p_uuid[j] & ~((u64)1);
2885                         if (self == peer)
2886                                 return -100;
2887                 }
2888         }
2889
2890         return -1000;
2891 }
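/* Editor's note (a summary derived from the rules above and their use in
 * drbd_sync_handshake() below): the return value encodes the handshake
 * verdict ("hg"):
 *      0          peers are in sync, no resync needed
 *     +1 / -1     bitmap-based resync; we become SyncSource / SyncTarget
 *     +2 / -2     full resync; we become SyncSource / SyncTarget
 *   +100 / -100   split brain, to be resolved by policy or helper script
 *    -1000        unrelated data, the connection is refused
 *    -1091        both sides would need at least protocol version 91
 */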
2892
2893 /* drbd_sync_handshake() returns the new conn state on success, or
2894    C_MASK on failure.
2895  */
2896 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2897                                            enum drbd_disk_state peer_disk) __must_hold(local)
2898 {
2899         enum drbd_conns rv = C_MASK;
2900         enum drbd_disk_state mydisk;
2901         struct net_conf *nc;
2902         int hg, rule_nr, rr_conflict, tentative;
2903
2904         mydisk = mdev->state.disk;
2905         if (mydisk == D_NEGOTIATING)
2906                 mydisk = mdev->new_state_tmp.disk;
2907
2908         dev_info(DEV, "drbd_sync_handshake:\n");
2909         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2910         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2911                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2912
2913         hg = drbd_uuid_compare(mdev, &rule_nr);
2914
2915         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2916
2917         if (hg == -1000) {
2918                 dev_alert(DEV, "Unrelated data, aborting!\n");
2919                 return C_MASK;
2920         }
2921         if (hg < -1000) {
2922                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2923                 return C_MASK;
2924         }
2925
2926         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2927             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2928                 int f = (hg == -100) || abs(hg) == 2;
2929                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2930                 if (f)
2931                         hg = hg*2;
2932                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2933                      hg > 0 ? "source" : "target");
2934         }
2935
2936         if (abs(hg) == 100)
2937                 drbd_khelper(mdev, "initial-split-brain");
2938
2939         rcu_read_lock();
2940         nc = rcu_dereference(mdev->tconn->net_conf);
2941
2942         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2943                 int pcount = (mdev->state.role == R_PRIMARY)
2944                            + (peer_role == R_PRIMARY);
2945                 int forced = (hg == -100);
2946
2947                 switch (pcount) {
2948                 case 0:
2949                         hg = drbd_asb_recover_0p(mdev);
2950                         break;
2951                 case 1:
2952                         hg = drbd_asb_recover_1p(mdev);
2953                         break;
2954                 case 2:
2955                         hg = drbd_asb_recover_2p(mdev);
2956                         break;
2957                 }
2958                 if (abs(hg) < 100) {
2959                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2960                              "automatically solved. Sync from %s node\n",
2961                              pcount, (hg < 0) ? "peer" : "this");
2962                         if (forced) {
2963                                 dev_warn(DEV, "Doing a full sync, since"
2964                                      " UUIDs were ambiguous.\n");
2965                                 hg = hg*2;
2966                         }
2967                 }
2968         }
2969
2970         if (hg == -100) {
2971                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
2972                         hg = -1;
2973                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
2974                         hg = 1;
2975
2976                 if (abs(hg) < 100)
2977                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2978                              "Sync from %s node\n",
2979                              (hg < 0) ? "peer" : "this");
2980         }
2981         rr_conflict = nc->rr_conflict;
2982         tentative = nc->tentative;
2983         rcu_read_unlock();
2984
2985         if (hg == -100) {
2986                 /* FIXME this log message is not correct if we end up here
2987                  * after an attempted attach on a diskless node.
2988                  * We just refuse to attach -- well, we drop the "connection"
2989                  * to that disk, in a way... */
2990                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2991                 drbd_khelper(mdev, "split-brain");
2992                 return C_MASK;
2993         }
2994
2995         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2996                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2997                 return C_MASK;
2998         }
2999
3000         if (hg < 0 && /* by intention we do not use mydisk here. */
3001             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3002                 switch (rr_conflict) {
3003                 case ASB_CALL_HELPER:
3004                         drbd_khelper(mdev, "pri-lost");
3005                         /* fall through */
3006                 case ASB_DISCONNECT:
3007                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3008                         return C_MASK;
3009                 case ASB_VIOLENTLY:
3010                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3011                              " assumption\n");
3012                 }
3013         }
3014
3015         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3016                 if (hg == 0)
3017                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3018                 else
3019                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
3020                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3021                                  abs(hg) >= 2 ? "full" : "bit-map based");
3022                 return C_MASK;
3023         }
3024
3025         if (abs(hg) >= 2) {
3026                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3027                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3028                                         BM_LOCKED_SET_ALLOWED))
3029                         return C_MASK;
3030         }
3031
3032         if (hg > 0) { /* become sync source. */
3033                 rv = C_WF_BITMAP_S;
3034         } else if (hg < 0) { /* become sync target */
3035                 rv = C_WF_BITMAP_T;
3036         } else {
3037                 rv = C_CONNECTED;
3038                 if (drbd_bm_total_weight(mdev)) {
3039                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3040                              drbd_bm_total_weight(mdev));
3041                 }
3042         }
3043
3044         return rv;
3045 }
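/* Editor's note, a rough sketch of how the handshake verdict maps to the
 * connection state returned by the function above:
 *   hg > 0   ->  C_WF_BITMAP_S  (we will become SyncSource)
 *   hg < 0   ->  C_WF_BITMAP_T  (we will become SyncTarget)
 *   hg == 0  ->  C_CONNECTED    (no resync)
 * |hg| >= 2 additionally sets the whole bitmap, forcing a full resync;
 * unresolved split brain or an inconsistent SyncSource yields C_MASK. */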
3046
3047 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3048 {
3049         /* ASB_DISCARD_REMOTE on one side paired with ASB_DISCARD_LOCAL on the other is valid */
3050         if (peer == ASB_DISCARD_REMOTE)
3051                 return ASB_DISCARD_LOCAL;
3052
3053         /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3054         if (peer == ASB_DISCARD_LOCAL)
3055                 return ASB_DISCARD_REMOTE;
3056
3057         /* everything else is valid if they are equal on both sides. */
3058         return peer;
3059 }
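/* Editor's note: the conversion above mirrors the peer's after-split-brain
 * policy into our frame of reference. For example, a peer configured with
 * "discard-remote" must see "discard-local" on our side for the settings to
 * be considered compatible in receive_protocol() below; any other policy
 * must simply be identical on both sides. */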
3060
3061 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3062 {
3063         struct p_protocol *p = pi->data;
3064         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3065         int p_proto, p_discard_my_data, p_two_primaries, cf;
3066         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3067         char integrity_alg[SHARED_SECRET_MAX] = "";
3068         struct crypto_hash *peer_integrity_tfm = NULL;
3069         void *int_dig_in = NULL, *int_dig_vv = NULL;
3070
3071         p_proto         = be32_to_cpu(p->protocol);
3072         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3073         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3074         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3075         p_two_primaries = be32_to_cpu(p->two_primaries);
3076         cf              = be32_to_cpu(p->conn_flags);
3077         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3078
3079         if (tconn->agreed_pro_version >= 87) {
3080                 int err;
3081
3082                 if (pi->size > sizeof(integrity_alg))
3083                         return -EIO;
3084                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3085                 if (err)
3086                         return err;
3087                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3088         }
3089
3090         if (pi->cmd != P_PROTOCOL_UPDATE) {
3091                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3092
3093                 if (cf & CF_DRY_RUN)
3094                         set_bit(CONN_DRY_RUN, &tconn->flags);
3095
3096                 rcu_read_lock();
3097                 nc = rcu_dereference(tconn->net_conf);
3098
3099                 if (p_proto != nc->wire_protocol) {
3100                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3101                         goto disconnect_rcu_unlock;
3102                 }
3103
3104                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3105                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3106                         goto disconnect_rcu_unlock;
3107                 }
3108
3109                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3110                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3111                         goto disconnect_rcu_unlock;
3112                 }
3113
3114                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3115                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3116                         goto disconnect_rcu_unlock;
3117                 }
3118
3119                 if (p_discard_my_data && nc->discard_my_data) {
3120                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3121                         goto disconnect_rcu_unlock;
3122                 }
3123
3124                 if (p_two_primaries != nc->two_primaries) {
3125                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3126                         goto disconnect_rcu_unlock;
3127                 }
3128
3129                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3130                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3131                         goto disconnect_rcu_unlock;
3132                 }
3133
3134                 rcu_read_unlock();
3135         }
3136
3137         if (integrity_alg[0]) {
3138                 int hash_size;
3139
3140                 /*
3141                  * We can only change the peer data integrity algorithm
3142                  * here.  Changing our own data integrity algorithm
3143                  * requires that we send a P_PROTOCOL_UPDATE packet at
3144                  * the same time; otherwise, the peer has no way to
3145                  * tell between which packets the algorithm should
3146                  * change.
3147                  */
3148
3149                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3150                 if (!peer_integrity_tfm) {
3151                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3152                                  integrity_alg);
3153                         goto disconnect;
3154                 }
3155
3156                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3157                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3158                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3159                 if (!(int_dig_in && int_dig_vv)) {
3160                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3161                         goto disconnect;
3162                 }
3163         }
3164
3165         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3166         if (!new_net_conf) {
3167                 conn_err(tconn, "Allocation of new net_conf failed\n");
3168                 goto disconnect;
3169         }
3170
3171         mutex_lock(&tconn->data.mutex);
3172         mutex_lock(&tconn->conf_update);
3173         old_net_conf = tconn->net_conf;
3174         *new_net_conf = *old_net_conf;
3175
3176         new_net_conf->wire_protocol = p_proto;
3177         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3178         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3179         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3180         new_net_conf->two_primaries = p_two_primaries;
3181
3182         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3183         mutex_unlock(&tconn->conf_update);
3184         mutex_unlock(&tconn->data.mutex);
3185
3186         crypto_free_hash(tconn->peer_integrity_tfm);
3187         kfree(tconn->int_dig_in);
3188         kfree(tconn->int_dig_vv);
3189         tconn->peer_integrity_tfm = peer_integrity_tfm;
3190         tconn->int_dig_in = int_dig_in;
3191         tconn->int_dig_vv = int_dig_vv;
3192
3193         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3194                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3195                           integrity_alg[0] ? integrity_alg : "(none)");
3196
3197         synchronize_rcu();
3198         kfree(old_net_conf);
3199         return 0;
3200
3201 disconnect_rcu_unlock:
3202         rcu_read_unlock();
3203 disconnect:
3204         crypto_free_hash(peer_integrity_tfm);
3205         kfree(int_dig_in);
3206         kfree(int_dig_vv);
3207         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3208         return -EIO;
3209 }
3210
3211 /* helper function
3212  * input: alg name, feature name
3213  * return: NULL (alg name was "")
3214  *         ERR_PTR(error) if something goes wrong
3215  *         or the crypto hash ptr, if it worked out ok. */
3216 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3217                 const char *alg, const char *name)
3218 {
3219         struct crypto_hash *tfm;
3220
3221         if (!alg[0])
3222                 return NULL;
3223
3224         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3225         if (IS_ERR(tfm)) {
3226                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3227                         alg, name, PTR_ERR(tfm));
3228                 return tfm;
3229         }
3230         return tfm;
3231 }
3232
3233 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3234 {
3235         void *buffer = tconn->data.rbuf;
3236         int size = pi->size;
3237
3238         while (size) {
3239                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3240                 s = drbd_recv(tconn, buffer, s);
3241                 if (s <= 0) {
3242                         if (s < 0)
3243                                 return s;
3244                         break;
3245                 }
3246                 size -= s;
3247         }
3248         if (size)
3249                 return -EIO;
3250         return 0;
3251 }
3252
3253 /*
3254  * config_unknown_volume  -  device configuration command for unknown volume
3255  *
3256  * When a device is added to an existing connection, the node on which the
3257  * device is added first will send configuration commands to its peer but the
3258  * peer will not know about the device yet.  It will warn and ignore these
3259  * commands.  Once the device is added on the second node, the second node will
3260  * send the same device configuration commands, but in the other direction.
3261  *
3262  * (We can also end up here if drbd is misconfigured.)
3263  */
3264 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3265 {
3266         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3267                   cmdname(pi->cmd), pi->vnr);
3268         return ignore_remaining_packet(tconn, pi);
3269 }
3270
3271 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3272 {
3273         struct drbd_conf *mdev;
3274         struct p_rs_param_95 *p;
3275         unsigned int header_size, data_size, exp_max_sz;
3276         struct crypto_hash *verify_tfm = NULL;
3277         struct crypto_hash *csums_tfm = NULL;
3278         struct net_conf *old_net_conf, *new_net_conf = NULL;
3279         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3280         const int apv = tconn->agreed_pro_version;
3281         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3282         int fifo_size = 0;
3283         int err;
3284
3285         mdev = vnr_to_mdev(tconn, pi->vnr);
3286         if (!mdev)
3287                 return config_unknown_volume(tconn, pi);
3288
3289         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3290                     : apv == 88 ? sizeof(struct p_rs_param)
3291                                         + SHARED_SECRET_MAX
3292                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3293                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3294
3295         if (pi->size > exp_max_sz) {
3296                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3297                     pi->size, exp_max_sz);
3298                 return -EIO;
3299         }
3300
3301         if (apv <= 88) {
3302                 header_size = sizeof(struct p_rs_param);
3303                 data_size = pi->size - header_size;
3304         } else if (apv <= 94) {
3305                 header_size = sizeof(struct p_rs_param_89);
3306                 data_size = pi->size - header_size;
3307                 D_ASSERT(data_size == 0);
3308         } else {
3309                 header_size = sizeof(struct p_rs_param_95);
3310                 data_size = pi->size - header_size;
3311                 D_ASSERT(data_size == 0);
3312         }
3313
3314         /* initialize verify_alg and csums_alg */
3315         p = pi->data;
3316         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3317
3318         err = drbd_recv_all(mdev->tconn, p, header_size);
3319         if (err)
3320                 return err;
3321
3322         mutex_lock(&mdev->tconn->conf_update);
3323         old_net_conf = mdev->tconn->net_conf;
3324         if (get_ldev(mdev)) {
3325                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3326                 if (!new_disk_conf) {
3327                         put_ldev(mdev);
3328                         mutex_unlock(&mdev->tconn->conf_update);
3329                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3330                         return -ENOMEM;
3331                 }
3332
3333                 old_disk_conf = mdev->ldev->disk_conf;
3334                 *new_disk_conf = *old_disk_conf;
3335
3336                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3337         }
3338
3339         if (apv >= 88) {
3340                 if (apv == 88) {
3341                         if (data_size > SHARED_SECRET_MAX) {
3342                                 dev_err(DEV, "verify-alg too long, "
3343                                     "peer wants %u, accepting only %u bytes\n",
3344                                                 data_size, SHARED_SECRET_MAX);
3345                                 err = -EIO;
3346                                 goto reconnect;
3347                         }
3348
3349                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3350                         if (err)
3351                                 goto reconnect;
3352                         /* we expect NUL terminated string */
3353                         /* but just in case someone tries to be evil */
3354                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3355                         p->verify_alg[data_size-1] = 0;
3356
3357                 } else /* apv >= 89 */ {
3358                         /* we still expect NUL terminated strings */
3359                         /* but just in case someone tries to be evil */
3360                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3361                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3362                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3363                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3364                 }
3365
3366                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3367                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3368                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3369                                     old_net_conf->verify_alg, p->verify_alg);
3370                                 goto disconnect;
3371                         }
3372                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3373                                         p->verify_alg, "verify-alg");
3374                         if (IS_ERR(verify_tfm)) {
3375                                 verify_tfm = NULL;
3376                                 goto disconnect;
3377                         }
3378                 }
3379
3380                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3381                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3382                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3383                                     old_net_conf->csums_alg, p->csums_alg);
3384                                 goto disconnect;
3385                         }
3386                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3387                                         p->csums_alg, "csums-alg");
3388                         if (IS_ERR(csums_tfm)) {
3389                                 csums_tfm = NULL;
3390                                 goto disconnect;
3391                         }
3392                 }
3393
3394                 if (apv > 94 && new_disk_conf) {
3395                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3396                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3397                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3398                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3399
3400                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3401                         if (fifo_size != mdev->rs_plan_s->size) {
3402                                 new_plan = fifo_alloc(fifo_size);
3403                                 if (!new_plan) {
3404                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3405                                         put_ldev(mdev);
3406                                         goto disconnect;
3407                                 }
3408                         }
3409                 }
3410
3411                 if (verify_tfm || csums_tfm) {
3412                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3413                         if (!new_net_conf) {
3414                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3415                                 goto disconnect;
3416                         }
3417
3418                         *new_net_conf = *old_net_conf;
3419
3420                         if (verify_tfm) {
3421                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3422                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3423                                 crypto_free_hash(mdev->tconn->verify_tfm);
3424                                 mdev->tconn->verify_tfm = verify_tfm;
3425                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3426                         }
3427                         if (csums_tfm) {
3428                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3429                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3430                                 crypto_free_hash(mdev->tconn->csums_tfm);
3431                                 mdev->tconn->csums_tfm = csums_tfm;
3432                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3433                         }
3434                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3435                 }
3436         }
3437
3438         if (new_disk_conf) {
3439                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3440                 put_ldev(mdev);
3441         }
3442
3443         if (new_plan) {
3444                 old_plan = mdev->rs_plan_s;
3445                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3446         }
3447
3448         mutex_unlock(&mdev->tconn->conf_update);
3449         synchronize_rcu();
3450         if (new_net_conf)
3451                 kfree(old_net_conf);
3452         kfree(old_disk_conf);
3453         kfree(old_plan);
3454
3455         return 0;
3456
3457 reconnect:
3458         if (new_disk_conf) {
3459                 put_ldev(mdev);
3460                 kfree(new_disk_conf);
3461         }
3462         mutex_unlock(&mdev->tconn->conf_update);
3463         return -EIO;
3464
3465 disconnect:
3466         kfree(new_plan);
3467         if (new_disk_conf) {
3468                 put_ldev(mdev);
3469                 kfree(new_disk_conf);
3470         }
3471         mutex_unlock(&mdev->tconn->conf_update);
3472         /* just for completeness: actually not needed,
3473          * as this is not reached if csums_tfm was ok. */
3474         crypto_free_hash(csums_tfm);
3475         /* but free the verify_tfm again, if csums_tfm did not work out */
3476         crypto_free_hash(verify_tfm);
3477         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3478         return -EIO;
3479 }
3480
3481 /* warn if the arguments differ by more than 12.5% */
3482 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3483         const char *s, sector_t a, sector_t b)
3484 {
3485         sector_t d;
3486         if (a == 0 || b == 0)
3487                 return;
3488         d = (a > b) ? (a - b) : (b - a);
3489         if (d > (a>>3) || d > (b>>3))
3490                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3491                      (unsigned long long)a, (unsigned long long)b);
3492 }
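/* Editor's note, a worked example of the check above: with a = 1000 and
 * b = 850 sectors, d = 150, which exceeds a>>3 = 125 (12.5% of a), so the
 * warning is printed; with b = 900, d = 100 exceeds neither 125 nor
 * 900>>3 = 112, so the sizes are considered close enough. */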
3493
3494 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3495 {
3496         struct drbd_conf *mdev;
3497         struct p_sizes *p = pi->data;
3498         enum determine_dev_size dd = unchanged;
3499         sector_t p_size, p_usize, my_usize;
3500         int ldsc = 0; /* local disk size changed */
3501         enum dds_flags ddsf;
3502
3503         mdev = vnr_to_mdev(tconn, pi->vnr);
3504         if (!mdev)
3505                 return config_unknown_volume(tconn, pi);
3506
3507         p_size = be64_to_cpu(p->d_size);
3508         p_usize = be64_to_cpu(p->u_size);
3509
3510         /* just store the peer's disk size for now.
3511          * we still need to figure out whether we accept that. */
3512         mdev->p_size = p_size;
3513
3514         if (get_ldev(mdev)) {
3515                 rcu_read_lock();
3516                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3517                 rcu_read_unlock();
3518
3519                 warn_if_differ_considerably(mdev, "lower level device sizes",
3520                            p_size, drbd_get_max_capacity(mdev->ldev));
3521                 warn_if_differ_considerably(mdev, "user requested size",
3522                                             p_usize, my_usize);
3523
3524                 /* if this is the first connect, or an otherwise expected
3525                  * param exchange, choose the minimum */
3526                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3527                         p_usize = min_not_zero(my_usize, p_usize);
3528
3529                 /* Never shrink a device with usable data during connect.
3530                    But allow online shrinking if we are connected. */
3531                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3532                     drbd_get_capacity(mdev->this_bdev) &&
3533                     mdev->state.disk >= D_OUTDATED &&
3534                     mdev->state.conn < C_CONNECTED) {
3535                         dev_err(DEV, "The peer's disk size is too small!\n");
3536                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3537                         put_ldev(mdev);
3538                         return -EIO;
3539                 }
3540
3541                 if (my_usize != p_usize) {
3542                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3543
3544                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3545                         if (!new_disk_conf) {
3546                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3547                                 put_ldev(mdev);
3548                                 return -ENOMEM;
3549                         }
3550
3551                         mutex_lock(&mdev->tconn->conf_update);
3552                         old_disk_conf = mdev->ldev->disk_conf;
3553                         *new_disk_conf = *old_disk_conf;
3554                         new_disk_conf->disk_size = p_usize;
3555
3556                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3557                         mutex_unlock(&mdev->tconn->conf_update);
3558                         synchronize_rcu();
3559                         kfree(old_disk_conf);
3560
3561                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3562                                  (unsigned long)p_usize);
3563                 }
3564
3565                 put_ldev(mdev);
3566         }
3567
3568         ddsf = be16_to_cpu(p->dds_flags);
3569         if (get_ldev(mdev)) {
3570                 dd = drbd_determine_dev_size(mdev, ddsf);
3571                 put_ldev(mdev);
3572                 if (dd == dev_size_error)
3573                         return -EIO;
3574                 drbd_md_sync(mdev);
3575         } else {
3576                 /* I am diskless, need to accept the peer's size. */
3577                 drbd_set_my_capacity(mdev, p_size);
3578         }
3579
3580         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3581         drbd_reconsider_max_bio_size(mdev);
3582
3583         if (get_ldev(mdev)) {
3584                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3585                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3586                         ldsc = 1;
3587                 }
3588
3589                 put_ldev(mdev);
3590         }
3591
3592         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3593                 if (be64_to_cpu(p->c_size) !=
3594                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3595                         /* we have different sizes, probably peer
3596                          * needs to know my new size... */
3597                         drbd_send_sizes(mdev, 0, ddsf);
3598                 }
3599                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3600                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3601                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3602                             mdev->state.disk >= D_INCONSISTENT) {
3603                                 if (ddsf & DDSF_NO_RESYNC)
3604                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3605                                 else
3606                                         resync_after_online_grow(mdev);
3607                         } else
3608                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3609                 }
3610         }
3611
3612         return 0;
3613 }
3614
3615 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3616 {
3617         struct drbd_conf *mdev;
3618         struct p_uuids *p = pi->data;
3619         u64 *p_uuid;
3620         int i, updated_uuids = 0;
3621
3622         mdev = vnr_to_mdev(tconn, pi->vnr);
3623         if (!mdev)
3624                 return config_unknown_volume(tconn, pi);
3625
3626         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return false; /* drop this packet rather than dereference NULL */
             }
3627
3628         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3629                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3630
3631         kfree(mdev->p_uuid);
3632         mdev->p_uuid = p_uuid;
3633
3634         if (mdev->state.conn < C_CONNECTED &&
3635             mdev->state.disk < D_INCONSISTENT &&
3636             mdev->state.role == R_PRIMARY &&
3637             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3638                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3639                     (unsigned long long)mdev->ed_uuid);
3640                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3641                 return -EIO;
3642         }
3643
3644         if (get_ldev(mdev)) {
3645                 int skip_initial_sync =
3646                         mdev->state.conn == C_CONNECTED &&
3647                         mdev->tconn->agreed_pro_version >= 90 &&
3648                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3649                         (p_uuid[UI_FLAGS] & 8);
3650                 if (skip_initial_sync) {
3651                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3652                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3653                                         "clear_n_write from receive_uuids",
3654                                         BM_LOCKED_TEST_ALLOWED);
3655                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3656                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3657                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3658                                         CS_VERBOSE, NULL);
3659                         drbd_md_sync(mdev);
3660                         updated_uuids = 1;
3661                 }
3662                 put_ldev(mdev);
3663         } else if (mdev->state.disk < D_INCONSISTENT &&
3664                    mdev->state.role == R_PRIMARY) {
3665                 /* I am a diskless primary, the peer just created a new current UUID
3666                    for me. */
3667                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3668         }
3669
3670         /* Before we test for the disk state, we should wait until a possibly
3671            ongoing cluster-wide state change has finished. That is important if
3672            we are primary and are detaching from our disk. We need to see the
3673            new disk state... */
3674         mutex_lock(mdev->state_mutex);
3675         mutex_unlock(mdev->state_mutex);
3676         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3677                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3678
3679         if (updated_uuids)
3680                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3681
3682         return 0;
3683 }
3684
3685 /**
3686  * convert_state() - Converts the peer's view of the cluster state to our point of view
3687  * @ps:         The state as seen by the peer.
3688  */
3689 static union drbd_state convert_state(union drbd_state ps)
3690 {
3691         union drbd_state ms;
3692
3693         static enum drbd_conns c_tab[] = {
3694                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3695                 [C_CONNECTED] = C_CONNECTED,
3696
3697                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3698                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3699                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3700                 [C_VERIFY_S]       = C_VERIFY_T,
3701                 [C_MASK]   = C_MASK,
3702         };
3703
3704         ms.i = ps.i;
3705
3706         ms.conn = c_tab[ps.conn];
3707         ms.peer = ps.role;
3708         ms.role = ps.peer;
3709         ms.pdsk = ps.disk;
3710         ms.disk = ps.pdsk;
3711         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3712
3713         return ms;
3714 }
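/* Editor's note, an illustrative example of the conversion above: if the
 * peer reports ps = { role: Primary, peer: Secondary, disk: UpToDate,
 * pdsk: Inconsistent }, then from our point of view this becomes
 * ms = { role: Secondary, peer: Primary, disk: Inconsistent, pdsk: UpToDate };
 * the connection state is mapped through c_tab so that complementary states
 * (e.g. C_STARTING_SYNC_S/_T) are swapped as well. */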
3715
3716 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3717 {
3718         struct drbd_conf *mdev;
3719         struct p_req_state *p = pi->data;
3720         union drbd_state mask, val;
3721         enum drbd_state_rv rv;
3722
3723         mdev = vnr_to_mdev(tconn, pi->vnr);
3724         if (!mdev)
3725                 return -EIO;
3726
3727         mask.i = be32_to_cpu(p->mask);
3728         val.i = be32_to_cpu(p->val);
3729
3730         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3731             mutex_is_locked(mdev->state_mutex)) {
3732                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3733                 return 0;
3734         }
3735
3736         mask = convert_state(mask);
3737         val = convert_state(val);
3738
3739         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3740         drbd_send_sr_reply(mdev, rv);
3741
3742         drbd_md_sync(mdev);
3743
3744         return 0;
3745 }
3746
3747 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3748 {
3749         struct p_req_state *p = pi->data;
3750         union drbd_state mask, val;
3751         enum drbd_state_rv rv;
3752
3753         mask.i = be32_to_cpu(p->mask);
3754         val.i = be32_to_cpu(p->val);
3755
3756         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3757             mutex_is_locked(&tconn->cstate_mutex)) {
3758                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3759                 return 0;
3760         }
3761
3762         mask = convert_state(mask);
3763         val = convert_state(val);
3764
3765         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3766         conn_send_sr_reply(tconn, rv);
3767
3768         return 0;
3769 }
3770
3771 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3772 {
3773         struct drbd_conf *mdev;
3774         struct p_state *p = pi->data;
3775         union drbd_state os, ns, peer_state;
3776         enum drbd_disk_state real_peer_disk;
3777         enum chg_state_flags cs_flags;
3778         int rv;
3779
3780         mdev = vnr_to_mdev(tconn, pi->vnr);
3781         if (!mdev)
3782                 return config_unknown_volume(tconn, pi);
3783
3784         peer_state.i = be32_to_cpu(p->state);
3785
3786         real_peer_disk = peer_state.disk;
3787         if (peer_state.disk == D_NEGOTIATING) {
3788                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3789                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3790         }
3791
3792         spin_lock_irq(&mdev->tconn->req_lock);
3793  retry:
3794         os = ns = drbd_read_state(mdev);
3795         spin_unlock_irq(&mdev->tconn->req_lock);
3796
3797         /* If some other part of the code (asender thread, timeout)
3798          * already decided to close the connection again,
3799          * we must not "re-establish" it here. */
3800         if (os.conn <= C_TEAR_DOWN)
3801                 return false;
3802
3803         /* If this is the "end of sync" confirmation, usually the peer disk
3804          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3805          * set) resync started in PausedSyncT, or if the timing of pause-/
3806          * unpause-sync events has been "just right", the peer disk may
3807          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3808          */
3809         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3810             real_peer_disk == D_UP_TO_DATE &&
3811             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3812                 /* If we are (becoming) SyncSource, but peer is still in sync
3813                  * preparation, ignore its uptodate-ness to avoid flapping, it
3814                  * will change to inconsistent once the peer reaches active
3815                  * syncing states.
3816                  * It may have changed syncer-paused flags, however, so we
3817                  * cannot ignore this completely. */
3818                 if (peer_state.conn > C_CONNECTED &&
3819                     peer_state.conn < C_SYNC_SOURCE)
3820                         real_peer_disk = D_INCONSISTENT;
3821
3822                 /* if peer_state changes to connected at the same time,
3823                  * it explicitly notifies us that it finished resync.
3824                  * Maybe we should finish it up, too? */
3825                 else if (os.conn >= C_SYNC_SOURCE &&
3826                          peer_state.conn == C_CONNECTED) {
3827                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3828                                 drbd_resync_finished(mdev);
3829                         return 0;
3830                 }
3831         }
3832
3833         /* The peer says its disk is inconsistent, while we think it is up to date,
3834          * and this happens while the peer still thinks we have a sync going on,
3835          * but we think we are already done with the sync.
3836          * We ignore this to avoid flapping pdsk.
3837          * This should not happen if the peer is a recent version of drbd. */
3838         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3839             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3840                 real_peer_disk = D_UP_TO_DATE;
3841
3842         if (ns.conn == C_WF_REPORT_PARAMS)
3843                 ns.conn = C_CONNECTED;
3844
3845         if (peer_state.conn == C_AHEAD)
3846                 ns.conn = C_BEHIND;
3847
3848         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3849             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3850                 int cr; /* consider resync */
3851
3852                 /* if we established a new connection */
3853                 cr  = (os.conn < C_CONNECTED);
3854                 /* if we had an established connection
3855                  * and one of the nodes newly attaches a disk */
3856                 cr |= (os.conn == C_CONNECTED &&
3857                        (peer_state.disk == D_NEGOTIATING ||
3858                         os.disk == D_NEGOTIATING));
3859                 /* if we have both been inconsistent, and the peer has been
3860                  * forced to be UpToDate with --overwrite-data */
3861                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3862                 /* if we had been plain connected, and the admin requested to
3863                  * start a sync by "invalidate" or "invalidate-remote" */
3864                 cr |= (os.conn == C_CONNECTED &&
3865                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3866                                  peer_state.conn <= C_WF_BITMAP_T));
3867
3868                 if (cr)
3869                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3870
3871                 put_ldev(mdev);
3872                 if (ns.conn == C_MASK) {
3873                         ns.conn = C_CONNECTED;
3874                         if (mdev->state.disk == D_NEGOTIATING) {
3875                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3876                         } else if (peer_state.disk == D_NEGOTIATING) {
3877                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3878                                 peer_state.disk = D_DISKLESS;
3879                                 real_peer_disk = D_DISKLESS;
3880                         } else {
3881                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3882                                         return -EIO;
3883                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3884                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3885                                 return -EIO;
3886                         }
3887                 }
3888         }
3889
3890         spin_lock_irq(&mdev->tconn->req_lock);
3891         if (os.i != drbd_read_state(mdev).i)
3892                 goto retry;
3893         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3894         ns.peer = peer_state.role;
3895         ns.pdsk = real_peer_disk;
3896         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3897         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3898                 ns.disk = mdev->new_state_tmp.disk;
3899         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3900         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3901             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3902                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3903                    for temporary network outages! */
3904                 spin_unlock_irq(&mdev->tconn->req_lock);
3905                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3906                 tl_clear(mdev->tconn);
3907                 drbd_uuid_new_current(mdev);
3908                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3909                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3910                 return -EIO;
3911         }
3912         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3913         ns = drbd_read_state(mdev);
3914         spin_unlock_irq(&mdev->tconn->req_lock);
3915
3916         if (rv < SS_SUCCESS) {
3917                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3918                 return -EIO;
3919         }
3920
3921         if (os.conn > C_WF_REPORT_PARAMS) {
3922                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3923                     peer_state.disk != D_NEGOTIATING ) {
3924                         /* we want resync, peer has not yet decided to sync... */
3925                         /* Nowadays only used when forcing a node into primary role and
3926                            setting its disk to UpToDate with that */
3927                         drbd_send_uuids(mdev);
3928                         drbd_send_current_state(mdev);
3929                 }
3930         }
3931
3932         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3933
3934         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3935
3936         return 0;
3937 }
3938
3939 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3940 {
3941         struct drbd_conf *mdev;
3942         struct p_rs_uuid *p = pi->data;
3943
3944         mdev = vnr_to_mdev(tconn, pi->vnr);
3945         if (!mdev)
3946                 return -EIO;
3947
3948         wait_event(mdev->misc_wait,
3949                    mdev->state.conn == C_WF_SYNC_UUID ||
3950                    mdev->state.conn == C_BEHIND ||
3951                    mdev->state.conn < C_CONNECTED ||
3952                    mdev->state.disk < D_NEGOTIATING);
3953
3954         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3955
3956         /* Here the _drbd_uuid_ functions are right, current should
3957            _not_ be rotated into the history */
3958         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3959                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3960                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3961
3962                 drbd_print_uuids(mdev, "updated sync uuid");
3963                 drbd_start_resync(mdev, C_SYNC_TARGET);
3964
3965                 put_ldev(mdev);
3966         } else
3967                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3968
3969         return 0;
3970 }
3971
3972 /**
3973  * receive_bitmap_plain
3974  *
3975  * Return 0 when done, 1 when another iteration is needed, and a negative error
3976  * code upon failure.
3977  */
3978 static int
3979 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3980                      unsigned long *p, struct bm_xfer_ctx *c)
3981 {
3982         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3983                                  drbd_header_size(mdev->tconn);
3984         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3985                                        c->bm_words - c->word_offset);
3986         unsigned int want = num_words * sizeof(*p);
3987         int err;
3988
3989         if (want != size) {
3990                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3991                 return -EIO;
3992         }
3993         if (want == 0)
3994                 return 0;
3995         err = drbd_recv_all(mdev->tconn, p, want);
3996         if (err)
3997                 return err;
3998
3999         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4000
4001         c->word_offset += num_words;
4002         c->bit_offset = c->word_offset * BITS_PER_LONG;
4003         if (c->bit_offset > c->bm_bits)
4004                 c->bit_offset = c->bm_bits;
4005
4006         return 1;
4007 }
4008
4009 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4010 {
4011         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4012 }
4013
4014 static int dcbp_get_start(struct p_compressed_bm *p)
4015 {
4016         return (p->encoding & 0x80) != 0;
4017 }
4018
4019 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4020 {
4021         return (p->encoding >> 4) & 0x7;
4022 }
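/* Editor's note: taken together, the three accessors above describe the
 * layout of the single "encoding" byte of a compressed bitmap packet:
 *   bits 0-3  bitmap encoding (enum drbd_bitmap_code, e.g. RLE_VLI_Bits)
 *   bits 4-6  number of padding bits in the last byte of the bitstream
 *   bit  7    whether the first run-length describes set or cleared bits
 */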
4023
4024 /**
4025  * recv_bm_rle_bits
4026  *
4027  * Return 0 when done, 1 when another iteration is needed, and a negative error
4028  * code upon failure.
4029  */
4030 static int
4031 recv_bm_rle_bits(struct drbd_conf *mdev,
4032                 struct p_compressed_bm *p,
4033                  struct bm_xfer_ctx *c,
4034                  unsigned int len)
4035 {
4036         struct bitstream bs;
4037         u64 look_ahead;
4038         u64 rl;
4039         u64 tmp;
4040         unsigned long s = c->bit_offset;
4041         unsigned long e;
4042         int toggle = dcbp_get_start(p);
4043         int have;
4044         int bits;
4045
4046         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4047
4048         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4049         if (bits < 0)
4050                 return -EIO;
4051
4052         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4053                 bits = vli_decode_bits(&rl, look_ahead);
4054                 if (bits <= 0)
4055                         return -EIO;
4056
4057                 if (toggle) {
4058                         e = s + rl -1;
4059                         if (e >= c->bm_bits) {
4060                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4061                                 return -EIO;
4062                         }
4063                         _drbd_bm_set_bits(mdev, s, e);
4064                 }
4065
4066                 if (have < bits) {
4067                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4068                                 have, bits, look_ahead,
4069                                 (unsigned int)(bs.cur.b - p->code),
4070                                 (unsigned int)bs.buf_len);
4071                         return -EIO;
4072                 }
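                     /* Drop the bits just consumed by vli_decode_bits() and
                      * refill the 64 bit look-ahead window from the bit stream. */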
4073                 look_ahead >>= bits;
4074                 have -= bits;
4075
4076                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4077                 if (bits < 0)
4078                         return -EIO;
4079                 look_ahead |= tmp << have;
4080                 have += bits;
4081         }
4082
4083         c->bit_offset = s;
4084         bm_xfer_ctx_bit_to_word_offset(c);
4085
4086         return (s != c->bm_bits);
4087 }
4088
4089 /**
4090  * decode_bitmap_c
4091  *
4092  * Return 0 when done, 1 when another iteration is needed, and a negative error
4093  * code upon failure.
4094  */
4095 static int
4096 decode_bitmap_c(struct drbd_conf *mdev,
4097                 struct p_compressed_bm *p,
4098                 struct bm_xfer_ctx *c,
4099                 unsigned int len)
4100 {
4101         if (dcbp_get_code(p) == RLE_VLI_Bits)
4102                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4103
4104         /* other variants had been implemented for evaluation,
4105          * but have been dropped as this one turned out to be "best"
4106          * during all our tests. */
4107
4108         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4109         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4110         return -EIO;
4111 }
4112
4113 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4114                 const char *direction, struct bm_xfer_ctx *c)
4115 {
4116         /* what would it take to transfer it "plaintext" */
4117         unsigned int header_size = drbd_header_size(mdev->tconn);
4118         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4119         unsigned int plain =
4120                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4121                 c->bm_words * sizeof(unsigned long);
4122         unsigned int total = c->bytes[0] + c->bytes[1];
4123         unsigned int r;
4124
4125         /* total cannot be zero, but just in case: */
4126         if (total == 0)
4127                 return;
4128
4129         /* don't report if not compressed */
4130         if (total >= plain)
4131                 return;
4132
4133         /* total < plain. check for overflow, still */
4134         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4135                                     : (1000 * total / plain);
4136
4137         if (r > 1000)
4138                 r = 1000;
4139
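             /* Turn the ratio into the saving in per-mille; it is printed
              * below as "compression: x.y%". */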
4140         r = 1000 - r;
4141         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4142              "total %u; compression: %u.%u%%\n",
4143                         direction,
4144                         c->bytes[1], c->packets[1],
4145                         c->bytes[0], c->packets[0],
4146                         total, r/10, r % 10);
4147 }
4148
4149 /* Since we are processing the bitfield from lower addresses to higher,
4150    it does not matter whether we process it in 32 bit or 64 bit chunks,
4151    as long as it is little endian. (Understand it as a byte stream,
4152    beginning with the lowest byte...) If we used big endian,
4153    we would need to process it from the highest address to the lowest
4154    in order to be agnostic to the 32 vs 64 bit issue.
4155
4156    Returns 0 on success, a negative error code otherwise. */
4157 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4158 {
4159         struct drbd_conf *mdev;
4160         struct bm_xfer_ctx c;
4161         int err;
4162
4163         mdev = vnr_to_mdev(tconn, pi->vnr);
4164         if (!mdev)
4165                 return -EIO;
4166
4167         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4168         /* you are supposed to send additional out-of-sync information
4169          * if you actually set bits during this phase */
4170
4171         c = (struct bm_xfer_ctx) {
4172                 .bm_bits = drbd_bm_bits(mdev),
4173                 .bm_words = drbd_bm_words(mdev),
4174         };
4175
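             /* Loop over the incoming bitmap packets until the per-packet
              * receive functions report completion (0) or an error (< 0). */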
4176         for (;;) {
4177                 if (pi->cmd == P_BITMAP)
4178                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4179                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4180                         /* MAYBE: sanity check that we speak proto >= 90,
4181                          * and the feature is enabled! */
4182                         struct p_compressed_bm *p = pi->data;
4183
4184                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4185                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4186                                 err = -EIO;
4187                                 goto out;
4188                         }
4189                         if (pi->size <= sizeof(*p)) {
4190                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4191                                 err = -EIO;
4192                                 goto out;
4193                         }
4194                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4195                         if (err)
4196                                goto out;
4197                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4198                 } else {
4199                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4200                         err = -EIO;
4201                         goto out;
4202                 }
4203
4204                 c.packets[pi->cmd == P_BITMAP]++;
4205                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4206
4207                 if (err <= 0) {
4208                         if (err < 0)
4209                                 goto out;
4210                         break;
4211                 }
4212                 err = drbd_recv_header(mdev->tconn, pi);
4213                 if (err)
4214                         goto out;
4215         }
4216
4217         INFO_bm_xfer_stats(mdev, "receive", &c);
4218
4219         if (mdev->state.conn == C_WF_BITMAP_T) {
4220                 enum drbd_state_rv rv;
4221
4222                 err = drbd_send_bitmap(mdev);
4223                 if (err)
4224                         goto out;
4225                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4226                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4227                 D_ASSERT(rv == SS_SUCCESS);
4228         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4229                 /* admin may have requested C_DISCONNECTING,
4230                  * other threads may have noticed network errors */
4231                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4232                     drbd_conn_str(mdev->state.conn));
4233         }
4234         err = 0;
4235
4236  out:
4237         drbd_bm_unlock(mdev);
4238         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4239                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4240         return err;
4241 }
4242
4243 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4244 {
4245         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4246                  pi->cmd, pi->size);
4247
4248         return ignore_remaining_packet(tconn, pi);
4249 }
4250
4251 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4252 {
4253         /* Make sure we've acked all the TCP data associated
4254          * with the data requests being unplugged */
4255         drbd_tcp_quickack(tconn->data.socket);
4256
4257         return 0;
4258 }
4259
4260 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4261 {
4262         struct drbd_conf *mdev;
4263         struct p_block_desc *p = pi->data;
4264
4265         mdev = vnr_to_mdev(tconn, pi->vnr);
4266         if (!mdev)
4267                 return -EIO;
4268
4269         switch (mdev->state.conn) {
4270         case C_WF_SYNC_UUID:
4271         case C_WF_BITMAP_T:
4272         case C_BEHIND:
4273                 break;
4274         default:
4275                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4276                                 drbd_conn_str(mdev->state.conn));
4277         }
4278
4279         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4280
4281         return 0;
4282 }
4283
4284 struct data_cmd {
4285         int expect_payload;
4286         size_t pkt_size;
4287         int (*fn)(struct drbd_tconn *, struct packet_info *);
4288 };
4289
4290 static struct data_cmd drbd_cmd_handler[] = {
4291         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4292         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4293         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4294         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4295         [P_BITMAP]          = { 1, 0, receive_bitmap },
4296         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4297         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4298         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4299         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4300         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4301         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4302         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4303         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4304         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4305         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4306         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4307         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4308         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4309         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4310         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4311         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4312         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4313         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4314         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4315 };
4316
4317 static void drbdd(struct drbd_tconn *tconn)
4318 {
4319         struct packet_info pi;
4320         size_t shs; /* sub header size */
4321         int err;
4322
4323         while (get_t_state(&tconn->receiver) == RUNNING) {
4324                 struct data_cmd *cmd;
4325
4326                 drbd_thread_current_set_cpu(&tconn->receiver);
4327                 if (drbd_recv_header(tconn, &pi))
4328                         goto err_out;
4329
4330                 cmd = &drbd_cmd_handler[pi.cmd];
4331                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4332                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4333                                  cmdname(pi.cmd), pi.cmd);
4334                         goto err_out;
4335                 }
4336
4337                 shs = cmd->pkt_size;
4338                 if (pi.size > shs && !cmd->expect_payload) {
4339                         conn_err(tconn, "No payload expected %s l:%d\n",
4340                                  cmdname(pi.cmd), pi.size);
4341                         goto err_out;
4342                 }
4343
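                     /* Receive the fixed-size part of the packet first; pi.size
                      * is reduced to the length of the remaining payload. */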
4344                 if (shs) {
4345                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4346                         if (err)
4347                                 goto err_out;
4348                         pi.size -= shs;
4349                 }
4350
4351                 err = cmd->fn(tconn, &pi);
4352                 if (err) {
4353                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4354                                  cmdname(pi.cmd), err, pi.size);
4355                         goto err_out;
4356                 }
4357         }
4358         return;
4359
4360     err_out:
4361         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4362 }
4363
4364 void conn_flush_workqueue(struct drbd_tconn *tconn)
4365 {
4366         struct drbd_wq_barrier barr;
4367
4368         barr.w.cb = w_prev_work_done;
4369         barr.w.tconn = tconn;
4370         init_completion(&barr.done);
4371         drbd_queue_work(&tconn->data.work, &barr.w);
4372         wait_for_completion(&barr.done);
4373 }
4374
4375 static void conn_disconnect(struct drbd_tconn *tconn)
4376 {
4377         struct drbd_conf *mdev;
4378         enum drbd_conns oc;
4379         int vnr;
4380
4381         if (tconn->cstate == C_STANDALONE)
4382                 return;
4383
4384         /* We are about to start the cleanup after connection loss.
4385          * Make sure drbd_make_request knows about that.
4386          * Usually we should be in some network failure state already,
4387          * but just in case we are not, we fix it up here.
4388          */
4389         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4390
4391         /* asender does not clean up anything. it must not interfere, either */
4392         drbd_thread_stop(&tconn->asender);
4393         drbd_free_sock(tconn);
4394
4395         rcu_read_lock();
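             /* drbd_disconnected() may sleep; hold a reference on each volume
              * and drop the RCU read lock around the call. */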
4396         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4397                 kref_get(&mdev->kref);
4398                 rcu_read_unlock();
4399                 drbd_disconnected(mdev);
4400                 kref_put(&mdev->kref, &drbd_minor_destroy);
4401                 rcu_read_lock();
4402         }
4403         rcu_read_unlock();
4404
4405         if (!list_empty(&tconn->current_epoch->list))
4406                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4407         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4408         atomic_set(&tconn->current_epoch->epoch_size, 0);
4409
4410         conn_info(tconn, "Connection closed\n");
4411
4412         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4413                 conn_try_outdate_peer_async(tconn);
4414
4415         spin_lock_irq(&tconn->req_lock);
4416         oc = tconn->cstate;
4417         if (oc >= C_UNCONNECTED)
4418                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4419
4420         spin_unlock_irq(&tconn->req_lock);
4421
4422         if (oc == C_DISCONNECTING)
4423                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4424 }
4425
4426 static int drbd_disconnected(struct drbd_conf *mdev)
4427 {
4428         unsigned int i;
4429
4430         /* wait for current activity to cease. */
4431         spin_lock_irq(&mdev->tconn->req_lock);
4432         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4433         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4434         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4435         spin_unlock_irq(&mdev->tconn->req_lock);
4436
4437         /* We do not have data structures that would allow us to
4438          * get the rs_pending_cnt down to 0 again.
4439          *  * On C_SYNC_TARGET we do not have any data structures describing
4440          *    the pending RSDataRequest's we have sent.
4441          *  * On C_SYNC_SOURCE there is no data structure that tracks
4442          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4443          *  And no, it is not the sum of the reference counts in the
4444          *  resync_LRU. The resync_LRU tracks the whole operation including
4445          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4446          *  on the fly. */
4447         drbd_rs_cancel_all(mdev);
4448         mdev->rs_total = 0;
4449         mdev->rs_failed = 0;
4450         atomic_set(&mdev->rs_pending_cnt, 0);
4451         wake_up(&mdev->misc_wait);
4452
4453         del_timer_sync(&mdev->resync_timer);
4454         resync_timer_fn((unsigned long)mdev);
4455
4456         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4457          * w_make_resync_request etc. which may still be on the worker queue
4458          * to be "canceled" */
4459         drbd_flush_workqueue(mdev);
4460
4461         drbd_finish_peer_reqs(mdev);
4462
4463         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4464            might have queued more work. The flush before drbd_finish_peer_reqs() is
4465            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4466         drbd_flush_workqueue(mdev);
4467
4468         kfree(mdev->p_uuid);
4469         mdev->p_uuid = NULL;
4470
4471         if (!drbd_suspended(mdev))
4472                 tl_clear(mdev->tconn);
4473
4474         drbd_md_sync(mdev);
4475
4476         /* serialize with bitmap writeout triggered by the state change,
4477          * if any. */
4478         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4479
4480         /* tcp_close and release of sendpage pages can be deferred.  I don't
4481          * want to use SO_LINGER, because apparently it can be deferred for
4482          * more than 20 seconds (longest time I checked).
4483          *
4484          * Actually we don't care for exactly when the network stack does its
4485          * put_page(), but release our reference on these pages right here.
4486          */
4487         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4488         if (i)
4489                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4490         i = atomic_read(&mdev->pp_in_use_by_net);
4491         if (i)
4492                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4493         i = atomic_read(&mdev->pp_in_use);
4494         if (i)
4495                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4496
4497         D_ASSERT(list_empty(&mdev->read_ee));
4498         D_ASSERT(list_empty(&mdev->active_ee));
4499         D_ASSERT(list_empty(&mdev->sync_ee));
4500         D_ASSERT(list_empty(&mdev->done_ee));
4501
4502         return 0;
4503 }
4504
4505 /*
4506  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4507  * we can agree on is stored in agreed_pro_version.
4508  *
4509  * feature flags and the reserved array should be enough room for future
4510  * enhancements of the handshake protocol, and possible plugins...
4511  *
4512  * for now, they are expected to be zero, but ignored.
4513  */
4514 static int drbd_send_features(struct drbd_tconn *tconn)
4515 {
4516         struct drbd_socket *sock;
4517         struct p_connection_features *p;
4518
4519         sock = &tconn->data;
4520         p = conn_prepare_command(tconn, sock);
4521         if (!p)
4522                 return -EIO;
4523         memset(p, 0, sizeof(*p));
4524         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4525         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4526         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4527 }
4528
4529 /*
4530  * return values:
4531  *   1 yes, we have a valid connection
4532  *   0 oops, did not work out, please try again
4533  *  -1 peer talks different language,
4534  *     no point in trying again, please go standalone.
4535  */
4536 static int drbd_do_features(struct drbd_tconn *tconn)
4537 {
4538         /* ASSERT current == tconn->receiver ... */
4539         struct p_connection_features *p;
4540         const int expect = sizeof(struct p_connection_features);
4541         struct packet_info pi;
4542         int err;
4543
4544         err = drbd_send_features(tconn);
4545         if (err)
4546                 return 0;
4547
4548         err = drbd_recv_header(tconn, &pi);
4549         if (err)
4550                 return 0;
4551
4552         if (pi.cmd != P_CONNECTION_FEATURES) {
4553                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4554                          cmdname(pi.cmd), pi.cmd);
4555                 return -1;
4556         }
4557
4558         if (pi.size != expect) {
4559                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4560                      expect, pi.size);
4561                 return -1;
4562         }
4563
4564         p = pi.data;
4565         err = drbd_recv_all_warn(tconn, p, expect);
4566         if (err)
4567                 return 0;
4568
4569         p->protocol_min = be32_to_cpu(p->protocol_min);
4570         p->protocol_max = be32_to_cpu(p->protocol_max);
4571         if (p->protocol_max == 0)
4572                 p->protocol_max = p->protocol_min;
4573
4574         if (PRO_VERSION_MAX < p->protocol_min ||
4575             PRO_VERSION_MIN > p->protocol_max)
4576                 goto incompat;
4577
4578         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4579
4580         conn_info(tconn, "Handshake successful: "
4581              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4582
4583         return 1;
4584
4585  incompat:
4586         conn_err(tconn, "incompatible DRBD dialects: "
4587             "I support %d-%d, peer supports %d-%d\n",
4588             PRO_VERSION_MIN, PRO_VERSION_MAX,
4589             p->protocol_min, p->protocol_max);
4590         return -1;
4591 }
4592
4593 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4594 static int drbd_do_auth(struct drbd_tconn *tconn)
4595 {
4596         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4597         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4598         return -1;
4599 }
4600 #else
4601 #define CHALLENGE_LEN 64
4602
4603 /* Return value:
4604         1 - auth succeeded,
4605         0 - failed, try again (network error),
4606         -1 - auth failed, don't try again.
4607 */
4608
4609 static int drbd_do_auth(struct drbd_tconn *tconn)
4610 {
4611         struct drbd_socket *sock;
4612         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4613         struct scatterlist sg;
4614         char *response = NULL;
4615         char *right_response = NULL;
4616         char *peers_ch = NULL;
4617         unsigned int key_len;
4618         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4619         unsigned int resp_size;
4620         struct hash_desc desc;
4621         struct packet_info pi;
4622         struct net_conf *nc;
4623         int err, rv;
4624
4625         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4626
4627         rcu_read_lock();
4628         nc = rcu_dereference(tconn->net_conf);
4629         key_len = strlen(nc->shared_secret);
4630         memcpy(secret, nc->shared_secret, key_len);
4631         rcu_read_unlock();
4632
4633         desc.tfm = tconn->cram_hmac_tfm;
4634         desc.flags = 0;
4635
4636         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4637         if (rv) {
4638                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4639                 rv = -1;
4640                 goto fail;
4641         }
4642
4643         get_random_bytes(my_challenge, CHALLENGE_LEN);
4644
4645         sock = &tconn->data;
4646         if (!conn_prepare_command(tconn, sock)) {
4647                 rv = 0;
4648                 goto fail;
4649         }
4650         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4651                                 my_challenge, CHALLENGE_LEN);
4652         if (!rv)
4653                 goto fail;
4654
4655         err = drbd_recv_header(tconn, &pi);
4656         if (err) {
4657                 rv = 0;
4658                 goto fail;
4659         }
4660
4661         if (pi.cmd != P_AUTH_CHALLENGE) {
4662                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4663                          cmdname(pi.cmd), pi.cmd);
4664                 rv = 0;
4665                 goto fail;
4666         }
4667
4668         if (pi.size > CHALLENGE_LEN * 2) {
4669                 conn_err(tconn, "AuthChallenge payload too big.\n");
4670                 rv = -1;
4671                 goto fail;
4672         }
4673
4674         peers_ch = kmalloc(pi.size, GFP_NOIO);
4675         if (peers_ch == NULL) {
4676                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4677                 rv = -1;
4678                 goto fail;
4679         }
4680
4681         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4682         if (err) {
4683                 rv = 0;
4684                 goto fail;
4685         }
4686
4687         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4688         response = kmalloc(resp_size, GFP_NOIO);
4689         if (response == NULL) {
4690                 conn_err(tconn, "kmalloc of response failed\n");
4691                 rv = -1;
4692                 goto fail;
4693         }
4694
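             /* Our response is the keyed hash (HMAC) of the peer's challenge. */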
4695         sg_init_table(&sg, 1);
4696         sg_set_buf(&sg, peers_ch, pi.size);
4697
4698         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4699         if (rv) {
4700                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4701                 rv = -1;
4702                 goto fail;
4703         }
4704
4705         if (!conn_prepare_command(tconn, sock)) {
4706                 rv = 0;
4707                 goto fail;
4708         }
4709         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4710                                 response, resp_size);
4711         if (!rv)
4712                 goto fail;
4713
4714         err = drbd_recv_header(tconn, &pi);
4715         if (err) {
4716                 rv = 0;
4717                 goto fail;
4718         }
4719
4720         if (pi.cmd != P_AUTH_RESPONSE) {
4721                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4722                          cmdname(pi.cmd), pi.cmd);
4723                 rv = 0;
4724                 goto fail;
4725         }
4726
4727         if (pi.size != resp_size) {
4728                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4729                 rv = 0;
4730                 goto fail;
4731         }
4732
4733         err = drbd_recv_all_warn(tconn, response, resp_size);
4734         if (err) {
4735                 rv = 0;
4736                 goto fail;
4737         }
4738
4739         right_response = kmalloc(resp_size, GFP_NOIO);
4740         if (right_response == NULL) {
4741                 conn_err(tconn, "kmalloc of right_response failed\n");
4742                 rv = -1;
4743                 goto fail;
4744         }
4745
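             /* Compute the response we expect for our own challenge and
              * compare it with what the peer sent. */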
4746         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4747
4748         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4749         if (rv) {
4750                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4751                 rv = -1;
4752                 goto fail;
4753         }
4754
4755         rv = !memcmp(response, right_response, resp_size);
4756
4757         if (rv)
4758                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4759                      resp_size);
4760         else
4761                 rv = -1;
4762
4763  fail:
4764         kfree(peers_ch);
4765         kfree(response);
4766         kfree(right_response);
4767
4768         return rv;
4769 }
4770 #endif
4771
4772 int drbdd_init(struct drbd_thread *thi)
4773 {
4774         struct drbd_tconn *tconn = thi->tconn;
4775         int h;
4776
4777         conn_info(tconn, "receiver (re)started\n");
4778
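             /* conn_connect() returns > 0 once a connection is established,
              * 0 if we should retry, and -1 on a fatal error, as handled below. */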
4779         do {
4780                 h = conn_connect(tconn);
4781                 if (h == 0) {
4782                         conn_disconnect(tconn);
4783                         schedule_timeout_interruptible(HZ);
4784                 }
4785                 if (h == -1) {
4786                         conn_warn(tconn, "Discarding network configuration.\n");
4787                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4788                 }
4789         } while (h == 0);
4790
4791         if (h > 0)
4792                 drbdd(tconn);
4793
4794         conn_disconnect(tconn);
4795
4796         conn_info(tconn, "receiver terminated\n");
4797         return 0;
4798 }
4799
4800 /* ********* acknowledge sender ******** */
4801
4802 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4803 {
4804         struct p_req_state_reply *p = pi->data;
4805         int retcode = be32_to_cpu(p->retcode);
4806
4807         if (retcode >= SS_SUCCESS) {
4808                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4809         } else {
4810                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4811                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4812                          drbd_set_st_err_str(retcode), retcode);
4813         }
4814         wake_up(&tconn->ping_wait);
4815
4816         return 0;
4817 }
4818
4819 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4820 {
4821         struct drbd_conf *mdev;
4822         struct p_req_state_reply *p = pi->data;
4823         int retcode = be32_to_cpu(p->retcode);
4824
4825         mdev = vnr_to_mdev(tconn, pi->vnr);
4826         if (!mdev)
4827                 return -EIO;
4828
4829         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4830                 D_ASSERT(tconn->agreed_pro_version < 100);
4831                 return got_conn_RqSReply(tconn, pi);
4832         }
4833
4834         if (retcode >= SS_SUCCESS) {
4835                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4836         } else {
4837                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4838                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4839                         drbd_set_st_err_str(retcode), retcode);
4840         }
4841         wake_up(&mdev->state_wait);
4842
4843         return 0;
4844 }
4845
4846 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4847 {
4848         return drbd_send_ping_ack(tconn);
4849
4850 }
4851
4852 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4853 {
4854         /* restore idle timeout */
4855         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4856         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4857                 wake_up(&tconn->ping_wait);
4858
4859         return 0;
4860 }
4861
4862 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4863 {
4864         struct drbd_conf *mdev;
4865         struct p_block_ack *p = pi->data;
4866         sector_t sector = be64_to_cpu(p->sector);
4867         int blksize = be32_to_cpu(p->blksize);
4868
4869         mdev = vnr_to_mdev(tconn, pi->vnr);
4870         if (!mdev)
4871                 return -EIO;
4872
4873         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4874
4875         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4876
4877         if (get_ldev(mdev)) {
4878                 drbd_rs_complete_io(mdev, sector);
4879                 drbd_set_in_sync(mdev, sector, blksize);
4880                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4881                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4882                 put_ldev(mdev);
4883         }
4884         dec_rs_pending(mdev);
4885         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4886
4887         return 0;
4888 }
4889
4890 static int
4891 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4892                               struct rb_root *root, const char *func,
4893                               enum drbd_req_event what, bool missing_ok)
4894 {
4895         struct drbd_request *req;
4896         struct bio_and_error m;
4897
4898         spin_lock_irq(&mdev->tconn->req_lock);
4899         req = find_request(mdev, root, id, sector, missing_ok, func);
4900         if (unlikely(!req)) {
4901                 spin_unlock_irq(&mdev->tconn->req_lock);
4902                 return -EIO;
4903         }
4904         __req_mod(req, what, &m);
4905         spin_unlock_irq(&mdev->tconn->req_lock);
4906
4907         if (m.bio)
4908                 complete_master_bio(mdev, &m);
4909         return 0;
4910 }
4911
4912 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4913 {
4914         struct drbd_conf *mdev;
4915         struct p_block_ack *p = pi->data;
4916         sector_t sector = be64_to_cpu(p->sector);
4917         int blksize = be32_to_cpu(p->blksize);
4918         enum drbd_req_event what;
4919
4920         mdev = vnr_to_mdev(tconn, pi->vnr);
4921         if (!mdev)
4922                 return -EIO;
4923
4924         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4925
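             /* ID_SYNCER marks an ack for a resync write; there is no
              * application request to update in that case. */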
4926         if (p->block_id == ID_SYNCER) {
4927                 drbd_set_in_sync(mdev, sector, blksize);
4928                 dec_rs_pending(mdev);
4929                 return 0;
4930         }
4931         switch (pi->cmd) {
4932         case P_RS_WRITE_ACK:
4933                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4934                 break;
4935         case P_WRITE_ACK:
4936                 what = WRITE_ACKED_BY_PEER;
4937                 break;
4938         case P_RECV_ACK:
4939                 what = RECV_ACKED_BY_PEER;
4940                 break;
4941         case P_DISCARD_WRITE:
4942                 what = DISCARD_WRITE;
4943                 break;
4944         case P_RETRY_WRITE:
4945                 what = POSTPONE_WRITE;
4946                 break;
4947         default:
4948                 BUG();
4949         }
4950
4951         return validate_req_change_req_state(mdev, p->block_id, sector,
4952                                              &mdev->write_requests, __func__,
4953                                              what, false);
4954 }
4955
4956 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4957 {
4958         struct drbd_conf *mdev;
4959         struct p_block_ack *p = pi->data;
4960         sector_t sector = be64_to_cpu(p->sector);
4961         int size = be32_to_cpu(p->blksize);
4962         int err;
4963
4964         mdev = vnr_to_mdev(tconn, pi->vnr);
4965         if (!mdev)
4966                 return -EIO;
4967
4968         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4969
4970         if (p->block_id == ID_SYNCER) {
4971                 dec_rs_pending(mdev);
4972                 drbd_rs_failed_io(mdev, sector, size);
4973                 return 0;
4974         }
4975
4976         err = validate_req_change_req_state(mdev, p->block_id, sector,
4977                                             &mdev->write_requests, __func__,
4978                                             NEG_ACKED, true);
4979         if (err) {
4980                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4981                    The master bio might already be completed, therefore the
4982                    request is no longer in the collision hash. */
4983                 /* In Protocol B we might already have got a P_RECV_ACK
4984                    but then get a P_NEG_ACK afterwards. */
4985                 drbd_set_out_of_sync(mdev, sector, size);
4986         }
4987         return 0;
4988 }
4989
4990 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4991 {
4992         struct drbd_conf *mdev;
4993         struct p_block_ack *p = pi->data;
4994         sector_t sector = be64_to_cpu(p->sector);
4995
4996         mdev = vnr_to_mdev(tconn, pi->vnr);
4997         if (!mdev)
4998                 return -EIO;
4999
5000         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5001
5002         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5003             (unsigned long long)sector, be32_to_cpu(p->blksize));
5004
5005         return validate_req_change_req_state(mdev, p->block_id, sector,
5006                                              &mdev->read_requests, __func__,
5007                                              NEG_ACKED, false);
5008 }
5009
5010 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5011 {
5012         struct drbd_conf *mdev;
5013         sector_t sector;
5014         int size;
5015         struct p_block_ack *p = pi->data;
5016
5017         mdev = vnr_to_mdev(tconn, pi->vnr);
5018         if (!mdev)
5019                 return -EIO;
5020
5021         sector = be64_to_cpu(p->sector);
5022         size = be32_to_cpu(p->blksize);
5023
5024         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5025
5026         dec_rs_pending(mdev);
5027
5028         if (get_ldev_if_state(mdev, D_FAILED)) {
5029                 drbd_rs_complete_io(mdev, sector);
5030                 switch (pi->cmd) {
5031                 case P_NEG_RS_DREPLY:
5032                         drbd_rs_failed_io(mdev, sector, size);
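                             /* fall through */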
5033                 case P_RS_CANCEL:
5034                         break;
5035                 default:
5036                         BUG();
5037                 }
5038                 put_ldev(mdev);
5039         }
5040
5041         return 0;
5042 }
5043
5044 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5045 {
5046         struct p_barrier_ack *p = pi->data;
5047         struct drbd_conf *mdev;
5048         int vnr;
5049
5050         tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5051
5052         rcu_read_lock();
5053         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5054                 if (mdev->state.conn == C_AHEAD &&
5055                     atomic_read(&mdev->ap_in_flight) == 0 &&
5056                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5057                         mdev->start_resync_timer.expires = jiffies + HZ;
5058                         add_timer(&mdev->start_resync_timer);
5059                 }
5060         }
5061         rcu_read_unlock();
5062
5063         return 0;
5064 }
5065
5066 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5067 {
5068         struct drbd_conf *mdev;
5069         struct p_block_ack *p = pi->data;
5070         struct drbd_work *w;
5071         sector_t sector;
5072         int size;
5073
5074         mdev = vnr_to_mdev(tconn, pi->vnr);
5075         if (!mdev)
5076                 return -EIO;
5077
5078         sector = be64_to_cpu(p->sector);
5079         size = be32_to_cpu(p->blksize);
5080
5081         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5082
5083         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5084                 drbd_ov_out_of_sync_found(mdev, sector, size);
5085         else
5086                 ov_out_of_sync_print(mdev);
5087
5088         if (!get_ldev(mdev))
5089                 return 0;
5090
5091         drbd_rs_complete_io(mdev, sector);
5092         dec_rs_pending(mdev);
5093
5094         --mdev->ov_left;
5095
5096         /* let's advance progress step marks only for every other megabyte */
5097         if ((mdev->ov_left & 0x200) == 0x200)
5098                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5099
5100         if (mdev->ov_left == 0) {
5101                 w = kmalloc(sizeof(*w), GFP_NOIO);
5102                 if (w) {
5103                         w->cb = w_ov_finished;
5104                         w->mdev = mdev;
5105                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5106                 } else {
5107                         dev_err(DEV, "kmalloc(w) failed.");
5108                         ov_out_of_sync_print(mdev);
5109                         drbd_resync_finished(mdev);
5110                 }
5111         }
5112         put_ldev(mdev);
5113         return 0;
5114 }
5115
5116 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5117 {
5118         return 0;
5119 }
5120
5121 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5122 {
5123         struct drbd_conf *mdev;
5124         int vnr, not_empty = 0;
5125
5126         do {
5127                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5128                 flush_signals(current);
5129
5130                 rcu_read_lock();
5131                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5132                         kref_get(&mdev->kref);
5133                         rcu_read_unlock();
5134                         if (drbd_finish_peer_reqs(mdev)) {
5135                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5136                                 return 1;
5137                         }
5138                         kref_put(&mdev->kref, &drbd_minor_destroy);
5139                         rcu_read_lock();
5140                 }
5141                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5142
5143                 spin_lock_irq(&tconn->req_lock);
5144                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5145                         not_empty = !list_empty(&mdev->done_ee);
5146                         if (not_empty)
5147                                 break;
5148                 }
5149                 spin_unlock_irq(&tconn->req_lock);
5150                 rcu_read_unlock();
5151         } while (not_empty);
5152
5153         return 0;
5154 }
5155
5156 struct asender_cmd {
5157         size_t pkt_size;
5158         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5159 };
5160
5161 static struct asender_cmd asender_tbl[] = {
5162         [P_PING]            = { 0, got_Ping },
5163         [P_PING_ACK]        = { 0, got_PingAck },
5164         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5165         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5166         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5167         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5168         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5169         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5170         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5171         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5172         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5173         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5174         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5175         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5176         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5177         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5178         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5179 };
5180
5181 int drbd_asender(struct drbd_thread *thi)
5182 {
5183         struct drbd_tconn *tconn = thi->tconn;
5184         struct asender_cmd *cmd = NULL;
5185         struct packet_info pi;
5186         int rv;
5187         void *buf    = tconn->meta.rbuf;
5188         int received = 0;
5189         unsigned int header_size = drbd_header_size(tconn);
5190         int expect   = header_size;
5191         bool ping_timeout_active = false;
5192         struct net_conf *nc;
5193         int ping_timeo, tcp_cork, ping_int;
5194
5195         current->policy = SCHED_RR;  /* Make this a realtime task! */
5196         current->rt_priority = 2;    /* more important than all other tasks */
5197
5198         while (get_t_state(thi) == RUNNING) {
5199                 drbd_thread_current_set_cpu(thi);
5200
5201                 rcu_read_lock();
5202                 nc = rcu_dereference(tconn->net_conf);
5203                 ping_timeo = nc->ping_timeo;
5204                 tcp_cork = nc->tcp_cork;
5205                 ping_int = nc->ping_int;
5206                 rcu_read_unlock();
5207
5208                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5209                         if (drbd_send_ping(tconn)) {
5210                                 conn_err(tconn, "drbd_send_ping has failed\n");
5211                                 goto reconnect;
5212                         }
5213                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5214                         ping_timeout_active = true;
5215                 }
5216
5217                 /* TODO: conditionally cork; it may hurt latency if we cork without
5218                    much to send */
5219                 if (tcp_cork)
5220                         drbd_tcp_cork(tconn->meta.socket);
5221                 if (tconn_finish_peer_reqs(tconn)) {
5222                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5223                         goto reconnect;
5224                 }
5225                 /* but unconditionally uncork unless disabled */
5226                 if (tcp_cork)
5227                         drbd_tcp_uncork(tconn->meta.socket);
5228
5229                 /* short circuit, recv_msg would return EINTR anyways. */
5230                 if (signal_pending(current))
5231                         continue;
5232
5233                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5234                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5235
5236                 flush_signals(current);
5237
5238                 /* Note:
5239                  * -EINTR        (on meta) we got a signal
5240                  * -EAGAIN       (on meta) rcvtimeo expired
5241                  * -ECONNRESET   other side closed the connection
5242                  * -ERESTARTSYS  (on data) we got a signal
5243                  * rv <  0       other than above: unexpected error!
5244                  * rv == expected: full header or command
5245                  * rv <  expected: "woken" by signal during receive
5246                  * rv == 0       : "connection shut down by peer"
5247                  */
5248                 if (likely(rv > 0)) {
5249                         received += rv;
5250                         buf      += rv;
5251                 } else if (rv == 0) {
5252                         conn_err(tconn, "meta connection shut down by peer.\n");
5253                         goto reconnect;
5254                 } else if (rv == -EAGAIN) {
5255                         /* If the data socket received something meanwhile,
5256                          * that is good enough: peer is still alive. */
5257                         if (time_after(tconn->last_received,
5258                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5259                                 continue;
5260                         if (ping_timeout_active) {
5261                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5262                                 goto reconnect;
5263                         }
5264                         set_bit(SEND_PING, &tconn->flags);
5265                         continue;
5266                 } else if (rv == -EINTR) {
5267                         continue;
5268                 } else {
5269                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5270                         goto reconnect;
5271                 }
5272
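                     /* First stage: a complete header has arrived. Decode it and
                      * determine how much more data to expect for this packet. */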
5273                 if (received == expect && cmd == NULL) {
5274                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5275                                 goto reconnect;
5276                         cmd = &asender_tbl[pi.cmd];
5277                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5278                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5279                                          cmdname(pi.cmd), pi.cmd);
5280                                 goto disconnect;
5281                         }
5282                         expect = header_size + cmd->pkt_size;
5283                         if (pi.size != expect - header_size) {
5284                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5285                                         pi.cmd, pi.size);
5286                                 goto reconnect;
5287                         }
5288                 }
5289                 if (received == expect) {
5290                         bool err;
5291
5292                         err = cmd->fn(tconn, &pi);
5293                         if (err) {
5294                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5295                                 goto reconnect;
5296                         }
5297
5298                         tconn->last_received = jiffies;
5299
5300                         if (cmd == &asender_tbl[P_PING_ACK]) {
5301                                 /* restore idle timeout */
5302                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5303                                 ping_timeout_active = false;
5304                         }
5305
5306                         buf      = tconn->meta.rbuf;
5307                         received = 0;
5308                         expect   = header_size;
5309                         cmd      = NULL;
5310                 }
5311         }
5312
5313         if (0) {
5314 reconnect:
5315                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5316         }
5317         if (0) {
5318 disconnect:
5319                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5320         }
5321         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5322
5323         conn_info(tconn, "asender terminated\n");
5324
5325         return 0;
5326 }