]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/drbd/drbd_receiver.c
drbd: Restore late assigning of tconn->data.sock and meta.sock
[karo-tx-linux.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first not finished we can
208            stop to examine the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside an other spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, and e_end_resync_block, e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept; which is only present since 2.6.18.
465  * also we want to log which part of it failed, exactly */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         };
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629         sndbuf_size = nc->sndbuf_size;
630         rcvbuf_size = nc->rcvbuf_size;
631         connect_int = nc->connect_int;
632         rcu_read_unlock();
633
634         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
644
645         what = "sock_create_kern";
646         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
647                                SOCK_STREAM, IPPROTO_TCP, &sock);
648         if (err < 0) {
649                 sock = NULL;
650                 goto out;
651         }
652
653         sock->sk->sk_rcvtimeo =
654         sock->sk->sk_sndtimeo = connect_int * HZ;
655         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
656
657        /* explicitly bind to the configured IP as source IP
658         *  for the outgoing connections.
659         *  This is needed for multihomed hosts and to be
660         *  able to use lo: interfaces for drbd.
661         * Make sure to use 0 as port number, so linux selects
662         *  a free one dynamically.
663         */
664         what = "bind before connect";
665         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
666         if (err < 0)
667                 goto out;
668
669         /* connect may fail, peer not yet available.
670          * stay C_WF_CONNECTION, don't go Disconnecting! */
671         disconnect_on_error = 0;
672         what = "connect";
673         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
674
675 out:
676         if (err < 0) {
677                 if (sock) {
678                         sock_release(sock);
679                         sock = NULL;
680                 }
681                 switch (-err) {
682                         /* timeout, busy, signal pending */
683                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
684                 case EINTR: case ERESTARTSYS:
685                         /* peer not (yet) available, network problem */
686                 case ECONNREFUSED: case ENETUNREACH:
687                 case EHOSTDOWN:    case EHOSTUNREACH:
688                         disconnect_on_error = 0;
689                         break;
690                 default:
691                         conn_err(tconn, "%s failed, err = %d\n", what, err);
692                 }
693                 if (disconnect_on_error)
694                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
695         }
696
697         return sock;
698 }
699
700 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
701 {
702         int timeo, err, my_addr_len;
703         int sndbuf_size, rcvbuf_size, connect_int;
704         struct socket *s_estab = NULL, *s_listen;
705         struct sockaddr_in6 my_addr;
706         struct net_conf *nc;
707         const char *what;
708
709         rcu_read_lock();
710         nc = rcu_dereference(tconn->net_conf);
711         if (!nc) {
712                 rcu_read_unlock();
713                 return NULL;
714         }
715         sndbuf_size = nc->sndbuf_size;
716         rcvbuf_size = nc->rcvbuf_size;
717         connect_int = nc->connect_int;
718         rcu_read_unlock();
719
720         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
721         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
722
723         what = "sock_create_kern";
724         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
725                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
726         if (err) {
727                 s_listen = NULL;
728                 goto out;
729         }
730
731         timeo = connect_int * HZ;
732         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
733
734         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
735         s_listen->sk->sk_rcvtimeo = timeo;
736         s_listen->sk->sk_sndtimeo = timeo;
737         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
738
739         what = "bind before listen";
740         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
741         if (err < 0)
742                 goto out;
743
744         err = drbd_accept(&what, s_listen, &s_estab);
745
746 out:
747         if (s_listen)
748                 sock_release(s_listen);
749         if (err < 0) {
750                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
751                         conn_err(tconn, "%s failed, err = %d\n", what, err);
752                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
753                 }
754         }
755
756         return s_estab;
757 }
758
759 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
760
761 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
762                              enum drbd_packet cmd)
763 {
764         if (!conn_prepare_command(tconn, sock))
765                 return -EIO;
766         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
767 }
768
769 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
770 {
771         unsigned int header_size = drbd_header_size(tconn);
772         struct packet_info pi;
773         int err;
774
775         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
776         if (err != header_size) {
777                 if (err >= 0)
778                         err = -EIO;
779                 return err;
780         }
781         err = decode_header(tconn, tconn->data.rbuf, &pi);
782         if (err)
783                 return err;
784         return pi.cmd;
785 }
786
787 /**
788  * drbd_socket_okay() - Free the socket if its connection is not okay
789  * @sock:       pointer to the pointer to the socket.
790  */
791 static int drbd_socket_okay(struct socket **sock)
792 {
793         int rr;
794         char tb[4];
795
796         if (!*sock)
797                 return false;
798
799         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
800
801         if (rr > 0 || rr == -EAGAIN) {
802                 return true;
803         } else {
804                 sock_release(*sock);
805                 *sock = NULL;
806                 return false;
807         }
808 }
809 /* Gets called if a connection is established, or if a new minor gets created
810    in a connection */
811 int drbd_connected(struct drbd_conf *mdev)
812 {
813         int err;
814
815         atomic_set(&mdev->packet_seq, 0);
816         mdev->peer_seq = 0;
817
818         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
819                 &mdev->tconn->cstate_mutex :
820                 &mdev->own_state_mutex;
821
822         err = drbd_send_sync_param(mdev);
823         if (!err)
824                 err = drbd_send_sizes(mdev, 0, 0);
825         if (!err)
826                 err = drbd_send_uuids(mdev);
827         if (!err)
828                 err = drbd_send_current_state(mdev);
829         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
830         clear_bit(RESIZE_PENDING, &mdev->flags);
831         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
832         return err;
833 }
834
835 /*
836  * return values:
837  *   1 yes, we have a valid connection
838  *   0 oops, did not work out, please try again
839  *  -1 peer talks different language,
840  *     no point in trying again, please go standalone.
841  *  -2 We do not have a network config...
842  */
843 static int conn_connect(struct drbd_tconn *tconn)
844 {
845         struct drbd_socket sock, msock;
846         struct drbd_conf *mdev;
847         struct net_conf *nc;
848         int vnr, timeout, try, h, ok;
849         bool discard_my_data;
850
851         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
852                 return -2;
853
854         mutex_init(&sock.mutex);
855         sock.sbuf = tconn->data.sbuf;
856         sock.rbuf = tconn->data.rbuf;
857         sock.socket = NULL;
858         mutex_init(&msock.mutex);
859         msock.sbuf = tconn->meta.sbuf;
860         msock.rbuf = tconn->meta.rbuf;
861         msock.socket = NULL;
862
863         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
864
865         /* Assume that the peer only understands protocol 80 until we know better.  */
866         tconn->agreed_pro_version = 80;
867
868         do {
869                 struct socket *s;
870
871                 for (try = 0;;) {
872                         /* 3 tries, this should take less than a second! */
873                         s = drbd_try_connect(tconn);
874                         if (s || ++try >= 3)
875                                 break;
876                         /* give the other side time to call bind() & listen() */
877                         schedule_timeout_interruptible(HZ / 10);
878                 }
879
880                 if (s) {
881                         if (!sock.socket) {
882                                 sock.socket = s;
883                                 send_first_packet(tconn, &sock, P_INITIAL_DATA);
884                         } else if (!msock.socket) {
885                                 msock.socket = s;
886                                 send_first_packet(tconn, &msock, P_INITIAL_META);
887                         } else {
888                                 conn_err(tconn, "Logic error in conn_connect()\n");
889                                 goto out_release_sockets;
890                         }
891                 }
892
893                 if (sock.socket && msock.socket) {
894                         rcu_read_lock();
895                         nc = rcu_dereference(tconn->net_conf);
896                         timeout = nc->ping_timeo * HZ / 10;
897                         rcu_read_unlock();
898                         schedule_timeout_interruptible(timeout);
899                         ok = drbd_socket_okay(&sock.socket);
900                         ok = drbd_socket_okay(&msock.socket) && ok;
901                         if (ok)
902                                 break;
903                 }
904
905 retry:
906                 s = drbd_wait_for_connect(tconn);
907                 if (s) {
908                         try = receive_first_packet(tconn, s);
909                         drbd_socket_okay(&sock.socket);
910                         drbd_socket_okay(&msock.socket);
911                         switch (try) {
912                         case P_INITIAL_DATA:
913                                 if (sock.socket) {
914                                         conn_warn(tconn, "initial packet S crossed\n");
915                                         sock_release(sock.socket);
916                                 }
917                                 sock.socket = s;
918                                 break;
919                         case P_INITIAL_META:
920                                 if (msock.socket) {
921                                         conn_warn(tconn, "initial packet M crossed\n");
922                                         sock_release(msock.socket);
923                                 }
924                                 msock.socket = s;
925                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
926                                 break;
927                         default:
928                                 conn_warn(tconn, "Error receiving initial packet\n");
929                                 sock_release(s);
930                                 if (random32() & 1)
931                                         goto retry;
932                         }
933                 }
934
935                 if (tconn->cstate <= C_DISCONNECTING)
936                         goto out_release_sockets;
937                 if (signal_pending(current)) {
938                         flush_signals(current);
939                         smp_rmb();
940                         if (get_t_state(&tconn->receiver) == EXITING)
941                                 goto out_release_sockets;
942                 }
943
944                 if (sock.socket && &msock.socket) {
945                         ok = drbd_socket_okay(&sock.socket);
946                         ok = drbd_socket_okay(&msock.socket) && ok;
947                         if (ok)
948                                 break;
949                 }
950         } while (1);
951
952         sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
953         msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
954
955         sock.socket->sk->sk_allocation = GFP_NOIO;
956         msock.socket->sk->sk_allocation = GFP_NOIO;
957
958         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
959         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
960
961         /* NOT YET ...
962          * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
963          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
964          * first set it to the P_CONNECTION_FEATURES timeout,
965          * which we set to 4x the configured ping_timeout. */
966         rcu_read_lock();
967         nc = rcu_dereference(tconn->net_conf);
968
969         sock.socket->sk->sk_sndtimeo =
970         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
971
972         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
973         timeout = nc->timeout * HZ / 10;
974         discard_my_data = nc->discard_my_data;
975         rcu_read_unlock();
976
977         msock.socket->sk->sk_sndtimeo = timeout;
978
979         /* we don't want delays.
980          * we use TCP_CORK where appropriate, though */
981         drbd_tcp_nodelay(sock.socket);
982         drbd_tcp_nodelay(msock.socket);
983
984         tconn->data.socket = sock.socket;
985         tconn->meta.socket = msock.socket;
986         tconn->last_received = jiffies;
987
988         h = drbd_do_features(tconn);
989         if (h <= 0)
990                 return h;
991
992         if (tconn->cram_hmac_tfm) {
993                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
994                 switch (drbd_do_auth(tconn)) {
995                 case -1:
996                         conn_err(tconn, "Authentication of peer failed\n");
997                         return -1;
998                 case 0:
999                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
1000                         return 0;
1001                 }
1002         }
1003
1004         tconn->data.socket->sk->sk_sndtimeo = timeout;
1005         tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1006
1007         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1008                 return -1;
1009
1010         rcu_read_lock();
1011         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1012                 kref_get(&mdev->kref);
1013                 rcu_read_unlock();
1014
1015                 if (discard_my_data)
1016                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1017                 else
1018                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1019
1020                 drbd_connected(mdev);
1021                 kref_put(&mdev->kref, &drbd_minor_destroy);
1022                 rcu_read_lock();
1023         }
1024         rcu_read_unlock();
1025
1026         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
1027                 return 0;
1028
1029         drbd_thread_start(&tconn->asender);
1030
1031         mutex_lock(&tconn->conf_update);
1032         /* The discard_my_data flag is a single-shot modifier to the next
1033          * connection attempt, the handshake of which is now well underway.
1034          * No need for rcu style copying of the whole struct
1035          * just to clear a single value. */
1036         tconn->net_conf->discard_my_data = 0;
1037         mutex_unlock(&tconn->conf_update);
1038
1039         return h;
1040
1041 out_release_sockets:
1042         if (sock.socket)
1043                 sock_release(sock.socket);
1044         if (msock.socket)
1045                 sock_release(msock.socket);
1046         return -1;
1047 }
1048
1049 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1050 {
1051         unsigned int header_size = drbd_header_size(tconn);
1052
1053         if (header_size == sizeof(struct p_header100) &&
1054             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1055                 struct p_header100 *h = header;
1056                 if (h->pad != 0) {
1057                         conn_err(tconn, "Header padding is not zero\n");
1058                         return -EINVAL;
1059                 }
1060                 pi->vnr = be16_to_cpu(h->volume);
1061                 pi->cmd = be16_to_cpu(h->command);
1062                 pi->size = be32_to_cpu(h->length);
1063         } else if (header_size == sizeof(struct p_header95) &&
1064                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1065                 struct p_header95 *h = header;
1066                 pi->cmd = be16_to_cpu(h->command);
1067                 pi->size = be32_to_cpu(h->length);
1068                 pi->vnr = 0;
1069         } else if (header_size == sizeof(struct p_header80) &&
1070                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1071                 struct p_header80 *h = header;
1072                 pi->cmd = be16_to_cpu(h->command);
1073                 pi->size = be16_to_cpu(h->length);
1074                 pi->vnr = 0;
1075         } else {
1076                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1077                          be32_to_cpu(*(__be32 *)header),
1078                          tconn->agreed_pro_version);
1079                 return -EINVAL;
1080         }
1081         pi->data = header + header_size;
1082         return 0;
1083 }
1084
1085 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1086 {
1087         void *buffer = tconn->data.rbuf;
1088         int err;
1089
1090         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1091         if (err)
1092                 return err;
1093
1094         err = decode_header(tconn, buffer, pi);
1095         tconn->last_received = jiffies;
1096
1097         return err;
1098 }
1099
1100 static void drbd_flush(struct drbd_tconn *tconn)
1101 {
1102         int rv;
1103         struct drbd_conf *mdev;
1104         int vnr;
1105
1106         if (tconn->write_ordering >= WO_bdev_flush) {
1107                 rcu_read_lock();
1108                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1109                         if (!get_ldev(mdev))
1110                                 continue;
1111                         kref_get(&mdev->kref);
1112                         rcu_read_unlock();
1113
1114                         rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1115                                         GFP_NOIO, NULL);
1116                         if (rv) {
1117                                 dev_info(DEV, "local disk flush failed with status %d\n", rv);
1118                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1119                                  * don't try again for ANY return value != 0
1120                                  * if (rv == -EOPNOTSUPP) */
1121                                 drbd_bump_write_ordering(tconn, WO_drain_io);
1122                         }
1123                         put_ldev(mdev);
1124                         kref_put(&mdev->kref, &drbd_minor_destroy);
1125
1126                         rcu_read_lock();
1127                         if (rv)
1128                                 break;
1129                 }
1130                 rcu_read_unlock();
1131         }
1132 }
1133
1134 /**
1135  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1136  * @mdev:       DRBD device.
1137  * @epoch:      Epoch object.
1138  * @ev:         Epoch event.
1139  */
1140 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1141                                                struct drbd_epoch *epoch,
1142                                                enum epoch_event ev)
1143 {
1144         int epoch_size;
1145         struct drbd_epoch *next_epoch;
1146         enum finish_epoch rv = FE_STILL_LIVE;
1147
1148         spin_lock(&tconn->epoch_lock);
1149         do {
1150                 next_epoch = NULL;
1151
1152                 epoch_size = atomic_read(&epoch->epoch_size);
1153
1154                 switch (ev & ~EV_CLEANUP) {
1155                 case EV_PUT:
1156                         atomic_dec(&epoch->active);
1157                         break;
1158                 case EV_GOT_BARRIER_NR:
1159                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1160                         break;
1161                 case EV_BECAME_LAST:
1162                         /* nothing to do*/
1163                         break;
1164                 }
1165
1166                 if (epoch_size != 0 &&
1167                     atomic_read(&epoch->active) == 0 &&
1168                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1169                         if (!(ev & EV_CLEANUP)) {
1170                                 spin_unlock(&tconn->epoch_lock);
1171                                 drbd_send_b_ack(epoch->mdev, epoch->barrier_nr, epoch_size);
1172                                 spin_lock(&tconn->epoch_lock);
1173                         }
1174                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1175                                 dec_unacked(epoch->mdev);
1176
1177                         if (tconn->current_epoch != epoch) {
1178                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1179                                 list_del(&epoch->list);
1180                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1181                                 tconn->epochs--;
1182                                 kfree(epoch);
1183
1184                                 if (rv == FE_STILL_LIVE)
1185                                         rv = FE_DESTROYED;
1186                         } else {
1187                                 epoch->flags = 0;
1188                                 atomic_set(&epoch->epoch_size, 0);
1189                                 /* atomic_set(&epoch->active, 0); is already zero */
1190                                 if (rv == FE_STILL_LIVE)
1191                                         rv = FE_RECYCLED;
1192                         }
1193                 }
1194
1195                 if (!next_epoch)
1196                         break;
1197
1198                 epoch = next_epoch;
1199         } while (1);
1200
1201         spin_unlock(&tconn->epoch_lock);
1202
1203         return rv;
1204 }
1205
1206 /**
1207  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1208  * @tconn:      DRBD connection.
1209  * @wo:         Write ordering method to try.
1210  */
1211 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1212 {
1213         struct disk_conf *dc;
1214         struct drbd_conf *mdev;
1215         enum write_ordering_e pwo;
1216         int vnr;
1217         static char *write_ordering_str[] = {
1218                 [WO_none] = "none",
1219                 [WO_drain_io] = "drain",
1220                 [WO_bdev_flush] = "flush",
1221         };
1222
1223         pwo = tconn->write_ordering;
1224         wo = min(pwo, wo);
1225         rcu_read_lock();
1226         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1227                 if (!get_ldev(mdev))
1228                         continue;
1229                 dc = rcu_dereference(mdev->ldev->disk_conf);
1230
1231                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1232                         wo = WO_drain_io;
1233                 if (wo == WO_drain_io && !dc->disk_drain)
1234                         wo = WO_none;
1235                 put_ldev(mdev);
1236         }
1237         rcu_read_unlock();
1238         tconn->write_ordering = wo;
1239         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1240                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1241 }
1242
1243 /**
1244  * drbd_submit_peer_request()
1245  * @mdev:       DRBD device.
1246  * @peer_req:   peer request
1247  * @rw:         flag field, see bio->bi_rw
1248  *
1249  * May spread the pages to multiple bios,
1250  * depending on bio_add_page restrictions.
1251  *
1252  * Returns 0 if all bios have been submitted,
1253  * -ENOMEM if we could not allocate enough bios,
1254  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1255  *  single page to an empty bio (which should never happen and likely indicates
1256  *  that the lower level IO stack is in some way broken). This has been observed
1257  *  on certain Xen deployments.
1258  */
1259 /* TODO allocate from our own bio_set. */
1260 int drbd_submit_peer_request(struct drbd_conf *mdev,
1261                              struct drbd_peer_request *peer_req,
1262                              const unsigned rw, const int fault_type)
1263 {
1264         struct bio *bios = NULL;
1265         struct bio *bio;
1266         struct page *page = peer_req->pages;
1267         sector_t sector = peer_req->i.sector;
1268         unsigned ds = peer_req->i.size;
1269         unsigned n_bios = 0;
1270         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1271         int err = -ENOMEM;
1272
1273         /* In most cases, we will only need one bio.  But in case the lower
1274          * level restrictions happen to be different at this offset on this
1275          * side than those of the sending peer, we may need to submit the
1276          * request in more than one bio.
1277          *
1278          * Plain bio_alloc is good enough here, this is no DRBD internally
1279          * generated bio, but a bio allocated on behalf of the peer.
1280          */
1281 next_bio:
1282         bio = bio_alloc(GFP_NOIO, nr_pages);
1283         if (!bio) {
1284                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1285                 goto fail;
1286         }
1287         /* > peer_req->i.sector, unless this is the first bio */
1288         bio->bi_sector = sector;
1289         bio->bi_bdev = mdev->ldev->backing_bdev;
1290         bio->bi_rw = rw;
1291         bio->bi_private = peer_req;
1292         bio->bi_end_io = drbd_peer_request_endio;
1293
1294         bio->bi_next = bios;
1295         bios = bio;
1296         ++n_bios;
1297
1298         page_chain_for_each(page) {
1299                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1300                 if (!bio_add_page(bio, page, len, 0)) {
1301                         /* A single page must always be possible!
1302                          * But in case it fails anyways,
1303                          * we deal with it, and complain (below). */
1304                         if (bio->bi_vcnt == 0) {
1305                                 dev_err(DEV,
1306                                         "bio_add_page failed for len=%u, "
1307                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1308                                         len, (unsigned long long)bio->bi_sector);
1309                                 err = -ENOSPC;
1310                                 goto fail;
1311                         }
1312                         goto next_bio;
1313                 }
1314                 ds -= len;
1315                 sector += len >> 9;
1316                 --nr_pages;
1317         }
1318         D_ASSERT(page == NULL);
1319         D_ASSERT(ds == 0);
1320
1321         atomic_set(&peer_req->pending_bios, n_bios);
1322         do {
1323                 bio = bios;
1324                 bios = bios->bi_next;
1325                 bio->bi_next = NULL;
1326
1327                 drbd_generic_make_request(mdev, fault_type, bio);
1328         } while (bios);
1329         return 0;
1330
1331 fail:
1332         while (bios) {
1333                 bio = bios;
1334                 bios = bios->bi_next;
1335                 bio_put(bio);
1336         }
1337         return err;
1338 }
1339
1340 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1341                                              struct drbd_peer_request *peer_req)
1342 {
1343         struct drbd_interval *i = &peer_req->i;
1344
1345         drbd_remove_interval(&mdev->write_requests, i);
1346         drbd_clear_interval(i);
1347
1348         /* Wake up any processes waiting for this peer request to complete.  */
1349         if (i->waiting)
1350                 wake_up(&mdev->misc_wait);
1351 }
1352
1353 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1354 {
1355         struct drbd_conf *mdev;
1356         int vnr;
1357
1358         rcu_read_lock();
1359         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1360                 kref_get(&mdev->kref);
1361                 rcu_read_unlock();
1362                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1363                 kref_put(&mdev->kref, &drbd_minor_destroy);
1364                 rcu_read_lock();
1365         }
1366         rcu_read_unlock();
1367 }
1368
1369 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1370 {
1371         struct drbd_conf *mdev;
1372         int rv;
1373         struct p_barrier *p = pi->data;
1374         struct drbd_epoch *epoch;
1375
1376         mdev = vnr_to_mdev(tconn, pi->vnr);
1377         if (!mdev)
1378                 return -EIO;
1379
1380         inc_unacked(mdev);
1381
1382         tconn->current_epoch->barrier_nr = p->barrier;
1383         tconn->current_epoch->mdev = mdev;
1384         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1385
1386         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1387          * the activity log, which means it would not be resynced in case the
1388          * R_PRIMARY crashes now.
1389          * Therefore we must send the barrier_ack after the barrier request was
1390          * completed. */
1391         switch (tconn->write_ordering) {
1392         case WO_none:
1393                 if (rv == FE_RECYCLED)
1394                         return 0;
1395
1396                 /* receiver context, in the writeout path of the other node.
1397                  * avoid potential distributed deadlock */
1398                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1399                 if (epoch)
1400                         break;
1401                 else
1402                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1403                         /* Fall through */
1404
1405         case WO_bdev_flush:
1406         case WO_drain_io:
1407                 conn_wait_active_ee_empty(tconn);
1408                 drbd_flush(tconn);
1409
1410                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1411                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1412                         if (epoch)
1413                                 break;
1414                 }
1415
1416                 epoch = tconn->current_epoch;
1417                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1418
1419                 D_ASSERT(atomic_read(&epoch->active) == 0);
1420                 D_ASSERT(epoch->flags == 0);
1421
1422                 return 0;
1423         default:
1424                 dev_err(DEV, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1425                 return -EIO;
1426         }
1427
1428         epoch->flags = 0;
1429         atomic_set(&epoch->epoch_size, 0);
1430         atomic_set(&epoch->active, 0);
1431
1432         spin_lock(&tconn->epoch_lock);
1433         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1434                 list_add(&epoch->list, &tconn->current_epoch->list);
1435                 tconn->current_epoch = epoch;
1436                 tconn->epochs++;
1437         } else {
1438                 /* The current_epoch got recycled while we allocated this one... */
1439                 kfree(epoch);
1440         }
1441         spin_unlock(&tconn->epoch_lock);
1442
1443         return 0;
1444 }
1445
1446 /* used from receive_RSDataReply (recv_resync_read)
1447  * and from receive_Data */
1448 static struct drbd_peer_request *
1449 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1450               int data_size) __must_hold(local)
1451 {
1452         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1453         struct drbd_peer_request *peer_req;
1454         struct page *page;
1455         int dgs, ds, err;
1456         void *dig_in = mdev->tconn->int_dig_in;
1457         void *dig_vv = mdev->tconn->int_dig_vv;
1458         unsigned long *data;
1459
1460         dgs = 0;
1461         if (mdev->tconn->peer_integrity_tfm) {
1462                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1463                 /*
1464                  * FIXME: Receive the incoming digest into the receive buffer
1465                  *        here, together with its struct p_data?
1466                  */
1467                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1468                 if (err)
1469                         return NULL;
1470                 data_size -= dgs;
1471         }
1472
1473         if (!expect(data_size != 0))
1474                 return NULL;
1475         if (!expect(IS_ALIGNED(data_size, 512)))
1476                 return NULL;
1477         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1478                 return NULL;
1479
1480         /* even though we trust out peer,
1481          * we sometimes have to double check. */
1482         if (sector + (data_size>>9) > capacity) {
1483                 dev_err(DEV, "request from peer beyond end of local disk: "
1484                         "capacity: %llus < sector: %llus + size: %u\n",
1485                         (unsigned long long)capacity,
1486                         (unsigned long long)sector, data_size);
1487                 return NULL;
1488         }
1489
1490         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1491          * "criss-cross" setup, that might cause write-out on some other DRBD,
1492          * which in turn might block on the other node at this very place.  */
1493         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1494         if (!peer_req)
1495                 return NULL;
1496
1497         ds = data_size;
1498         page = peer_req->pages;
1499         page_chain_for_each(page) {
1500                 unsigned len = min_t(int, ds, PAGE_SIZE);
1501                 data = kmap(page);
1502                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1503                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1504                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1505                         data[0] = data[0] ^ (unsigned long)-1;
1506                 }
1507                 kunmap(page);
1508                 if (err) {
1509                         drbd_free_peer_req(mdev, peer_req);
1510                         return NULL;
1511                 }
1512                 ds -= len;
1513         }
1514
1515         if (dgs) {
1516                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1517                 if (memcmp(dig_in, dig_vv, dgs)) {
1518                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1519                                 (unsigned long long)sector, data_size);
1520                         drbd_free_peer_req(mdev, peer_req);
1521                         return NULL;
1522                 }
1523         }
1524         mdev->recv_cnt += data_size>>9;
1525         return peer_req;
1526 }
1527
1528 /* drbd_drain_block() just takes a data block
1529  * out of the socket input buffer, and discards it.
1530  */
1531 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1532 {
1533         struct page *page;
1534         int err = 0;
1535         void *data;
1536
1537         if (!data_size)
1538                 return 0;
1539
1540         page = drbd_alloc_pages(mdev, 1, 1);
1541
1542         data = kmap(page);
1543         while (data_size) {
1544                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1545
1546                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1547                 if (err)
1548                         break;
1549                 data_size -= len;
1550         }
1551         kunmap(page);
1552         drbd_free_pages(mdev, page, 0);
1553         return err;
1554 }
1555
1556 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1557                            sector_t sector, int data_size)
1558 {
1559         struct bio_vec *bvec;
1560         struct bio *bio;
1561         int dgs, err, i, expect;
1562         void *dig_in = mdev->tconn->int_dig_in;
1563         void *dig_vv = mdev->tconn->int_dig_vv;
1564
1565         dgs = 0;
1566         if (mdev->tconn->peer_integrity_tfm) {
1567                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1568                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1569                 if (err)
1570                         return err;
1571                 data_size -= dgs;
1572         }
1573
1574         /* optimistically update recv_cnt.  if receiving fails below,
1575          * we disconnect anyways, and counters will be reset. */
1576         mdev->recv_cnt += data_size>>9;
1577
1578         bio = req->master_bio;
1579         D_ASSERT(sector == bio->bi_sector);
1580
1581         bio_for_each_segment(bvec, bio, i) {
1582                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1583                 expect = min_t(int, data_size, bvec->bv_len);
1584                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1585                 kunmap(bvec->bv_page);
1586                 if (err)
1587                         return err;
1588                 data_size -= expect;
1589         }
1590
1591         if (dgs) {
1592                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1593                 if (memcmp(dig_in, dig_vv, dgs)) {
1594                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1595                         return -EINVAL;
1596                 }
1597         }
1598
1599         D_ASSERT(data_size == 0);
1600         return 0;
1601 }
1602
1603 /*
1604  * e_end_resync_block() is called in asender context via
1605  * drbd_finish_peer_reqs().
1606  */
1607 static int e_end_resync_block(struct drbd_work *w, int unused)
1608 {
1609         struct drbd_peer_request *peer_req =
1610                 container_of(w, struct drbd_peer_request, w);
1611         struct drbd_conf *mdev = w->mdev;
1612         sector_t sector = peer_req->i.sector;
1613         int err;
1614
1615         D_ASSERT(drbd_interval_empty(&peer_req->i));
1616
1617         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1618                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1619                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1620         } else {
1621                 /* Record failure to sync */
1622                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1623
1624                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1625         }
1626         dec_unacked(mdev);
1627
1628         return err;
1629 }
1630
1631 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1632 {
1633         struct drbd_peer_request *peer_req;
1634
1635         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1636         if (!peer_req)
1637                 goto fail;
1638
1639         dec_rs_pending(mdev);
1640
1641         inc_unacked(mdev);
1642         /* corresponding dec_unacked() in e_end_resync_block()
1643          * respective _drbd_clear_done_ee */
1644
1645         peer_req->w.cb = e_end_resync_block;
1646
1647         spin_lock_irq(&mdev->tconn->req_lock);
1648         list_add(&peer_req->w.list, &mdev->sync_ee);
1649         spin_unlock_irq(&mdev->tconn->req_lock);
1650
1651         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1652         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1653                 return 0;
1654
1655         /* don't care for the reason here */
1656         dev_err(DEV, "submit failed, triggering re-connect\n");
1657         spin_lock_irq(&mdev->tconn->req_lock);
1658         list_del(&peer_req->w.list);
1659         spin_unlock_irq(&mdev->tconn->req_lock);
1660
1661         drbd_free_peer_req(mdev, peer_req);
1662 fail:
1663         put_ldev(mdev);
1664         return -EIO;
1665 }
1666
1667 static struct drbd_request *
1668 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1669              sector_t sector, bool missing_ok, const char *func)
1670 {
1671         struct drbd_request *req;
1672
1673         /* Request object according to our peer */
1674         req = (struct drbd_request *)(unsigned long)id;
1675         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1676                 return req;
1677         if (!missing_ok) {
1678                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1679                         (unsigned long)id, (unsigned long long)sector);
1680         }
1681         return NULL;
1682 }
1683
1684 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1685 {
1686         struct drbd_conf *mdev;
1687         struct drbd_request *req;
1688         sector_t sector;
1689         int err;
1690         struct p_data *p = pi->data;
1691
1692         mdev = vnr_to_mdev(tconn, pi->vnr);
1693         if (!mdev)
1694                 return -EIO;
1695
1696         sector = be64_to_cpu(p->sector);
1697
1698         spin_lock_irq(&mdev->tconn->req_lock);
1699         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1700         spin_unlock_irq(&mdev->tconn->req_lock);
1701         if (unlikely(!req))
1702                 return -EIO;
1703
1704         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1705          * special casing it there for the various failure cases.
1706          * still no race with drbd_fail_pending_reads */
1707         err = recv_dless_read(mdev, req, sector, pi->size);
1708         if (!err)
1709                 req_mod(req, DATA_RECEIVED);
1710         /* else: nothing. handled from drbd_disconnect...
1711          * I don't think we may complete this just yet
1712          * in case we are "on-disconnect: freeze" */
1713
1714         return err;
1715 }
1716
1717 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1718 {
1719         struct drbd_conf *mdev;
1720         sector_t sector;
1721         int err;
1722         struct p_data *p = pi->data;
1723
1724         mdev = vnr_to_mdev(tconn, pi->vnr);
1725         if (!mdev)
1726                 return -EIO;
1727
1728         sector = be64_to_cpu(p->sector);
1729         D_ASSERT(p->block_id == ID_SYNCER);
1730
1731         if (get_ldev(mdev)) {
1732                 /* data is submitted to disk within recv_resync_read.
1733                  * corresponding put_ldev done below on error,
1734                  * or in drbd_peer_request_endio. */
1735                 err = recv_resync_read(mdev, sector, pi->size);
1736         } else {
1737                 if (__ratelimit(&drbd_ratelimit_state))
1738                         dev_err(DEV, "Can not write resync data to local disk.\n");
1739
1740                 err = drbd_drain_block(mdev, pi->size);
1741
1742                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1743         }
1744
1745         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1746
1747         return err;
1748 }
1749
1750 static int w_restart_write(struct drbd_work *w, int cancel)
1751 {
1752         struct drbd_request *req = container_of(w, struct drbd_request, w);
1753         struct drbd_conf *mdev = w->mdev;
1754         struct bio *bio;
1755         unsigned long start_time;
1756         unsigned long flags;
1757
1758         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1759         if (!expect(req->rq_state & RQ_POSTPONED)) {
1760                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1761                 return -EIO;
1762         }
1763         bio = req->master_bio;
1764         start_time = req->start_time;
1765         /* Postponed requests will not have their master_bio completed!  */
1766         __req_mod(req, DISCARD_WRITE, NULL);
1767         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1768
1769         while (__drbd_make_request(mdev, bio, start_time))
1770                 /* retry */ ;
1771         return 0;
1772 }
1773
1774 static void restart_conflicting_writes(struct drbd_conf *mdev,
1775                                        sector_t sector, int size)
1776 {
1777         struct drbd_interval *i;
1778         struct drbd_request *req;
1779
1780         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1781                 if (!i->local)
1782                         continue;
1783                 req = container_of(i, struct drbd_request, i);
1784                 if (req->rq_state & RQ_LOCAL_PENDING ||
1785                     !(req->rq_state & RQ_POSTPONED))
1786                         continue;
1787                 if (expect(list_empty(&req->w.list))) {
1788                         req->w.mdev = mdev;
1789                         req->w.cb = w_restart_write;
1790                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1791                 }
1792         }
1793 }
1794
1795 /*
1796  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1797  */
1798 static int e_end_block(struct drbd_work *w, int cancel)
1799 {
1800         struct drbd_peer_request *peer_req =
1801                 container_of(w, struct drbd_peer_request, w);
1802         struct drbd_conf *mdev = w->mdev;
1803         sector_t sector = peer_req->i.sector;
1804         int err = 0, pcmd;
1805
1806         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1807                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1808                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1809                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1810                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1811                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1812                         err = drbd_send_ack(mdev, pcmd, peer_req);
1813                         if (pcmd == P_RS_WRITE_ACK)
1814                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1815                 } else {
1816                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1817                         /* we expect it to be marked out of sync anyways...
1818                          * maybe assert this?  */
1819                 }
1820                 dec_unacked(mdev);
1821         }
1822         /* we delete from the conflict detection hash _after_ we sent out the
1823          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1824         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1825                 spin_lock_irq(&mdev->tconn->req_lock);
1826                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1827                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1828                 if (peer_req->flags & EE_RESTART_REQUESTS)
1829                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1830                 spin_unlock_irq(&mdev->tconn->req_lock);
1831         } else
1832                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1833
1834         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1835
1836         return err;
1837 }
1838
1839 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1840 {
1841         struct drbd_conf *mdev = w->mdev;
1842         struct drbd_peer_request *peer_req =
1843                 container_of(w, struct drbd_peer_request, w);
1844         int err;
1845
1846         err = drbd_send_ack(mdev, ack, peer_req);
1847         dec_unacked(mdev);
1848
1849         return err;
1850 }
1851
1852 static int e_send_discard_write(struct drbd_work *w, int unused)
1853 {
1854         return e_send_ack(w, P_DISCARD_WRITE);
1855 }
1856
1857 static int e_send_retry_write(struct drbd_work *w, int unused)
1858 {
1859         struct drbd_tconn *tconn = w->mdev->tconn;
1860
1861         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1862                              P_RETRY_WRITE : P_DISCARD_WRITE);
1863 }
1864
1865 static bool seq_greater(u32 a, u32 b)
1866 {
1867         /*
1868          * We assume 32-bit wrap-around here.
1869          * For 24-bit wrap-around, we would have to shift:
1870          *  a <<= 8; b <<= 8;
1871          */
1872         return (s32)a - (s32)b > 0;
1873 }
1874
1875 static u32 seq_max(u32 a, u32 b)
1876 {
1877         return seq_greater(a, b) ? a : b;
1878 }
1879
1880 static bool need_peer_seq(struct drbd_conf *mdev)
1881 {
1882         struct drbd_tconn *tconn = mdev->tconn;
1883         int tp;
1884
1885         /*
1886          * We only need to keep track of the last packet_seq number of our peer
1887          * if we are in dual-primary mode and we have the discard flag set; see
1888          * handle_write_conflicts().
1889          */
1890
1891         rcu_read_lock();
1892         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1893         rcu_read_unlock();
1894
1895         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1896 }
1897
1898 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1899 {
1900         unsigned int newest_peer_seq;
1901
1902         if (need_peer_seq(mdev)) {
1903                 spin_lock(&mdev->peer_seq_lock);
1904                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1905                 mdev->peer_seq = newest_peer_seq;
1906                 spin_unlock(&mdev->peer_seq_lock);
1907                 /* wake up only if we actually changed mdev->peer_seq */
1908                 if (peer_seq == newest_peer_seq)
1909                         wake_up(&mdev->seq_wait);
1910         }
1911 }
1912
1913 /* Called from receive_Data.
1914  * Synchronize packets on sock with packets on msock.
1915  *
1916  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1917  * packet traveling on msock, they are still processed in the order they have
1918  * been sent.
1919  *
1920  * Note: we don't care for Ack packets overtaking P_DATA packets.
1921  *
1922  * In case packet_seq is larger than mdev->peer_seq number, there are
1923  * outstanding packets on the msock. We wait for them to arrive.
1924  * In case we are the logically next packet, we update mdev->peer_seq
1925  * ourselves. Correctly handles 32bit wrap around.
1926  *
1927  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1928  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1929  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1930  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1931  *
1932  * returns 0 if we may process the packet,
1933  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1934 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1935 {
1936         DEFINE_WAIT(wait);
1937         long timeout;
1938         int ret;
1939
1940         if (!need_peer_seq(mdev))
1941                 return 0;
1942
1943         spin_lock(&mdev->peer_seq_lock);
1944         for (;;) {
1945                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1946                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1947                         ret = 0;
1948                         break;
1949                 }
1950                 if (signal_pending(current)) {
1951                         ret = -ERESTARTSYS;
1952                         break;
1953                 }
1954                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1955                 spin_unlock(&mdev->peer_seq_lock);
1956                 rcu_read_lock();
1957                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1958                 rcu_read_unlock();
1959                 timeout = schedule_timeout(timeout);
1960                 spin_lock(&mdev->peer_seq_lock);
1961                 if (!timeout) {
1962                         ret = -ETIMEDOUT;
1963                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1964                         break;
1965                 }
1966         }
1967         spin_unlock(&mdev->peer_seq_lock);
1968         finish_wait(&mdev->seq_wait, &wait);
1969         return ret;
1970 }
1971
1972 /* see also bio_flags_to_wire()
1973  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1974  * flags and back. We may replicate to other kernel versions. */
1975 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1976 {
1977         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1978                 (dpf & DP_FUA ? REQ_FUA : 0) |
1979                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1980                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1981 }
1982
1983 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1984                                     unsigned int size)
1985 {
1986         struct drbd_interval *i;
1987
1988     repeat:
1989         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1990                 struct drbd_request *req;
1991                 struct bio_and_error m;
1992
1993                 if (!i->local)
1994                         continue;
1995                 req = container_of(i, struct drbd_request, i);
1996                 if (!(req->rq_state & RQ_POSTPONED))
1997                         continue;
1998                 req->rq_state &= ~RQ_POSTPONED;
1999                 __req_mod(req, NEG_ACKED, &m);
2000                 spin_unlock_irq(&mdev->tconn->req_lock);
2001                 if (m.bio)
2002                         complete_master_bio(mdev, &m);
2003                 spin_lock_irq(&mdev->tconn->req_lock);
2004                 goto repeat;
2005         }
2006 }
2007
2008 static int handle_write_conflicts(struct drbd_conf *mdev,
2009                                   struct drbd_peer_request *peer_req)
2010 {
2011         struct drbd_tconn *tconn = mdev->tconn;
2012         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
2013         sector_t sector = peer_req->i.sector;
2014         const unsigned int size = peer_req->i.size;
2015         struct drbd_interval *i;
2016         bool equal;
2017         int err;
2018
2019         /*
2020          * Inserting the peer request into the write_requests tree will prevent
2021          * new conflicting local requests from being added.
2022          */
2023         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2024
2025     repeat:
2026         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2027                 if (i == &peer_req->i)
2028                         continue;
2029
2030                 if (!i->local) {
2031                         /*
2032                          * Our peer has sent a conflicting remote request; this
2033                          * should not happen in a two-node setup.  Wait for the
2034                          * earlier peer request to complete.
2035                          */
2036                         err = drbd_wait_misc(mdev, i);
2037                         if (err)
2038                                 goto out;
2039                         goto repeat;
2040                 }
2041
2042                 equal = i->sector == sector && i->size == size;
2043                 if (resolve_conflicts) {
2044                         /*
2045                          * If the peer request is fully contained within the
2046                          * overlapping request, it can be discarded; otherwise,
2047                          * it will be retried once all overlapping requests
2048                          * have completed.
2049                          */
2050                         bool discard = i->sector <= sector && i->sector +
2051                                        (i->size >> 9) >= sector + (size >> 9);
2052
2053                         if (!equal)
2054                                 dev_alert(DEV, "Concurrent writes detected: "
2055                                                "local=%llus +%u, remote=%llus +%u, "
2056                                                "assuming %s came first\n",
2057                                           (unsigned long long)i->sector, i->size,
2058                                           (unsigned long long)sector, size,
2059                                           discard ? "local" : "remote");
2060
2061                         inc_unacked(mdev);
2062                         peer_req->w.cb = discard ? e_send_discard_write :
2063                                                    e_send_retry_write;
2064                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2065                         wake_asender(mdev->tconn);
2066
2067                         err = -ENOENT;
2068                         goto out;
2069                 } else {
2070                         struct drbd_request *req =
2071                                 container_of(i, struct drbd_request, i);
2072
2073                         if (!equal)
2074                                 dev_alert(DEV, "Concurrent writes detected: "
2075                                                "local=%llus +%u, remote=%llus +%u\n",
2076                                           (unsigned long long)i->sector, i->size,
2077                                           (unsigned long long)sector, size);
2078
2079                         if (req->rq_state & RQ_LOCAL_PENDING ||
2080                             !(req->rq_state & RQ_POSTPONED)) {
2081                                 /*
2082                                  * Wait for the node with the discard flag to
2083                                  * decide if this request will be discarded or
2084                                  * retried.  Requests that are discarded will
2085                                  * disappear from the write_requests tree.
2086                                  *
2087                                  * In addition, wait for the conflicting
2088                                  * request to finish locally before submitting
2089                                  * the conflicting peer request.
2090                                  */
2091                                 err = drbd_wait_misc(mdev, &req->i);
2092                                 if (err) {
2093                                         _conn_request_state(mdev->tconn,
2094                                                             NS(conn, C_TIMEOUT),
2095                                                             CS_HARD);
2096                                         fail_postponed_requests(mdev, sector, size);
2097                                         goto out;
2098                                 }
2099                                 goto repeat;
2100                         }
2101                         /*
2102                          * Remember to restart the conflicting requests after
2103                          * the new peer request has completed.
2104                          */
2105                         peer_req->flags |= EE_RESTART_REQUESTS;
2106                 }
2107         }
2108         err = 0;
2109
2110     out:
2111         if (err)
2112                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2113         return err;
2114 }
2115
2116 /* mirrored write */
2117 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2118 {
2119         struct drbd_conf *mdev;
2120         sector_t sector;
2121         struct drbd_peer_request *peer_req;
2122         struct p_data *p = pi->data;
2123         u32 peer_seq = be32_to_cpu(p->seq_num);
2124         int rw = WRITE;
2125         u32 dp_flags;
2126         int err, tp;
2127
2128         mdev = vnr_to_mdev(tconn, pi->vnr);
2129         if (!mdev)
2130                 return -EIO;
2131
2132         if (!get_ldev(mdev)) {
2133                 int err2;
2134
2135                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2136                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2137                 atomic_inc(&tconn->current_epoch->epoch_size);
2138                 err2 = drbd_drain_block(mdev, pi->size);
2139                 if (!err)
2140                         err = err2;
2141                 return err;
2142         }
2143
2144         /*
2145          * Corresponding put_ldev done either below (on various errors), or in
2146          * drbd_peer_request_endio, if we successfully submit the data at the
2147          * end of this function.
2148          */
2149
2150         sector = be64_to_cpu(p->sector);
2151         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2152         if (!peer_req) {
2153                 put_ldev(mdev);
2154                 return -EIO;
2155         }
2156
2157         peer_req->w.cb = e_end_block;
2158
2159         dp_flags = be32_to_cpu(p->dp_flags);
2160         rw |= wire_flags_to_bio(mdev, dp_flags);
2161
2162         if (dp_flags & DP_MAY_SET_IN_SYNC)
2163                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2164
2165         spin_lock(&tconn->epoch_lock);
2166         peer_req->epoch = tconn->current_epoch;
2167         atomic_inc(&peer_req->epoch->epoch_size);
2168         atomic_inc(&peer_req->epoch->active);
2169         spin_unlock(&tconn->epoch_lock);
2170
2171         rcu_read_lock();
2172         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2173         rcu_read_unlock();
2174         if (tp) {
2175                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2176                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2177                 if (err)
2178                         goto out_interrupted;
2179                 spin_lock_irq(&mdev->tconn->req_lock);
2180                 err = handle_write_conflicts(mdev, peer_req);
2181                 if (err) {
2182                         spin_unlock_irq(&mdev->tconn->req_lock);
2183                         if (err == -ENOENT) {
2184                                 put_ldev(mdev);
2185                                 return 0;
2186                         }
2187                         goto out_interrupted;
2188                 }
2189         } else
2190                 spin_lock_irq(&mdev->tconn->req_lock);
2191         list_add(&peer_req->w.list, &mdev->active_ee);
2192         spin_unlock_irq(&mdev->tconn->req_lock);
2193
2194         if (mdev->tconn->agreed_pro_version < 100) {
2195                 rcu_read_lock();
2196                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2197                 case DRBD_PROT_C:
2198                         dp_flags |= DP_SEND_WRITE_ACK;
2199                         break;
2200                 case DRBD_PROT_B:
2201                         dp_flags |= DP_SEND_RECEIVE_ACK;
2202                         break;
2203                 }
2204                 rcu_read_unlock();
2205         }
2206
2207         if (dp_flags & DP_SEND_WRITE_ACK) {
2208                 peer_req->flags |= EE_SEND_WRITE_ACK;
2209                 inc_unacked(mdev);
2210                 /* corresponding dec_unacked() in e_end_block()
2211                  * respective _drbd_clear_done_ee */
2212         }
2213
2214         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2215                 /* I really don't like it that the receiver thread
2216                  * sends on the msock, but anyways */
2217                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2218         }
2219
2220         if (mdev->state.pdsk < D_INCONSISTENT) {
2221                 /* In case we have the only disk of the cluster, */
2222                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2223                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2224                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2225                 drbd_al_begin_io(mdev, &peer_req->i);
2226         }
2227
2228         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2229         if (!err)
2230                 return 0;
2231
2232         /* don't care for the reason here */
2233         dev_err(DEV, "submit failed, triggering re-connect\n");
2234         spin_lock_irq(&mdev->tconn->req_lock);
2235         list_del(&peer_req->w.list);
2236         drbd_remove_epoch_entry_interval(mdev, peer_req);
2237         spin_unlock_irq(&mdev->tconn->req_lock);
2238         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2239                 drbd_al_complete_io(mdev, &peer_req->i);
2240
2241 out_interrupted:
2242         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2243         put_ldev(mdev);
2244         drbd_free_peer_req(mdev, peer_req);
2245         return err;
2246 }
2247
2248 /* We may throttle resync, if the lower device seems to be busy,
2249  * and current sync rate is above c_min_rate.
2250  *
2251  * To decide whether or not the lower device is busy, we use a scheme similar
2252  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2253  * (more than 64 sectors) of activity we cannot account for with our own resync
2254  * activity, it obviously is "busy".
2255  *
2256  * The current sync rate used here uses only the most recent two step marks,
2257  * to have a short time average so we can react faster.
2258  */
2259 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2260 {
2261         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2262         unsigned long db, dt, dbdt;
2263         struct lc_element *tmp;
2264         int curr_events;
2265         int throttle = 0;
2266         unsigned int c_min_rate;
2267
2268         rcu_read_lock();
2269         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2270         rcu_read_unlock();
2271
2272         /* feature disabled? */
2273         if (c_min_rate == 0)
2274                 return 0;
2275
2276         spin_lock_irq(&mdev->al_lock);
2277         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2278         if (tmp) {
2279                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2280                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2281                         spin_unlock_irq(&mdev->al_lock);
2282                         return 0;
2283                 }
2284                 /* Do not slow down if app IO is already waiting for this extent */
2285         }
2286         spin_unlock_irq(&mdev->al_lock);
2287
2288         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2289                       (int)part_stat_read(&disk->part0, sectors[1]) -
2290                         atomic_read(&mdev->rs_sect_ev);
2291
2292         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2293                 unsigned long rs_left;
2294                 int i;
2295
2296                 mdev->rs_last_events = curr_events;
2297
2298                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2299                  * approx. */
2300                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2301
2302                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2303                         rs_left = mdev->ov_left;
2304                 else
2305                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2306
2307                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2308                 if (!dt)
2309                         dt++;
2310                 db = mdev->rs_mark_left[i] - rs_left;
2311                 dbdt = Bit2KB(db/dt);
2312
2313                 if (dbdt > c_min_rate)
2314                         throttle = 1;
2315         }
2316         return throttle;
2317 }
2318
2319
2320 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2321 {
2322         struct drbd_conf *mdev;
2323         sector_t sector;
2324         sector_t capacity;
2325         struct drbd_peer_request *peer_req;
2326         struct digest_info *di = NULL;
2327         int size, verb;
2328         unsigned int fault_type;
2329         struct p_block_req *p = pi->data;
2330
2331         mdev = vnr_to_mdev(tconn, pi->vnr);
2332         if (!mdev)
2333                 return -EIO;
2334         capacity = drbd_get_capacity(mdev->this_bdev);
2335
2336         sector = be64_to_cpu(p->sector);
2337         size   = be32_to_cpu(p->blksize);
2338
2339         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2340                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2341                                 (unsigned long long)sector, size);
2342                 return -EINVAL;
2343         }
2344         if (sector + (size>>9) > capacity) {
2345                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2346                                 (unsigned long long)sector, size);
2347                 return -EINVAL;
2348         }
2349
2350         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2351                 verb = 1;
2352                 switch (pi->cmd) {
2353                 case P_DATA_REQUEST:
2354                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2355                         break;
2356                 case P_RS_DATA_REQUEST:
2357                 case P_CSUM_RS_REQUEST:
2358                 case P_OV_REQUEST:
2359                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2360                         break;
2361                 case P_OV_REPLY:
2362                         verb = 0;
2363                         dec_rs_pending(mdev);
2364                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2365                         break;
2366                 default:
2367                         BUG();
2368                 }
2369                 if (verb && __ratelimit(&drbd_ratelimit_state))
2370                         dev_err(DEV, "Can not satisfy peer's read request, "
2371                             "no local data.\n");
2372
2373                 /* drain possibly payload */
2374                 return drbd_drain_block(mdev, pi->size);
2375         }
2376
2377         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2378          * "criss-cross" setup, that might cause write-out on some other DRBD,
2379          * which in turn might block on the other node at this very place.  */
2380         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2381         if (!peer_req) {
2382                 put_ldev(mdev);
2383                 return -ENOMEM;
2384         }
2385
2386         switch (pi->cmd) {
2387         case P_DATA_REQUEST:
2388                 peer_req->w.cb = w_e_end_data_req;
2389                 fault_type = DRBD_FAULT_DT_RD;
2390                 /* application IO, don't drbd_rs_begin_io */
2391                 goto submit;
2392
2393         case P_RS_DATA_REQUEST:
2394                 peer_req->w.cb = w_e_end_rsdata_req;
2395                 fault_type = DRBD_FAULT_RS_RD;
2396                 /* used in the sector offset progress display */
2397                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2398                 break;
2399
2400         case P_OV_REPLY:
2401         case P_CSUM_RS_REQUEST:
2402                 fault_type = DRBD_FAULT_RS_RD;
2403                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2404                 if (!di)
2405                         goto out_free_e;
2406
2407                 di->digest_size = pi->size;
2408                 di->digest = (((char *)di)+sizeof(struct digest_info));
2409
2410                 peer_req->digest = di;
2411                 peer_req->flags |= EE_HAS_DIGEST;
2412
2413                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2414                         goto out_free_e;
2415
2416                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2417                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2418                         peer_req->w.cb = w_e_end_csum_rs_req;
2419                         /* used in the sector offset progress display */
2420                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2421                 } else if (pi->cmd == P_OV_REPLY) {
2422                         /* track progress, we may need to throttle */
2423                         atomic_add(size >> 9, &mdev->rs_sect_in);
2424                         peer_req->w.cb = w_e_end_ov_reply;
2425                         dec_rs_pending(mdev);
2426                         /* drbd_rs_begin_io done when we sent this request,
2427                          * but accounting still needs to be done. */
2428                         goto submit_for_resync;
2429                 }
2430                 break;
2431
2432         case P_OV_REQUEST:
2433                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2434                     mdev->tconn->agreed_pro_version >= 90) {
2435                         unsigned long now = jiffies;
2436                         int i;
2437                         mdev->ov_start_sector = sector;
2438                         mdev->ov_position = sector;
2439                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2440                         mdev->rs_total = mdev->ov_left;
2441                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2442                                 mdev->rs_mark_left[i] = mdev->ov_left;
2443                                 mdev->rs_mark_time[i] = now;
2444                         }
2445                         dev_info(DEV, "Online Verify start sector: %llu\n",
2446                                         (unsigned long long)sector);
2447                 }
2448                 peer_req->w.cb = w_e_end_ov_req;
2449                 fault_type = DRBD_FAULT_RS_RD;
2450                 break;
2451
2452         default:
2453                 BUG();
2454         }
2455
2456         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2457          * wrt the receiver, but it is not as straightforward as it may seem.
2458          * Various places in the resync start and stop logic assume resync
2459          * requests are processed in order, requeuing this on the worker thread
2460          * introduces a bunch of new code for synchronization between threads.
2461          *
2462          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2463          * "forever", throttling after drbd_rs_begin_io will lock that extent
2464          * for application writes for the same time.  For now, just throttle
2465          * here, where the rest of the code expects the receiver to sleep for
2466          * a while, anyways.
2467          */
2468
2469         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2470          * this defers syncer requests for some time, before letting at least
2471          * on request through.  The resync controller on the receiving side
2472          * will adapt to the incoming rate accordingly.
2473          *
2474          * We cannot throttle here if remote is Primary/SyncTarget:
2475          * we would also throttle its application reads.
2476          * In that case, throttling is done on the SyncTarget only.
2477          */
2478         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2479                 schedule_timeout_uninterruptible(HZ/10);
2480         if (drbd_rs_begin_io(mdev, sector))
2481                 goto out_free_e;
2482
2483 submit_for_resync:
2484         atomic_add(size >> 9, &mdev->rs_sect_ev);
2485
2486 submit:
2487         inc_unacked(mdev);
2488         spin_lock_irq(&mdev->tconn->req_lock);
2489         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2490         spin_unlock_irq(&mdev->tconn->req_lock);
2491
2492         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2493                 return 0;
2494
2495         /* don't care for the reason here */
2496         dev_err(DEV, "submit failed, triggering re-connect\n");
2497         spin_lock_irq(&mdev->tconn->req_lock);
2498         list_del(&peer_req->w.list);
2499         spin_unlock_irq(&mdev->tconn->req_lock);
2500         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2501
2502 out_free_e:
2503         put_ldev(mdev);
2504         drbd_free_peer_req(mdev, peer_req);
2505         return -EIO;
2506 }
2507
2508 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2509 {
2510         int self, peer, rv = -100;
2511         unsigned long ch_self, ch_peer;
2512         enum drbd_after_sb_p after_sb_0p;
2513
2514         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2515         peer = mdev->p_uuid[UI_BITMAP] & 1;
2516
2517         ch_peer = mdev->p_uuid[UI_SIZE];
2518         ch_self = mdev->comm_bm_set;
2519
2520         rcu_read_lock();
2521         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2522         rcu_read_unlock();
2523         switch (after_sb_0p) {
2524         case ASB_CONSENSUS:
2525         case ASB_DISCARD_SECONDARY:
2526         case ASB_CALL_HELPER:
2527         case ASB_VIOLENTLY:
2528                 dev_err(DEV, "Configuration error.\n");
2529                 break;
2530         case ASB_DISCONNECT:
2531                 break;
2532         case ASB_DISCARD_YOUNGER_PRI:
2533                 if (self == 0 && peer == 1) {
2534                         rv = -1;
2535                         break;
2536                 }
2537                 if (self == 1 && peer == 0) {
2538                         rv =  1;
2539                         break;
2540                 }
2541                 /* Else fall through to one of the other strategies... */
2542         case ASB_DISCARD_OLDER_PRI:
2543                 if (self == 0 && peer == 1) {
2544                         rv = 1;
2545                         break;
2546                 }
2547                 if (self == 1 && peer == 0) {
2548                         rv = -1;
2549                         break;
2550                 }
2551                 /* Else fall through to one of the other strategies... */
2552                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2553                      "Using discard-least-changes instead\n");
2554         case ASB_DISCARD_ZERO_CHG:
2555                 if (ch_peer == 0 && ch_self == 0) {
2556                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2557                                 ? -1 : 1;
2558                         break;
2559                 } else {
2560                         if (ch_peer == 0) { rv =  1; break; }
2561                         if (ch_self == 0) { rv = -1; break; }
2562                 }
2563                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2564                         break;
2565         case ASB_DISCARD_LEAST_CHG:
2566                 if      (ch_self < ch_peer)
2567                         rv = -1;
2568                 else if (ch_self > ch_peer)
2569                         rv =  1;
2570                 else /* ( ch_self == ch_peer ) */
2571                      /* Well, then use something else. */
2572                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2573                                 ? -1 : 1;
2574                 break;
2575         case ASB_DISCARD_LOCAL:
2576                 rv = -1;
2577                 break;
2578         case ASB_DISCARD_REMOTE:
2579                 rv =  1;
2580         }
2581
2582         return rv;
2583 }
2584
2585 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2586 {
2587         int hg, rv = -100;
2588         enum drbd_after_sb_p after_sb_1p;
2589
2590         rcu_read_lock();
2591         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2592         rcu_read_unlock();
2593         switch (after_sb_1p) {
2594         case ASB_DISCARD_YOUNGER_PRI:
2595         case ASB_DISCARD_OLDER_PRI:
2596         case ASB_DISCARD_LEAST_CHG:
2597         case ASB_DISCARD_LOCAL:
2598         case ASB_DISCARD_REMOTE:
2599         case ASB_DISCARD_ZERO_CHG:
2600                 dev_err(DEV, "Configuration error.\n");
2601                 break;
2602         case ASB_DISCONNECT:
2603                 break;
2604         case ASB_CONSENSUS:
2605                 hg = drbd_asb_recover_0p(mdev);
2606                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2607                         rv = hg;
2608                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2609                         rv = hg;
2610                 break;
2611         case ASB_VIOLENTLY:
2612                 rv = drbd_asb_recover_0p(mdev);
2613                 break;
2614         case ASB_DISCARD_SECONDARY:
2615                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2616         case ASB_CALL_HELPER:
2617                 hg = drbd_asb_recover_0p(mdev);
2618                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2619                         enum drbd_state_rv rv2;
2620
2621                         drbd_set_role(mdev, R_SECONDARY, 0);
2622                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2623                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2624                           * we do not need to wait for the after state change work either. */
2625                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2626                         if (rv2 != SS_SUCCESS) {
2627                                 drbd_khelper(mdev, "pri-lost-after-sb");
2628                         } else {
2629                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2630                                 rv = hg;
2631                         }
2632                 } else
2633                         rv = hg;
2634         }
2635
2636         return rv;
2637 }
2638
2639 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2640 {
2641         int hg, rv = -100;
2642         enum drbd_after_sb_p after_sb_2p;
2643
2644         rcu_read_lock();
2645         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2646         rcu_read_unlock();
2647         switch (after_sb_2p) {
2648         case ASB_DISCARD_YOUNGER_PRI:
2649         case ASB_DISCARD_OLDER_PRI:
2650         case ASB_DISCARD_LEAST_CHG:
2651         case ASB_DISCARD_LOCAL:
2652         case ASB_DISCARD_REMOTE:
2653         case ASB_CONSENSUS:
2654         case ASB_DISCARD_SECONDARY:
2655         case ASB_DISCARD_ZERO_CHG:
2656                 dev_err(DEV, "Configuration error.\n");
2657                 break;
2658         case ASB_VIOLENTLY:
2659                 rv = drbd_asb_recover_0p(mdev);
2660                 break;
2661         case ASB_DISCONNECT:
2662                 break;
2663         case ASB_CALL_HELPER:
2664                 hg = drbd_asb_recover_0p(mdev);
2665                 if (hg == -1) {
2666                         enum drbd_state_rv rv2;
2667
2668                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2669                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2670                           * we do not need to wait for the after state change work either. */
2671                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2672                         if (rv2 != SS_SUCCESS) {
2673                                 drbd_khelper(mdev, "pri-lost-after-sb");
2674                         } else {
2675                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2676                                 rv = hg;
2677                         }
2678                 } else
2679                         rv = hg;
2680         }
2681
2682         return rv;
2683 }
2684
2685 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2686                            u64 bits, u64 flags)
2687 {
2688         if (!uuid) {
2689                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2690                 return;
2691         }
2692         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2693              text,
2694              (unsigned long long)uuid[UI_CURRENT],
2695              (unsigned long long)uuid[UI_BITMAP],
2696              (unsigned long long)uuid[UI_HISTORY_START],
2697              (unsigned long long)uuid[UI_HISTORY_END],
2698              (unsigned long long)bits,
2699              (unsigned long long)flags);
2700 }
2701
2702 /*
2703   100   after split brain try auto recover
2704     2   C_SYNC_SOURCE set BitMap
2705     1   C_SYNC_SOURCE use BitMap
2706     0   no Sync
2707    -1   C_SYNC_TARGET use BitMap
2708    -2   C_SYNC_TARGET set BitMap
2709  -100   after split brain, disconnect
2710 -1000   unrelated data
2711 -1091   requires proto 91
2712 -1096   requires proto 96
2713  */
2714 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2715 {
2716         u64 self, peer;
2717         int i, j;
2718
2719         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2720         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2721
2722         *rule_nr = 10;
2723         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2724                 return 0;
2725
2726         *rule_nr = 20;
2727         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2728              peer != UUID_JUST_CREATED)
2729                 return -2;
2730
2731         *rule_nr = 30;
2732         if (self != UUID_JUST_CREATED &&
2733             (peer == UUID_JUST_CREATED || peer == (u64)0))
2734                 return 2;
2735
2736         if (self == peer) {
2737                 int rct, dc; /* roles at crash time */
2738
2739                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2740
2741                         if (mdev->tconn->agreed_pro_version < 91)
2742                                 return -1091;
2743
2744                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2745                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2746                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2747                                 drbd_uuid_set_bm(mdev, 0UL);
2748
2749                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2750                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2751                                 *rule_nr = 34;
2752                         } else {
2753                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2754                                 *rule_nr = 36;
2755                         }
2756
2757                         return 1;
2758                 }
2759
2760                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2761
2762                         if (mdev->tconn->agreed_pro_version < 91)
2763                                 return -1091;
2764
2765                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2766                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2767                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2768
2769                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2770                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2771                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2772
2773                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2774                                 *rule_nr = 35;
2775                         } else {
2776                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2777                                 *rule_nr = 37;
2778                         }
2779
2780                         return -1;
2781                 }
2782
2783                 /* Common power [off|failure] */
2784                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2785                         (mdev->p_uuid[UI_FLAGS] & 2);
2786                 /* lowest bit is set when we were primary,
2787                  * next bit (weight 2) is set when peer was primary */
2788                 *rule_nr = 40;
2789
2790                 switch (rct) {
2791                 case 0: /* !self_pri && !peer_pri */ return 0;
2792                 case 1: /*  self_pri && !peer_pri */ return 1;
2793                 case 2: /* !self_pri &&  peer_pri */ return -1;
2794                 case 3: /*  self_pri &&  peer_pri */
2795                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2796                         return dc ? -1 : 1;
2797                 }
2798         }
2799
2800         *rule_nr = 50;
2801         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2802         if (self == peer)
2803                 return -1;
2804
2805         *rule_nr = 51;
2806         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2807         if (self == peer) {
2808                 if (mdev->tconn->agreed_pro_version < 96 ?
2809                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2810                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2811                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2812                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2813                            resync as sync source modifications of the peer's UUIDs. */
2814
2815                         if (mdev->tconn->agreed_pro_version < 91)
2816                                 return -1091;
2817
2818                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2819                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2820
2821                         dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2822                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2823
2824                         return -1;
2825                 }
2826         }
2827
2828         *rule_nr = 60;
2829         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2830         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2831                 peer = mdev->p_uuid[i] & ~((u64)1);
2832                 if (self == peer)
2833                         return -2;
2834         }
2835
2836         *rule_nr = 70;
2837         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2838         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2839         if (self == peer)
2840                 return 1;
2841
2842         *rule_nr = 71;
2843         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2844         if (self == peer) {
2845                 if (mdev->tconn->agreed_pro_version < 96 ?
2846                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2847                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2848                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2849                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2850                            resync as sync source modifications of our UUIDs. */
2851
2852                         if (mdev->tconn->agreed_pro_version < 91)
2853                                 return -1091;
2854
2855                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2856                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2857
2858                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2859                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2860                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2861
2862                         return 1;
2863                 }
2864         }
2865
2866
2867         *rule_nr = 80;
2868         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2869         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2870                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2871                 if (self == peer)
2872                         return 2;
2873         }
2874
2875         *rule_nr = 90;
2876         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2877         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2878         if (self == peer && self != ((u64)0))
2879                 return 100;
2880
2881         *rule_nr = 100;
2882         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2883                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2884                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2885                         peer = mdev->p_uuid[j] & ~((u64)1);
2886                         if (self == peer)
2887                                 return -100;
2888                 }
2889         }
2890
2891         return -1000;
2892 }
2893
2894 /* drbd_sync_handshake() returns the new conn state on success, or
2895    CONN_MASK (-1) on failure.
2896  */
2897 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2898                                            enum drbd_disk_state peer_disk) __must_hold(local)
2899 {
2900         enum drbd_conns rv = C_MASK;
2901         enum drbd_disk_state mydisk;
2902         struct net_conf *nc;
2903         int hg, rule_nr, rr_conflict, tentative;
2904
2905         mydisk = mdev->state.disk;
2906         if (mydisk == D_NEGOTIATING)
2907                 mydisk = mdev->new_state_tmp.disk;
2908
2909         dev_info(DEV, "drbd_sync_handshake:\n");
2910         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2911         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2912                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2913
2914         hg = drbd_uuid_compare(mdev, &rule_nr);
2915
2916         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2917
2918         if (hg == -1000) {
2919                 dev_alert(DEV, "Unrelated data, aborting!\n");
2920                 return C_MASK;
2921         }
2922         if (hg < -1000) {
2923                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2924                 return C_MASK;
2925         }
2926
2927         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2928             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2929                 int f = (hg == -100) || abs(hg) == 2;
2930                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2931                 if (f)
2932                         hg = hg*2;
2933                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2934                      hg > 0 ? "source" : "target");
2935         }
2936
2937         if (abs(hg) == 100)
2938                 drbd_khelper(mdev, "initial-split-brain");
2939
2940         rcu_read_lock();
2941         nc = rcu_dereference(mdev->tconn->net_conf);
2942
2943         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2944                 int pcount = (mdev->state.role == R_PRIMARY)
2945                            + (peer_role == R_PRIMARY);
2946                 int forced = (hg == -100);
2947
2948                 switch (pcount) {
2949                 case 0:
2950                         hg = drbd_asb_recover_0p(mdev);
2951                         break;
2952                 case 1:
2953                         hg = drbd_asb_recover_1p(mdev);
2954                         break;
2955                 case 2:
2956                         hg = drbd_asb_recover_2p(mdev);
2957                         break;
2958                 }
2959                 if (abs(hg) < 100) {
2960                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2961                              "automatically solved. Sync from %s node\n",
2962                              pcount, (hg < 0) ? "peer" : "this");
2963                         if (forced) {
2964                                 dev_warn(DEV, "Doing a full sync, since"
2965                                      " UUIDs where ambiguous.\n");
2966                                 hg = hg*2;
2967                         }
2968                 }
2969         }
2970
2971         if (hg == -100) {
2972                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
2973                         hg = -1;
2974                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
2975                         hg = 1;
2976
2977                 if (abs(hg) < 100)
2978                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2979                              "Sync from %s node\n",
2980                              (hg < 0) ? "peer" : "this");
2981         }
2982         rr_conflict = nc->rr_conflict;
2983         tentative = nc->tentative;
2984         rcu_read_unlock();
2985
2986         if (hg == -100) {
2987                 /* FIXME this log message is not correct if we end up here
2988                  * after an attempted attach on a diskless node.
2989                  * We just refuse to attach -- well, we drop the "connection"
2990                  * to that disk, in a way... */
2991                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2992                 drbd_khelper(mdev, "split-brain");
2993                 return C_MASK;
2994         }
2995
2996         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2997                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2998                 return C_MASK;
2999         }
3000
3001         if (hg < 0 && /* by intention we do not use mydisk here. */
3002             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3003                 switch (rr_conflict) {
3004                 case ASB_CALL_HELPER:
3005                         drbd_khelper(mdev, "pri-lost");
3006                         /* fall through */
3007                 case ASB_DISCONNECT:
3008                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3009                         return C_MASK;
3010                 case ASB_VIOLENTLY:
3011                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3012                              "assumption\n");
3013                 }
3014         }
3015
3016         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3017                 if (hg == 0)
3018                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3019                 else
3020                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
3021                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3022                                  abs(hg) >= 2 ? "full" : "bit-map based");
3023                 return C_MASK;
3024         }
3025
3026         if (abs(hg) >= 2) {
3027                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3028                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3029                                         BM_LOCKED_SET_ALLOWED))
3030                         return C_MASK;
3031         }
3032
3033         if (hg > 0) { /* become sync source. */
3034                 rv = C_WF_BITMAP_S;
3035         } else if (hg < 0) { /* become sync target */
3036                 rv = C_WF_BITMAP_T;
3037         } else {
3038                 rv = C_CONNECTED;
3039                 if (drbd_bm_total_weight(mdev)) {
3040                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3041                              drbd_bm_total_weight(mdev));
3042                 }
3043         }
3044
3045         return rv;
3046 }
3047
3048 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3049 {
3050         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3051         if (peer == ASB_DISCARD_REMOTE)
3052                 return ASB_DISCARD_LOCAL;
3053
3054         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3055         if (peer == ASB_DISCARD_LOCAL)
3056                 return ASB_DISCARD_REMOTE;
3057
3058         /* everything else is valid if they are equal on both sides. */
3059         return peer;
3060 }
3061
3062 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3063 {
3064         struct p_protocol *p = pi->data;
3065         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3066         int p_proto, p_discard_my_data, p_two_primaries, cf;
3067         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3068         char integrity_alg[SHARED_SECRET_MAX] = "";
3069         struct crypto_hash *peer_integrity_tfm = NULL;
3070         void *int_dig_in = NULL, *int_dig_vv = NULL;
3071
3072         p_proto         = be32_to_cpu(p->protocol);
3073         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3074         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3075         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3076         p_two_primaries = be32_to_cpu(p->two_primaries);
3077         cf              = be32_to_cpu(p->conn_flags);
3078         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3079
3080         if (tconn->agreed_pro_version >= 87) {
3081                 int err;
3082
3083                 if (pi->size > sizeof(integrity_alg))
3084                         return -EIO;
3085                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3086                 if (err)
3087                         return err;
3088                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3089         }
3090
3091         if (pi->cmd != P_PROTOCOL_UPDATE) {
3092                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3093
3094                 if (cf & CF_DRY_RUN)
3095                         set_bit(CONN_DRY_RUN, &tconn->flags);
3096
3097                 rcu_read_lock();
3098                 nc = rcu_dereference(tconn->net_conf);
3099
3100                 if (p_proto != nc->wire_protocol) {
3101                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3102                         goto disconnect_rcu_unlock;
3103                 }
3104
3105                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3106                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3107                         goto disconnect_rcu_unlock;
3108                 }
3109
3110                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3111                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3112                         goto disconnect_rcu_unlock;
3113                 }
3114
3115                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3116                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3117                         goto disconnect_rcu_unlock;
3118                 }
3119
3120                 if (p_discard_my_data && nc->discard_my_data) {
3121                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3122                         goto disconnect_rcu_unlock;
3123                 }
3124
3125                 if (p_two_primaries != nc->two_primaries) {
3126                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3127                         goto disconnect_rcu_unlock;
3128                 }
3129
3130                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3131                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3132                         goto disconnect_rcu_unlock;
3133                 }
3134
3135                 rcu_read_unlock();
3136         }
3137
3138         if (integrity_alg[0]) {
3139                 int hash_size;
3140
3141                 /*
3142                  * We can only change the peer data integrity algorithm
3143                  * here.  Changing our own data integrity algorithm
3144                  * requires that we send a P_PROTOCOL_UPDATE packet at
3145                  * the same time; otherwise, the peer has no way to
3146                  * tell between which packets the algorithm should
3147                  * change.
3148                  */
3149
3150                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3151                 if (!peer_integrity_tfm) {
3152                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3153                                  integrity_alg);
3154                         goto disconnect;
3155                 }
3156
3157                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3158                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3159                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3160                 if (!(int_dig_in && int_dig_vv)) {
3161                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3162                         goto disconnect;
3163                 }
3164         }
3165
3166         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3167         if (!new_net_conf) {
3168                 conn_err(tconn, "Allocation of new net_conf failed\n");
3169                 goto disconnect;
3170         }
3171
3172         mutex_lock(&tconn->data.mutex);
3173         mutex_lock(&tconn->conf_update);
3174         old_net_conf = tconn->net_conf;
3175         *new_net_conf = *old_net_conf;
3176
3177         new_net_conf->wire_protocol = p_proto;
3178         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3179         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3180         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3181         new_net_conf->two_primaries = p_two_primaries;
3182
3183         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3184         mutex_unlock(&tconn->conf_update);
3185         mutex_unlock(&tconn->data.mutex);
3186
3187         crypto_free_hash(tconn->peer_integrity_tfm);
3188         kfree(tconn->int_dig_in);
3189         kfree(tconn->int_dig_vv);
3190         tconn->peer_integrity_tfm = peer_integrity_tfm;
3191         tconn->int_dig_in = int_dig_in;
3192         tconn->int_dig_vv = int_dig_vv;
3193
3194         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3195                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3196                           integrity_alg[0] ? integrity_alg : "(none)");
3197
3198         synchronize_rcu();
3199         kfree(old_net_conf);
3200         return 0;
3201
3202 disconnect_rcu_unlock:
3203         rcu_read_unlock();
3204 disconnect:
3205         crypto_free_hash(peer_integrity_tfm);
3206         kfree(int_dig_in);
3207         kfree(int_dig_vv);
3208         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3209         return -EIO;
3210 }
3211
3212 /* helper function
3213  * input: alg name, feature name
3214  * return: NULL (alg name was "")
3215  *         ERR_PTR(error) if something goes wrong
3216  *         or the crypto hash ptr, if it worked out ok. */
3217 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3218                 const char *alg, const char *name)
3219 {
3220         struct crypto_hash *tfm;
3221
3222         if (!alg[0])
3223                 return NULL;
3224
3225         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3226         if (IS_ERR(tfm)) {
3227                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3228                         alg, name, PTR_ERR(tfm));
3229                 return tfm;
3230         }
3231         return tfm;
3232 }
3233
3234 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3235 {
3236         void *buffer = tconn->data.rbuf;
3237         int size = pi->size;
3238
3239         while (size) {
3240                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3241                 s = drbd_recv(tconn, buffer, s);
3242                 if (s <= 0) {
3243                         if (s < 0)
3244                                 return s;
3245                         break;
3246                 }
3247                 size -= s;
3248         }
3249         if (size)
3250                 return -EIO;
3251         return 0;
3252 }
3253
3254 /*
3255  * config_unknown_volume  -  device configuration command for unknown volume
3256  *
3257  * When a device is added to an existing connection, the node on which the
3258  * device is added first will send configuration commands to its peer but the
3259  * peer will not know about the device yet.  It will warn and ignore these
3260  * commands.  Once the device is added on the second node, the second node will
3261  * send the same device configuration commands, but in the other direction.
3262  *
3263  * (We can also end up here if drbd is misconfigured.)
3264  */
3265 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3266 {
3267         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3268                   cmdname(pi->cmd), pi->vnr);
3269         return ignore_remaining_packet(tconn, pi);
3270 }
3271
3272 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3273 {
3274         struct drbd_conf *mdev;
3275         struct p_rs_param_95 *p;
3276         unsigned int header_size, data_size, exp_max_sz;
3277         struct crypto_hash *verify_tfm = NULL;
3278         struct crypto_hash *csums_tfm = NULL;
3279         struct net_conf *old_net_conf, *new_net_conf = NULL;
3280         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3281         const int apv = tconn->agreed_pro_version;
3282         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3283         int fifo_size = 0;
3284         int err;
3285
3286         mdev = vnr_to_mdev(tconn, pi->vnr);
3287         if (!mdev)
3288                 return config_unknown_volume(tconn, pi);
3289
3290         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3291                     : apv == 88 ? sizeof(struct p_rs_param)
3292                                         + SHARED_SECRET_MAX
3293                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3294                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3295
3296         if (pi->size > exp_max_sz) {
3297                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3298                     pi->size, exp_max_sz);
3299                 return -EIO;
3300         }
3301
3302         if (apv <= 88) {
3303                 header_size = sizeof(struct p_rs_param);
3304                 data_size = pi->size - header_size;
3305         } else if (apv <= 94) {
3306                 header_size = sizeof(struct p_rs_param_89);
3307                 data_size = pi->size - header_size;
3308                 D_ASSERT(data_size == 0);
3309         } else {
3310                 header_size = sizeof(struct p_rs_param_95);
3311                 data_size = pi->size - header_size;
3312                 D_ASSERT(data_size == 0);
3313         }
3314
3315         /* initialize verify_alg and csums_alg */
3316         p = pi->data;
3317         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3318
3319         err = drbd_recv_all(mdev->tconn, p, header_size);
3320         if (err)
3321                 return err;
3322
3323         mutex_lock(&mdev->tconn->conf_update);
3324         old_net_conf = mdev->tconn->net_conf;
3325         if (get_ldev(mdev)) {
3326                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3327                 if (!new_disk_conf) {
3328                         put_ldev(mdev);
3329                         mutex_unlock(&mdev->tconn->conf_update);
3330                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3331                         return -ENOMEM;
3332                 }
3333
3334                 old_disk_conf = mdev->ldev->disk_conf;
3335                 *new_disk_conf = *old_disk_conf;
3336
3337                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3338         }
3339
3340         if (apv >= 88) {
3341                 if (apv == 88) {
3342                         if (data_size > SHARED_SECRET_MAX) {
3343                                 dev_err(DEV, "verify-alg too long, "
3344                                     "peer wants %u, accepting only %u byte\n",
3345                                                 data_size, SHARED_SECRET_MAX);
3346                                 err = -EIO;
3347                                 goto reconnect;
3348                         }
3349
3350                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3351                         if (err)
3352                                 goto reconnect;
3353                         /* we expect NUL terminated string */
3354                         /* but just in case someone tries to be evil */
3355                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3356                         p->verify_alg[data_size-1] = 0;
3357
3358                 } else /* apv >= 89 */ {
3359                         /* we still expect NUL terminated strings */
3360                         /* but just in case someone tries to be evil */
3361                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3362                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3363                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3364                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3365                 }
3366
3367                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3368                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3369                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3370                                     old_net_conf->verify_alg, p->verify_alg);
3371                                 goto disconnect;
3372                         }
3373                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3374                                         p->verify_alg, "verify-alg");
3375                         if (IS_ERR(verify_tfm)) {
3376                                 verify_tfm = NULL;
3377                                 goto disconnect;
3378                         }
3379                 }
3380
3381                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3382                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3383                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3384                                     old_net_conf->csums_alg, p->csums_alg);
3385                                 goto disconnect;
3386                         }
3387                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3388                                         p->csums_alg, "csums-alg");
3389                         if (IS_ERR(csums_tfm)) {
3390                                 csums_tfm = NULL;
3391                                 goto disconnect;
3392                         }
3393                 }
3394
3395                 if (apv > 94 && new_disk_conf) {
3396                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3397                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3398                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3399                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3400
3401                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3402                         if (fifo_size != mdev->rs_plan_s->size) {
3403                                 new_plan = fifo_alloc(fifo_size);
3404                                 if (!new_plan) {
3405                                         dev_err(DEV, "kmalloc of fifo_buffer failed");
3406                                         put_ldev(mdev);
3407                                         goto disconnect;
3408                                 }
3409                         }
3410                 }
3411
3412                 if (verify_tfm || csums_tfm) {
3413                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3414                         if (!new_net_conf) {
3415                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3416                                 goto disconnect;
3417                         }
3418
3419                         *new_net_conf = *old_net_conf;
3420
3421                         if (verify_tfm) {
3422                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3423                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3424                                 crypto_free_hash(mdev->tconn->verify_tfm);
3425                                 mdev->tconn->verify_tfm = verify_tfm;
3426                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3427                         }
3428                         if (csums_tfm) {
3429                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3430                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3431                                 crypto_free_hash(mdev->tconn->csums_tfm);
3432                                 mdev->tconn->csums_tfm = csums_tfm;
3433                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3434                         }
3435                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3436                 }
3437         }
3438
3439         if (new_disk_conf) {
3440                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3441                 put_ldev(mdev);
3442         }
3443
3444         if (new_plan) {
3445                 old_plan = mdev->rs_plan_s;
3446                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3447         }
3448
3449         mutex_unlock(&mdev->tconn->conf_update);
3450         synchronize_rcu();
3451         if (new_net_conf)
3452                 kfree(old_net_conf);
3453         kfree(old_disk_conf);
3454         kfree(old_plan);
3455
3456         return 0;
3457
3458 reconnect:
3459         if (new_disk_conf) {
3460                 put_ldev(mdev);
3461                 kfree(new_disk_conf);
3462         }
3463         mutex_unlock(&mdev->tconn->conf_update);
3464         return -EIO;
3465
3466 disconnect:
3467         kfree(new_plan);
3468         if (new_disk_conf) {
3469                 put_ldev(mdev);
3470                 kfree(new_disk_conf);
3471         }
3472         mutex_unlock(&mdev->tconn->conf_update);
3473         /* just for completeness: actually not needed,
3474          * as this is not reached if csums_tfm was ok. */
3475         crypto_free_hash(csums_tfm);
3476         /* but free the verify_tfm again, if csums_tfm did not work out */
3477         crypto_free_hash(verify_tfm);
3478         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3479         return -EIO;
3480 }
3481
3482 /* warn if the arguments differ by more than 12.5% */
3483 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3484         const char *s, sector_t a, sector_t b)
3485 {
3486         sector_t d;
3487         if (a == 0 || b == 0)
3488                 return;
3489         d = (a > b) ? (a - b) : (b - a);
3490         if (d > (a>>3) || d > (b>>3))
3491                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3492                      (unsigned long long)a, (unsigned long long)b);
3493 }
3494
3495 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3496 {
3497         struct drbd_conf *mdev;
3498         struct p_sizes *p = pi->data;
3499         enum determine_dev_size dd = unchanged;
3500         sector_t p_size, p_usize, my_usize;
3501         int ldsc = 0; /* local disk size changed */
3502         enum dds_flags ddsf;
3503
3504         mdev = vnr_to_mdev(tconn, pi->vnr);
3505         if (!mdev)
3506                 return config_unknown_volume(tconn, pi);
3507
3508         p_size = be64_to_cpu(p->d_size);
3509         p_usize = be64_to_cpu(p->u_size);
3510
3511         /* just store the peer's disk size for now.
3512          * we still need to figure out whether we accept that. */
3513         mdev->p_size = p_size;
3514
3515         if (get_ldev(mdev)) {
3516                 rcu_read_lock();
3517                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3518                 rcu_read_unlock();
3519
3520                 warn_if_differ_considerably(mdev, "lower level device sizes",
3521                            p_size, drbd_get_max_capacity(mdev->ldev));
3522                 warn_if_differ_considerably(mdev, "user requested size",
3523                                             p_usize, my_usize);
3524
3525                 /* if this is the first connect, or an otherwise expected
3526                  * param exchange, choose the minimum */
3527                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3528                         p_usize = min_not_zero(my_usize, p_usize);
3529
3530                 /* Never shrink a device with usable data during connect.
3531                    But allow online shrinking if we are connected. */
3532                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3533                     drbd_get_capacity(mdev->this_bdev) &&
3534                     mdev->state.disk >= D_OUTDATED &&
3535                     mdev->state.conn < C_CONNECTED) {
3536                         dev_err(DEV, "The peer's disk size is too small!\n");
3537                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3538                         put_ldev(mdev);
3539                         return -EIO;
3540                 }
3541
3542                 if (my_usize != p_usize) {
3543                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3544
3545                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3546                         if (!new_disk_conf) {
3547                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3548                                 put_ldev(mdev);
3549                                 return -ENOMEM;
3550                         }
3551
3552                         mutex_lock(&mdev->tconn->conf_update);
3553                         old_disk_conf = mdev->ldev->disk_conf;
3554                         *new_disk_conf = *old_disk_conf;
3555                         new_disk_conf->disk_size = p_usize;
3556
3557                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3558                         mutex_unlock(&mdev->tconn->conf_update);
3559                         synchronize_rcu();
3560                         kfree(old_disk_conf);
3561
3562                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3563                                  (unsigned long)my_usize);
3564                 }
3565
3566                 put_ldev(mdev);
3567         }
3568
3569         ddsf = be16_to_cpu(p->dds_flags);
3570         if (get_ldev(mdev)) {
3571                 dd = drbd_determine_dev_size(mdev, ddsf);
3572                 put_ldev(mdev);
3573                 if (dd == dev_size_error)
3574                         return -EIO;
3575                 drbd_md_sync(mdev);
3576         } else {
3577                 /* I am diskless, need to accept the peer's size. */
3578                 drbd_set_my_capacity(mdev, p_size);
3579         }
3580
3581         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3582         drbd_reconsider_max_bio_size(mdev);
3583
3584         if (get_ldev(mdev)) {
3585                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3586                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3587                         ldsc = 1;
3588                 }
3589
3590                 put_ldev(mdev);
3591         }
3592
3593         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3594                 if (be64_to_cpu(p->c_size) !=
3595                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3596                         /* we have different sizes, probably peer
3597                          * needs to know my new size... */
3598                         drbd_send_sizes(mdev, 0, ddsf);
3599                 }
3600                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3601                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3602                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3603                             mdev->state.disk >= D_INCONSISTENT) {
3604                                 if (ddsf & DDSF_NO_RESYNC)
3605                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3606                                 else
3607                                         resync_after_online_grow(mdev);
3608                         } else
3609                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3610                 }
3611         }
3612
3613         return 0;
3614 }
3615
3616 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3617 {
3618         struct drbd_conf *mdev;
3619         struct p_uuids *p = pi->data;
3620         u64 *p_uuid;
3621         int i, updated_uuids = 0;
3622
3623         mdev = vnr_to_mdev(tconn, pi->vnr);
3624         if (!mdev)
3625                 return config_unknown_volume(tconn, pi);
3626
3627         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3628
3629         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3630                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3631
3632         kfree(mdev->p_uuid);
3633         mdev->p_uuid = p_uuid;
3634
3635         if (mdev->state.conn < C_CONNECTED &&
3636             mdev->state.disk < D_INCONSISTENT &&
3637             mdev->state.role == R_PRIMARY &&
3638             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3639                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3640                     (unsigned long long)mdev->ed_uuid);
3641                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3642                 return -EIO;
3643         }
3644
3645         if (get_ldev(mdev)) {
3646                 int skip_initial_sync =
3647                         mdev->state.conn == C_CONNECTED &&
3648                         mdev->tconn->agreed_pro_version >= 90 &&
3649                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3650                         (p_uuid[UI_FLAGS] & 8);
3651                 if (skip_initial_sync) {
3652                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3653                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3654                                         "clear_n_write from receive_uuids",
3655                                         BM_LOCKED_TEST_ALLOWED);
3656                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3657                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3658                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3659                                         CS_VERBOSE, NULL);
3660                         drbd_md_sync(mdev);
3661                         updated_uuids = 1;
3662                 }
3663                 put_ldev(mdev);
3664         } else if (mdev->state.disk < D_INCONSISTENT &&
3665                    mdev->state.role == R_PRIMARY) {
3666                 /* I am a diskless primary, the peer just created a new current UUID
3667                    for me. */
3668                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3669         }
3670
3671         /* Before we test for the disk state, we should wait until an eventually
3672            ongoing cluster wide state change is finished. That is important if
3673            we are primary and are detaching from our disk. We need to see the
3674            new disk state... */
3675         mutex_lock(mdev->state_mutex);
3676         mutex_unlock(mdev->state_mutex);
3677         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3678                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3679
3680         if (updated_uuids)
3681                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3682
3683         return 0;
3684 }
3685
3686 /**
3687  * convert_state() - Converts the peer's view of the cluster state to our point of view
3688  * @ps:         The state as seen by the peer.
3689  */
3690 static union drbd_state convert_state(union drbd_state ps)
3691 {
3692         union drbd_state ms;
3693
3694         static enum drbd_conns c_tab[] = {
3695                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3696                 [C_CONNECTED] = C_CONNECTED,
3697
3698                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3699                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3700                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3701                 [C_VERIFY_S]       = C_VERIFY_T,
3702                 [C_MASK]   = C_MASK,
3703         };
3704
3705         ms.i = ps.i;
3706
3707         ms.conn = c_tab[ps.conn];
3708         ms.peer = ps.role;
3709         ms.role = ps.peer;
3710         ms.pdsk = ps.disk;
3711         ms.disk = ps.pdsk;
3712         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3713
3714         return ms;
3715 }
3716
3717 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3718 {
3719         struct drbd_conf *mdev;
3720         struct p_req_state *p = pi->data;
3721         union drbd_state mask, val;
3722         enum drbd_state_rv rv;
3723
3724         mdev = vnr_to_mdev(tconn, pi->vnr);
3725         if (!mdev)
3726                 return -EIO;
3727
3728         mask.i = be32_to_cpu(p->mask);
3729         val.i = be32_to_cpu(p->val);
3730
3731         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3732             mutex_is_locked(mdev->state_mutex)) {
3733                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3734                 return 0;
3735         }
3736
3737         mask = convert_state(mask);
3738         val = convert_state(val);
3739
3740         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3741         drbd_send_sr_reply(mdev, rv);
3742
3743         drbd_md_sync(mdev);
3744
3745         return 0;
3746 }
3747
3748 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3749 {
3750         struct p_req_state *p = pi->data;
3751         union drbd_state mask, val;
3752         enum drbd_state_rv rv;
3753
3754         mask.i = be32_to_cpu(p->mask);
3755         val.i = be32_to_cpu(p->val);
3756
3757         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3758             mutex_is_locked(&tconn->cstate_mutex)) {
3759                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3760                 return 0;
3761         }
3762
3763         mask = convert_state(mask);
3764         val = convert_state(val);
3765
3766         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3767         conn_send_sr_reply(tconn, rv);
3768
3769         return 0;
3770 }
3771
3772 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3773 {
3774         struct drbd_conf *mdev;
3775         struct p_state *p = pi->data;
3776         union drbd_state os, ns, peer_state;
3777         enum drbd_disk_state real_peer_disk;
3778         enum chg_state_flags cs_flags;
3779         int rv;
3780
3781         mdev = vnr_to_mdev(tconn, pi->vnr);
3782         if (!mdev)
3783                 return config_unknown_volume(tconn, pi);
3784
3785         peer_state.i = be32_to_cpu(p->state);
3786
3787         real_peer_disk = peer_state.disk;
3788         if (peer_state.disk == D_NEGOTIATING) {
3789                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3790                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3791         }
3792
3793         spin_lock_irq(&mdev->tconn->req_lock);
3794  retry:
3795         os = ns = drbd_read_state(mdev);
3796         spin_unlock_irq(&mdev->tconn->req_lock);
3797
3798         /* If some other part of the code (asender thread, timeout)
3799          * already decided to close the connection again,
3800          * we must not "re-establish" it here. */
3801         if (os.conn <= C_TEAR_DOWN)
3802                 return false;
3803
3804         /* If this is the "end of sync" confirmation, usually the peer disk
3805          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3806          * set) resync started in PausedSyncT, or if the timing of pause-/
3807          * unpause-sync events has been "just right", the peer disk may
3808          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3809          */
3810         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3811             real_peer_disk == D_UP_TO_DATE &&
3812             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3813                 /* If we are (becoming) SyncSource, but peer is still in sync
3814                  * preparation, ignore its uptodate-ness to avoid flapping, it
3815                  * will change to inconsistent once the peer reaches active
3816                  * syncing states.
3817                  * It may have changed syncer-paused flags, however, so we
3818                  * cannot ignore this completely. */
3819                 if (peer_state.conn > C_CONNECTED &&
3820                     peer_state.conn < C_SYNC_SOURCE)
3821                         real_peer_disk = D_INCONSISTENT;
3822
3823                 /* if peer_state changes to connected at the same time,
3824                  * it explicitly notifies us that it finished resync.
3825                  * Maybe we should finish it up, too? */
3826                 else if (os.conn >= C_SYNC_SOURCE &&
3827                          peer_state.conn == C_CONNECTED) {
3828                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3829                                 drbd_resync_finished(mdev);
3830                         return 0;
3831                 }
3832         }
3833
3834         /* peer says his disk is inconsistent, while we think it is uptodate,
3835          * and this happens while the peer still thinks we have a sync going on,
3836          * but we think we are already done with the sync.
3837          * We ignore this to avoid flapping pdsk.
3838          * This should not happen, if the peer is a recent version of drbd. */
3839         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3840             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3841                 real_peer_disk = D_UP_TO_DATE;
3842
3843         if (ns.conn == C_WF_REPORT_PARAMS)
3844                 ns.conn = C_CONNECTED;
3845
3846         if (peer_state.conn == C_AHEAD)
3847                 ns.conn = C_BEHIND;
3848
3849         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3850             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3851                 int cr; /* consider resync */
3852
3853                 /* if we established a new connection */
3854                 cr  = (os.conn < C_CONNECTED);
3855                 /* if we had an established connection
3856                  * and one of the nodes newly attaches a disk */
3857                 cr |= (os.conn == C_CONNECTED &&
3858                        (peer_state.disk == D_NEGOTIATING ||
3859                         os.disk == D_NEGOTIATING));
3860                 /* if we have both been inconsistent, and the peer has been
3861                  * forced to be UpToDate with --overwrite-data */
3862                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3863                 /* if we had been plain connected, and the admin requested to
3864                  * start a sync by "invalidate" or "invalidate-remote" */
3865                 cr |= (os.conn == C_CONNECTED &&
3866                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3867                                  peer_state.conn <= C_WF_BITMAP_T));
3868
3869                 if (cr)
3870                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3871
3872                 put_ldev(mdev);
3873                 if (ns.conn == C_MASK) {
3874                         ns.conn = C_CONNECTED;
3875                         if (mdev->state.disk == D_NEGOTIATING) {
3876                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3877                         } else if (peer_state.disk == D_NEGOTIATING) {
3878                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3879                                 peer_state.disk = D_DISKLESS;
3880                                 real_peer_disk = D_DISKLESS;
3881                         } else {
3882                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3883                                         return -EIO;
3884                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3885                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3886                                 return -EIO;
3887                         }
3888                 }
3889         }
3890
3891         spin_lock_irq(&mdev->tconn->req_lock);
3892         if (os.i != drbd_read_state(mdev).i)
3893                 goto retry;
3894         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3895         ns.peer = peer_state.role;
3896         ns.pdsk = real_peer_disk;
3897         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3898         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3899                 ns.disk = mdev->new_state_tmp.disk;
3900         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3901         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3902             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3903                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3904                    for temporal network outages! */
3905                 spin_unlock_irq(&mdev->tconn->req_lock);
3906                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3907                 tl_clear(mdev->tconn);
3908                 drbd_uuid_new_current(mdev);
3909                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3910                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3911                 return -EIO;
3912         }
3913         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3914         ns = drbd_read_state(mdev);
3915         spin_unlock_irq(&mdev->tconn->req_lock);
3916
3917         if (rv < SS_SUCCESS) {
3918                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3919                 return -EIO;
3920         }
3921
3922         if (os.conn > C_WF_REPORT_PARAMS) {
3923                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3924                     peer_state.disk != D_NEGOTIATING ) {
3925                         /* we want resync, peer has not yet decided to sync... */
3926                         /* Nowadays only used when forcing a node into primary role and
3927                            setting its disk to UpToDate with that */
3928                         drbd_send_uuids(mdev);
3929                         drbd_send_current_state(mdev);
3930                 }
3931         }
3932
3933         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3934
3935         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3936
3937         return 0;
3938 }
3939
3940 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3941 {
3942         struct drbd_conf *mdev;
3943         struct p_rs_uuid *p = pi->data;
3944
3945         mdev = vnr_to_mdev(tconn, pi->vnr);
3946         if (!mdev)
3947                 return -EIO;
3948
3949         wait_event(mdev->misc_wait,
3950                    mdev->state.conn == C_WF_SYNC_UUID ||
3951                    mdev->state.conn == C_BEHIND ||
3952                    mdev->state.conn < C_CONNECTED ||
3953                    mdev->state.disk < D_NEGOTIATING);
3954
3955         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3956
3957         /* Here the _drbd_uuid_ functions are right, current should
3958            _not_ be rotated into the history */
3959         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3960                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3961                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3962
3963                 drbd_print_uuids(mdev, "updated sync uuid");
3964                 drbd_start_resync(mdev, C_SYNC_TARGET);
3965
3966                 put_ldev(mdev);
3967         } else
3968                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3969
3970         return 0;
3971 }
3972
3973 /**
3974  * receive_bitmap_plain
3975  *
3976  * Return 0 when done, 1 when another iteration is needed, and a negative error
3977  * code upon failure.
3978  */
3979 static int
3980 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3981                      unsigned long *p, struct bm_xfer_ctx *c)
3982 {
3983         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3984                                  drbd_header_size(mdev->tconn);
3985         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3986                                        c->bm_words - c->word_offset);
3987         unsigned int want = num_words * sizeof(*p);
3988         int err;
3989
3990         if (want != size) {
3991                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3992                 return -EIO;
3993         }
3994         if (want == 0)
3995                 return 0;
3996         err = drbd_recv_all(mdev->tconn, p, want);
3997         if (err)
3998                 return err;
3999
4000         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4001
4002         c->word_offset += num_words;
4003         c->bit_offset = c->word_offset * BITS_PER_LONG;
4004         if (c->bit_offset > c->bm_bits)
4005                 c->bit_offset = c->bm_bits;
4006
4007         return 1;
4008 }
4009
4010 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4011 {
4012         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4013 }
4014
4015 static int dcbp_get_start(struct p_compressed_bm *p)
4016 {
4017         return (p->encoding & 0x80) != 0;
4018 }
4019
4020 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4021 {
4022         return (p->encoding >> 4) & 0x7;
4023 }
4024
4025 /**
4026  * recv_bm_rle_bits
4027  *
4028  * Return 0 when done, 1 when another iteration is needed, and a negative error
4029  * code upon failure.
4030  */
4031 static int
4032 recv_bm_rle_bits(struct drbd_conf *mdev,
4033                 struct p_compressed_bm *p,
4034                  struct bm_xfer_ctx *c,
4035                  unsigned int len)
4036 {
4037         struct bitstream bs;
4038         u64 look_ahead;
4039         u64 rl;
4040         u64 tmp;
4041         unsigned long s = c->bit_offset;
4042         unsigned long e;
4043         int toggle = dcbp_get_start(p);
4044         int have;
4045         int bits;
4046
4047         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4048
4049         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4050         if (bits < 0)
4051                 return -EIO;
4052
4053         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4054                 bits = vli_decode_bits(&rl, look_ahead);
4055                 if (bits <= 0)
4056                         return -EIO;
4057
4058                 if (toggle) {
4059                         e = s + rl -1;
4060                         if (e >= c->bm_bits) {
4061                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4062                                 return -EIO;
4063                         }
4064                         _drbd_bm_set_bits(mdev, s, e);
4065                 }
4066
4067                 if (have < bits) {
4068                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4069                                 have, bits, look_ahead,
4070                                 (unsigned int)(bs.cur.b - p->code),
4071                                 (unsigned int)bs.buf_len);
4072                         return -EIO;
4073                 }
4074                 look_ahead >>= bits;
4075                 have -= bits;
4076
4077                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4078                 if (bits < 0)
4079                         return -EIO;
4080                 look_ahead |= tmp << have;
4081                 have += bits;
4082         }
4083
4084         c->bit_offset = s;
4085         bm_xfer_ctx_bit_to_word_offset(c);
4086
4087         return (s != c->bm_bits);
4088 }
4089
4090 /**
4091  * decode_bitmap_c
4092  *
4093  * Return 0 when done, 1 when another iteration is needed, and a negative error
4094  * code upon failure.
4095  */
4096 static int
4097 decode_bitmap_c(struct drbd_conf *mdev,
4098                 struct p_compressed_bm *p,
4099                 struct bm_xfer_ctx *c,
4100                 unsigned int len)
4101 {
4102         if (dcbp_get_code(p) == RLE_VLI_Bits)
4103                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4104
4105         /* other variants had been implemented for evaluation,
4106          * but have been dropped as this one turned out to be "best"
4107          * during all our tests. */
4108
4109         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4110         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4111         return -EIO;
4112 }
4113
4114 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4115                 const char *direction, struct bm_xfer_ctx *c)
4116 {
4117         /* what would it take to transfer it "plaintext" */
4118         unsigned int header_size = drbd_header_size(mdev->tconn);
4119         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4120         unsigned int plain =
4121                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4122                 c->bm_words * sizeof(unsigned long);
4123         unsigned int total = c->bytes[0] + c->bytes[1];
4124         unsigned int r;
4125
4126         /* total can not be zero. but just in case: */
4127         if (total == 0)
4128                 return;
4129
4130         /* don't report if not compressed */
4131         if (total >= plain)
4132                 return;
4133
4134         /* total < plain. check for overflow, still */
4135         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4136                                     : (1000 * total / plain);
4137
4138         if (r > 1000)
4139                 r = 1000;
4140
4141         r = 1000 - r;
4142         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4143              "total %u; compression: %u.%u%%\n",
4144                         direction,
4145                         c->bytes[1], c->packets[1],
4146                         c->bytes[0], c->packets[0],
4147                         total, r/10, r % 10);
4148 }
4149
4150 /* Since we are processing the bitfield from lower addresses to higher,
4151    it does not matter if the process it in 32 bit chunks or 64 bit
4152    chunks as long as it is little endian. (Understand it as byte stream,
4153    beginning with the lowest byte...) If we would use big endian
4154    we would need to process it from the highest address to the lowest,
4155    in order to be agnostic to the 32 vs 64 bits issue.
4156
4157    returns 0 on failure, 1 if we successfully received it. */
4158 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4159 {
4160         struct drbd_conf *mdev;
4161         struct bm_xfer_ctx c;
4162         int err;
4163
4164         mdev = vnr_to_mdev(tconn, pi->vnr);
4165         if (!mdev)
4166                 return -EIO;
4167
4168         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4169         /* you are supposed to send additional out-of-sync information
4170          * if you actually set bits during this phase */
4171
4172         c = (struct bm_xfer_ctx) {
4173                 .bm_bits = drbd_bm_bits(mdev),
4174                 .bm_words = drbd_bm_words(mdev),
4175         };
4176
4177         for(;;) {
4178                 if (pi->cmd == P_BITMAP)
4179                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4180                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4181                         /* MAYBE: sanity check that we speak proto >= 90,
4182                          * and the feature is enabled! */
4183                         struct p_compressed_bm *p = pi->data;
4184
4185                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4186                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4187                                 err = -EIO;
4188                                 goto out;
4189                         }
4190                         if (pi->size <= sizeof(*p)) {
4191                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4192                                 err = -EIO;
4193                                 goto out;
4194                         }
4195                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4196                         if (err)
4197                                goto out;
4198                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4199                 } else {
4200                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4201                         err = -EIO;
4202                         goto out;
4203                 }
4204
4205                 c.packets[pi->cmd == P_BITMAP]++;
4206                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4207
4208                 if (err <= 0) {
4209                         if (err < 0)
4210                                 goto out;
4211                         break;
4212                 }
4213                 err = drbd_recv_header(mdev->tconn, pi);
4214                 if (err)
4215                         goto out;
4216         }
4217
4218         INFO_bm_xfer_stats(mdev, "receive", &c);
4219
4220         if (mdev->state.conn == C_WF_BITMAP_T) {
4221                 enum drbd_state_rv rv;
4222
4223                 err = drbd_send_bitmap(mdev);
4224                 if (err)
4225                         goto out;
4226                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4227                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4228                 D_ASSERT(rv == SS_SUCCESS);
4229         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4230                 /* admin may have requested C_DISCONNECTING,
4231                  * other threads may have noticed network errors */
4232                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4233                     drbd_conn_str(mdev->state.conn));
4234         }
4235         err = 0;
4236
4237  out:
4238         drbd_bm_unlock(mdev);
4239         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4240                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4241         return err;
4242 }
4243
4244 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4245 {
4246         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4247                  pi->cmd, pi->size);
4248
4249         return ignore_remaining_packet(tconn, pi);
4250 }
4251
4252 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4253 {
4254         /* Make sure we've acked all the TCP data associated
4255          * with the data requests being unplugged */
4256         drbd_tcp_quickack(tconn->data.socket);
4257
4258         return 0;
4259 }
4260
4261 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4262 {
4263         struct drbd_conf *mdev;
4264         struct p_block_desc *p = pi->data;
4265
4266         mdev = vnr_to_mdev(tconn, pi->vnr);
4267         if (!mdev)
4268                 return -EIO;
4269
4270         switch (mdev->state.conn) {
4271         case C_WF_SYNC_UUID:
4272         case C_WF_BITMAP_T:
4273         case C_BEHIND:
4274                         break;
4275         default:
4276                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4277                                 drbd_conn_str(mdev->state.conn));
4278         }
4279
4280         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4281
4282         return 0;
4283 }
4284
4285 struct data_cmd {
4286         int expect_payload;
4287         size_t pkt_size;
4288         int (*fn)(struct drbd_tconn *, struct packet_info *);
4289 };
4290
4291 static struct data_cmd drbd_cmd_handler[] = {
4292         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4293         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4294         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4295         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4296         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4297         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4298         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4299         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4300         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4301         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4302         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4303         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4304         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4305         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4306         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4307         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4308         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4309         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4310         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4311         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4312         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4313         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4314         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4315         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4316 };
4317
4318 static void drbdd(struct drbd_tconn *tconn)
4319 {
4320         struct packet_info pi;
4321         size_t shs; /* sub header size */
4322         int err;
4323
4324         while (get_t_state(&tconn->receiver) == RUNNING) {
4325                 struct data_cmd *cmd;
4326
4327                 drbd_thread_current_set_cpu(&tconn->receiver);
4328                 if (drbd_recv_header(tconn, &pi))
4329                         goto err_out;
4330
4331                 cmd = &drbd_cmd_handler[pi.cmd];
4332                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4333                         conn_err(tconn, "Unexpected data packet %s (0x%04x)",
4334                                  cmdname(pi.cmd), pi.cmd);
4335                         goto err_out;
4336                 }
4337
4338                 shs = cmd->pkt_size;
4339                 if (pi.size > shs && !cmd->expect_payload) {
4340                         conn_err(tconn, "No payload expected %s l:%d\n",
4341                                  cmdname(pi.cmd), pi.size);
4342                         goto err_out;
4343                 }
4344
4345                 if (shs) {
4346                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4347                         if (err)
4348                                 goto err_out;
4349                         pi.size -= shs;
4350                 }
4351
4352                 err = cmd->fn(tconn, &pi);
4353                 if (err) {
4354                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4355                                  cmdname(pi.cmd), err, pi.size);
4356                         goto err_out;
4357                 }
4358         }
4359         return;
4360
4361     err_out:
4362         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4363 }
4364
4365 void conn_flush_workqueue(struct drbd_tconn *tconn)
4366 {
4367         struct drbd_wq_barrier barr;
4368
4369         barr.w.cb = w_prev_work_done;
4370         barr.w.tconn = tconn;
4371         init_completion(&barr.done);
4372         drbd_queue_work(&tconn->data.work, &barr.w);
4373         wait_for_completion(&barr.done);
4374 }
4375
4376 static void conn_disconnect(struct drbd_tconn *tconn)
4377 {
4378         struct drbd_conf *mdev;
4379         enum drbd_conns oc;
4380         int vnr;
4381
4382         if (tconn->cstate == C_STANDALONE)
4383                 return;
4384
4385         /* We are about to start the cleanup after connection loss.
4386          * Make sure drbd_make_request knows about that.
4387          * Usually we should be in some network failure state already,
4388          * but just in case we are not, we fix it up here.
4389          */
4390         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4391
4392         /* asender does not clean up anything. it must not interfere, either */
4393         drbd_thread_stop(&tconn->asender);
4394         drbd_free_sock(tconn);
4395
4396         rcu_read_lock();
4397         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4398                 kref_get(&mdev->kref);
4399                 rcu_read_unlock();
4400                 drbd_disconnected(mdev);
4401                 kref_put(&mdev->kref, &drbd_minor_destroy);
4402                 rcu_read_lock();
4403         }
4404         rcu_read_unlock();
4405
4406         if (!list_empty(&tconn->current_epoch->list))
4407                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4408         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4409         atomic_set(&tconn->current_epoch->epoch_size, 0);
4410
4411         conn_info(tconn, "Connection closed\n");
4412
4413         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4414                 conn_try_outdate_peer_async(tconn);
4415
4416         spin_lock_irq(&tconn->req_lock);
4417         oc = tconn->cstate;
4418         if (oc >= C_UNCONNECTED)
4419                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4420
4421         spin_unlock_irq(&tconn->req_lock);
4422
4423         if (oc == C_DISCONNECTING)
4424                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4425 }
4426
4427 static int drbd_disconnected(struct drbd_conf *mdev)
4428 {
4429         unsigned int i;
4430
4431         /* wait for current activity to cease. */
4432         spin_lock_irq(&mdev->tconn->req_lock);
4433         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4434         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4435         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4436         spin_unlock_irq(&mdev->tconn->req_lock);
4437
4438         /* We do not have data structures that would allow us to
4439          * get the rs_pending_cnt down to 0 again.
4440          *  * On C_SYNC_TARGET we do not have any data structures describing
4441          *    the pending RSDataRequest's we have sent.
4442          *  * On C_SYNC_SOURCE there is no data structure that tracks
4443          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4444          *  And no, it is not the sum of the reference counts in the
4445          *  resync_LRU. The resync_LRU tracks the whole operation including
4446          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4447          *  on the fly. */
4448         drbd_rs_cancel_all(mdev);
4449         mdev->rs_total = 0;
4450         mdev->rs_failed = 0;
4451         atomic_set(&mdev->rs_pending_cnt, 0);
4452         wake_up(&mdev->misc_wait);
4453
4454         del_timer_sync(&mdev->resync_timer);
4455         resync_timer_fn((unsigned long)mdev);
4456
4457         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4458          * w_make_resync_request etc. which may still be on the worker queue
4459          * to be "canceled" */
4460         drbd_flush_workqueue(mdev);
4461
4462         drbd_finish_peer_reqs(mdev);
4463
4464         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4465            might have issued a work again. The one before drbd_finish_peer_reqs() is
4466            necessary to reclain net_ee in drbd_finish_peer_reqs(). */
4467         drbd_flush_workqueue(mdev);
4468
4469         kfree(mdev->p_uuid);
4470         mdev->p_uuid = NULL;
4471
4472         if (!drbd_suspended(mdev))
4473                 tl_clear(mdev->tconn);
4474
4475         drbd_md_sync(mdev);
4476
4477         /* serialize with bitmap writeout triggered by the state change,
4478          * if any. */
4479         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4480
4481         /* tcp_close and release of sendpage pages can be deferred.  I don't
4482          * want to use SO_LINGER, because apparently it can be deferred for
4483          * more than 20 seconds (longest time I checked).
4484          *
4485          * Actually we don't care for exactly when the network stack does its
4486          * put_page(), but release our reference on these pages right here.
4487          */
4488         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4489         if (i)
4490                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4491         i = atomic_read(&mdev->pp_in_use_by_net);
4492         if (i)
4493                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4494         i = atomic_read(&mdev->pp_in_use);
4495         if (i)
4496                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4497
4498         D_ASSERT(list_empty(&mdev->read_ee));
4499         D_ASSERT(list_empty(&mdev->active_ee));
4500         D_ASSERT(list_empty(&mdev->sync_ee));
4501         D_ASSERT(list_empty(&mdev->done_ee));
4502
4503         return 0;
4504 }
4505
4506 /*
4507  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4508  * we can agree on is stored in agreed_pro_version.
4509  *
4510  * feature flags and the reserved array should be enough room for future
4511  * enhancements of the handshake protocol, and possible plugins...
4512  *
4513  * for now, they are expected to be zero, but ignored.
4514  */
4515 static int drbd_send_features(struct drbd_tconn *tconn)
4516 {
4517         struct drbd_socket *sock;
4518         struct p_connection_features *p;
4519
4520         sock = &tconn->data;
4521         p = conn_prepare_command(tconn, sock);
4522         if (!p)
4523                 return -EIO;
4524         memset(p, 0, sizeof(*p));
4525         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4526         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4527         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4528 }
4529
4530 /*
4531  * return values:
4532  *   1 yes, we have a valid connection
4533  *   0 oops, did not work out, please try again
4534  *  -1 peer talks different language,
4535  *     no point in trying again, please go standalone.
4536  */
4537 static int drbd_do_features(struct drbd_tconn *tconn)
4538 {
4539         /* ASSERT current == tconn->receiver ... */
4540         struct p_connection_features *p;
4541         const int expect = sizeof(struct p_connection_features);
4542         struct packet_info pi;
4543         int err;
4544
4545         err = drbd_send_features(tconn);
4546         if (err)
4547                 return 0;
4548
4549         err = drbd_recv_header(tconn, &pi);
4550         if (err)
4551                 return 0;
4552
4553         if (pi.cmd != P_CONNECTION_FEATURES) {
4554                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4555                          cmdname(pi.cmd), pi.cmd);
4556                 return -1;
4557         }
4558
4559         if (pi.size != expect) {
4560                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4561                      expect, pi.size);
4562                 return -1;
4563         }
4564
4565         p = pi.data;
4566         err = drbd_recv_all_warn(tconn, p, expect);
4567         if (err)
4568                 return 0;
4569
4570         p->protocol_min = be32_to_cpu(p->protocol_min);
4571         p->protocol_max = be32_to_cpu(p->protocol_max);
4572         if (p->protocol_max == 0)
4573                 p->protocol_max = p->protocol_min;
4574
4575         if (PRO_VERSION_MAX < p->protocol_min ||
4576             PRO_VERSION_MIN > p->protocol_max)
4577                 goto incompat;
4578
4579         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4580
4581         conn_info(tconn, "Handshake successful: "
4582              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4583
4584         return 1;
4585
4586  incompat:
4587         conn_err(tconn, "incompatible DRBD dialects: "
4588             "I support %d-%d, peer supports %d-%d\n",
4589             PRO_VERSION_MIN, PRO_VERSION_MAX,
4590             p->protocol_min, p->protocol_max);
4591         return -1;
4592 }
4593
4594 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4595 static int drbd_do_auth(struct drbd_tconn *tconn)
4596 {
4597         dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4598         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4599         return -1;
4600 }
4601 #else
4602 #define CHALLENGE_LEN 64
4603
4604 /* Return value:
4605         1 - auth succeeded,
4606         0 - failed, try again (network error),
4607         -1 - auth failed, don't try again.
4608 */
4609
4610 static int drbd_do_auth(struct drbd_tconn *tconn)
4611 {
4612         struct drbd_socket *sock;
4613         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4614         struct scatterlist sg;
4615         char *response = NULL;
4616         char *right_response = NULL;
4617         char *peers_ch = NULL;
4618         unsigned int key_len;
4619         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4620         unsigned int resp_size;
4621         struct hash_desc desc;
4622         struct packet_info pi;
4623         struct net_conf *nc;
4624         int err, rv;
4625
4626         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4627
4628         rcu_read_lock();
4629         nc = rcu_dereference(tconn->net_conf);
4630         key_len = strlen(nc->shared_secret);
4631         memcpy(secret, nc->shared_secret, key_len);
4632         rcu_read_unlock();
4633
4634         desc.tfm = tconn->cram_hmac_tfm;
4635         desc.flags = 0;
4636
4637         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4638         if (rv) {
4639                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4640                 rv = -1;
4641                 goto fail;
4642         }
4643
4644         get_random_bytes(my_challenge, CHALLENGE_LEN);
4645
4646         sock = &tconn->data;
4647         if (!conn_prepare_command(tconn, sock)) {
4648                 rv = 0;
4649                 goto fail;
4650         }
4651         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4652                                 my_challenge, CHALLENGE_LEN);
4653         if (!rv)
4654                 goto fail;
4655
4656         err = drbd_recv_header(tconn, &pi);
4657         if (err) {
4658                 rv = 0;
4659                 goto fail;
4660         }
4661
4662         if (pi.cmd != P_AUTH_CHALLENGE) {
4663                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4664                          cmdname(pi.cmd), pi.cmd);
4665                 rv = 0;
4666                 goto fail;
4667         }
4668
4669         if (pi.size > CHALLENGE_LEN * 2) {
4670                 conn_err(tconn, "expected AuthChallenge payload too big.\n");
4671                 rv = -1;
4672                 goto fail;
4673         }
4674
4675         peers_ch = kmalloc(pi.size, GFP_NOIO);
4676         if (peers_ch == NULL) {
4677                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4678                 rv = -1;
4679                 goto fail;
4680         }
4681
4682         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4683         if (err) {
4684                 rv = 0;
4685                 goto fail;
4686         }
4687
4688         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4689         response = kmalloc(resp_size, GFP_NOIO);
4690         if (response == NULL) {
4691                 conn_err(tconn, "kmalloc of response failed\n");
4692                 rv = -1;
4693                 goto fail;
4694         }
4695
4696         sg_init_table(&sg, 1);
4697         sg_set_buf(&sg, peers_ch, pi.size);
4698
4699         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4700         if (rv) {
4701                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4702                 rv = -1;
4703                 goto fail;
4704         }
4705
4706         if (!conn_prepare_command(tconn, sock)) {
4707                 rv = 0;
4708                 goto fail;
4709         }
4710         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4711                                 response, resp_size);
4712         if (!rv)
4713                 goto fail;
4714
4715         err = drbd_recv_header(tconn, &pi);
4716         if (err) {
4717                 rv = 0;
4718                 goto fail;
4719         }
4720
4721         if (pi.cmd != P_AUTH_RESPONSE) {
4722                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4723                          cmdname(pi.cmd), pi.cmd);
4724                 rv = 0;
4725                 goto fail;
4726         }
4727
4728         if (pi.size != resp_size) {
4729                 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
4730                 rv = 0;
4731                 goto fail;
4732         }
4733
4734         err = drbd_recv_all_warn(tconn, response , resp_size);
4735         if (err) {
4736                 rv = 0;
4737                 goto fail;
4738         }
4739
4740         right_response = kmalloc(resp_size, GFP_NOIO);
4741         if (right_response == NULL) {
4742                 conn_err(tconn, "kmalloc of right_response failed\n");
4743                 rv = -1;
4744                 goto fail;
4745         }
4746
4747         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4748
4749         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4750         if (rv) {
4751                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4752                 rv = -1;
4753                 goto fail;
4754         }
4755
4756         rv = !memcmp(response, right_response, resp_size);
4757
4758         if (rv)
4759                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4760                      resp_size);
4761         else
4762                 rv = -1;
4763
4764  fail:
4765         kfree(peers_ch);
4766         kfree(response);
4767         kfree(right_response);
4768
4769         return rv;
4770 }
4771 #endif
4772
4773 int drbdd_init(struct drbd_thread *thi)
4774 {
4775         struct drbd_tconn *tconn = thi->tconn;
4776         int h;
4777
4778         conn_info(tconn, "receiver (re)started\n");
4779
4780         do {
4781                 h = conn_connect(tconn);
4782                 if (h == 0) {
4783                         conn_disconnect(tconn);
4784                         schedule_timeout_interruptible(HZ);
4785                 }
4786                 if (h == -1) {
4787                         conn_warn(tconn, "Discarding network configuration.\n");
4788                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4789                 }
4790         } while (h == 0);
4791
4792         if (h > 0)
4793                 drbdd(tconn);
4794
4795         conn_disconnect(tconn);
4796
4797         conn_info(tconn, "receiver terminated\n");
4798         return 0;
4799 }
4800
4801 /* ********* acknowledge sender ******** */
4802
4803 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4804 {
4805         struct p_req_state_reply *p = pi->data;
4806         int retcode = be32_to_cpu(p->retcode);
4807
4808         if (retcode >= SS_SUCCESS) {
4809                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4810         } else {
4811                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4812                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4813                          drbd_set_st_err_str(retcode), retcode);
4814         }
4815         wake_up(&tconn->ping_wait);
4816
4817         return 0;
4818 }
4819
4820 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4821 {
4822         struct drbd_conf *mdev;
4823         struct p_req_state_reply *p = pi->data;
4824         int retcode = be32_to_cpu(p->retcode);
4825
4826         mdev = vnr_to_mdev(tconn, pi->vnr);
4827         if (!mdev)
4828                 return -EIO;
4829
4830         if (retcode >= SS_SUCCESS) {
4831                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4832         } else {
4833                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4834                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4835                         drbd_set_st_err_str(retcode), retcode);
4836         }
4837         wake_up(&mdev->state_wait);
4838
4839         return 0;
4840 }
4841
4842 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4843 {
4844         return drbd_send_ping_ack(tconn);
4845
4846 }
4847
4848 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4849 {
4850         /* restore idle timeout */
4851         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4852         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4853                 wake_up(&tconn->ping_wait);
4854
4855         return 0;
4856 }
4857
4858 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4859 {
4860         struct drbd_conf *mdev;
4861         struct p_block_ack *p = pi->data;
4862         sector_t sector = be64_to_cpu(p->sector);
4863         int blksize = be32_to_cpu(p->blksize);
4864
4865         mdev = vnr_to_mdev(tconn, pi->vnr);
4866         if (!mdev)
4867                 return -EIO;
4868
4869         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4870
4871         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4872
4873         if (get_ldev(mdev)) {
4874                 drbd_rs_complete_io(mdev, sector);
4875                 drbd_set_in_sync(mdev, sector, blksize);
4876                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4877                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4878                 put_ldev(mdev);
4879         }
4880         dec_rs_pending(mdev);
4881         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4882
4883         return 0;
4884 }
4885
4886 static int
4887 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4888                               struct rb_root *root, const char *func,
4889                               enum drbd_req_event what, bool missing_ok)
4890 {
4891         struct drbd_request *req;
4892         struct bio_and_error m;
4893
4894         spin_lock_irq(&mdev->tconn->req_lock);
4895         req = find_request(mdev, root, id, sector, missing_ok, func);
4896         if (unlikely(!req)) {
4897                 spin_unlock_irq(&mdev->tconn->req_lock);
4898                 return -EIO;
4899         }
4900         __req_mod(req, what, &m);
4901         spin_unlock_irq(&mdev->tconn->req_lock);
4902
4903         if (m.bio)
4904                 complete_master_bio(mdev, &m);
4905         return 0;
4906 }
4907
4908 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4909 {
4910         struct drbd_conf *mdev;
4911         struct p_block_ack *p = pi->data;
4912         sector_t sector = be64_to_cpu(p->sector);
4913         int blksize = be32_to_cpu(p->blksize);
4914         enum drbd_req_event what;
4915
4916         mdev = vnr_to_mdev(tconn, pi->vnr);
4917         if (!mdev)
4918                 return -EIO;
4919
4920         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4921
4922         if (p->block_id == ID_SYNCER) {
4923                 drbd_set_in_sync(mdev, sector, blksize);
4924                 dec_rs_pending(mdev);
4925                 return 0;
4926         }
4927         switch (pi->cmd) {
4928         case P_RS_WRITE_ACK:
4929                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4930                 break;
4931         case P_WRITE_ACK:
4932                 what = WRITE_ACKED_BY_PEER;
4933                 break;
4934         case P_RECV_ACK:
4935                 what = RECV_ACKED_BY_PEER;
4936                 break;
4937         case P_DISCARD_WRITE:
4938                 what = DISCARD_WRITE;
4939                 break;
4940         case P_RETRY_WRITE:
4941                 what = POSTPONE_WRITE;
4942                 break;
4943         default:
4944                 BUG();
4945         }
4946
4947         return validate_req_change_req_state(mdev, p->block_id, sector,
4948                                              &mdev->write_requests, __func__,
4949                                              what, false);
4950 }
4951
4952 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4953 {
4954         struct drbd_conf *mdev;
4955         struct p_block_ack *p = pi->data;
4956         sector_t sector = be64_to_cpu(p->sector);
4957         int size = be32_to_cpu(p->blksize);
4958         int err;
4959
4960         mdev = vnr_to_mdev(tconn, pi->vnr);
4961         if (!mdev)
4962                 return -EIO;
4963
4964         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4965
4966         if (p->block_id == ID_SYNCER) {
4967                 dec_rs_pending(mdev);
4968                 drbd_rs_failed_io(mdev, sector, size);
4969                 return 0;
4970         }
4971
4972         err = validate_req_change_req_state(mdev, p->block_id, sector,
4973                                             &mdev->write_requests, __func__,
4974                                             NEG_ACKED, true);
4975         if (err) {
4976                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4977                    The master bio might already be completed, therefore the
4978                    request is no longer in the collision hash. */
4979                 /* In Protocol B we might already have got a P_RECV_ACK
4980                    but then get a P_NEG_ACK afterwards. */
4981                 drbd_set_out_of_sync(mdev, sector, size);
4982         }
4983         return 0;
4984 }
4985
4986 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4987 {
4988         struct drbd_conf *mdev;
4989         struct p_block_ack *p = pi->data;
4990         sector_t sector = be64_to_cpu(p->sector);
4991
4992         mdev = vnr_to_mdev(tconn, pi->vnr);
4993         if (!mdev)
4994                 return -EIO;
4995
4996         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4997
4998         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
4999             (unsigned long long)sector, be32_to_cpu(p->blksize));
5000
5001         return validate_req_change_req_state(mdev, p->block_id, sector,
5002                                              &mdev->read_requests, __func__,
5003                                              NEG_ACKED, false);
5004 }
5005
5006 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5007 {
5008         struct drbd_conf *mdev;
5009         sector_t sector;
5010         int size;
5011         struct p_block_ack *p = pi->data;
5012
5013         mdev = vnr_to_mdev(tconn, pi->vnr);
5014         if (!mdev)
5015                 return -EIO;
5016
5017         sector = be64_to_cpu(p->sector);
5018         size = be32_to_cpu(p->blksize);
5019
5020         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5021
5022         dec_rs_pending(mdev);
5023
5024         if (get_ldev_if_state(mdev, D_FAILED)) {
5025                 drbd_rs_complete_io(mdev, sector);
5026                 switch (pi->cmd) {
5027                 case P_NEG_RS_DREPLY:
5028                         drbd_rs_failed_io(mdev, sector, size);
5029                 case P_RS_CANCEL:
5030                         break;
5031                 default:
5032                         BUG();
5033                 }
5034                 put_ldev(mdev);
5035         }
5036
5037         return 0;
5038 }
5039
5040 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5041 {
5042         struct drbd_conf *mdev;
5043         struct p_barrier_ack *p = pi->data;
5044
5045         mdev = vnr_to_mdev(tconn, pi->vnr);
5046         if (!mdev)
5047                 return -EIO;
5048
5049         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
5050
5051         if (mdev->state.conn == C_AHEAD &&
5052             atomic_read(&mdev->ap_in_flight) == 0 &&
5053             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5054                 mdev->start_resync_timer.expires = jiffies + HZ;
5055                 add_timer(&mdev->start_resync_timer);
5056         }
5057
5058         return 0;
5059 }
5060
5061 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5062 {
5063         struct drbd_conf *mdev;
5064         struct p_block_ack *p = pi->data;
5065         struct drbd_work *w;
5066         sector_t sector;
5067         int size;
5068
5069         mdev = vnr_to_mdev(tconn, pi->vnr);
5070         if (!mdev)
5071                 return -EIO;
5072
5073         sector = be64_to_cpu(p->sector);
5074         size = be32_to_cpu(p->blksize);
5075
5076         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5077
5078         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5079                 drbd_ov_out_of_sync_found(mdev, sector, size);
5080         else
5081                 ov_out_of_sync_print(mdev);
5082
5083         if (!get_ldev(mdev))
5084                 return 0;
5085
5086         drbd_rs_complete_io(mdev, sector);
5087         dec_rs_pending(mdev);
5088
5089         --mdev->ov_left;
5090
5091         /* let's advance progress step marks only for every other megabyte */
5092         if ((mdev->ov_left & 0x200) == 0x200)
5093                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5094
5095         if (mdev->ov_left == 0) {
5096                 w = kmalloc(sizeof(*w), GFP_NOIO);
5097                 if (w) {
5098                         w->cb = w_ov_finished;
5099                         w->mdev = mdev;
5100                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5101                 } else {
5102                         dev_err(DEV, "kmalloc(w) failed.");
5103                         ov_out_of_sync_print(mdev);
5104                         drbd_resync_finished(mdev);
5105                 }
5106         }
5107         put_ldev(mdev);
5108         return 0;
5109 }
5110
5111 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5112 {
5113         return 0;
5114 }
5115
5116 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5117 {
5118         struct drbd_conf *mdev;
5119         int vnr, not_empty = 0;
5120
5121         do {
5122                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5123                 flush_signals(current);
5124
5125                 rcu_read_lock();
5126                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5127                         kref_get(&mdev->kref);
5128                         rcu_read_unlock();
5129                         if (drbd_finish_peer_reqs(mdev)) {
5130                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5131                                 return 1;
5132                         }
5133                         kref_put(&mdev->kref, &drbd_minor_destroy);
5134                         rcu_read_lock();
5135                 }
5136                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5137
5138                 spin_lock_irq(&tconn->req_lock);
5139                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5140                         not_empty = !list_empty(&mdev->done_ee);
5141                         if (not_empty)
5142                                 break;
5143                 }
5144                 spin_unlock_irq(&tconn->req_lock);
5145                 rcu_read_unlock();
5146         } while (not_empty);
5147
5148         return 0;
5149 }
5150
5151 struct asender_cmd {
5152         size_t pkt_size;
5153         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5154 };
5155
5156 static struct asender_cmd asender_tbl[] = {
5157         [P_PING]            = { 0, got_Ping },
5158         [P_PING_ACK]        = { 0, got_PingAck },
5159         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5160         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5161         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5162         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5163         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5164         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5165         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5166         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5167         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5168         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5169         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5170         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5171         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5172         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5173         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5174 };
5175
5176 int drbd_asender(struct drbd_thread *thi)
5177 {
5178         struct drbd_tconn *tconn = thi->tconn;
5179         struct asender_cmd *cmd = NULL;
5180         struct packet_info pi;
5181         int rv;
5182         void *buf    = tconn->meta.rbuf;
5183         int received = 0;
5184         unsigned int header_size = drbd_header_size(tconn);
5185         int expect   = header_size;
5186         bool ping_timeout_active = false;
5187         struct net_conf *nc;
5188         int ping_timeo, tcp_cork, ping_int;
5189
5190         current->policy = SCHED_RR;  /* Make this a realtime task! */
5191         current->rt_priority = 2;    /* more important than all other tasks */
5192
5193         while (get_t_state(thi) == RUNNING) {
5194                 drbd_thread_current_set_cpu(thi);
5195
5196                 rcu_read_lock();
5197                 nc = rcu_dereference(tconn->net_conf);
5198                 ping_timeo = nc->ping_timeo;
5199                 tcp_cork = nc->tcp_cork;
5200                 ping_int = nc->ping_int;
5201                 rcu_read_unlock();
5202
5203                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5204                         if (drbd_send_ping(tconn)) {
5205                                 conn_err(tconn, "drbd_send_ping has failed\n");
5206                                 goto reconnect;
5207                         }
5208                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5209                         ping_timeout_active = true;
5210                 }
5211
5212                 /* TODO: conditionally cork; it may hurt latency if we cork without
5213                    much to send */
5214                 if (tcp_cork)
5215                         drbd_tcp_cork(tconn->meta.socket);
5216                 if (tconn_finish_peer_reqs(tconn)) {
5217                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5218                         goto reconnect;
5219                 }
5220                 /* but unconditionally uncork unless disabled */
5221                 if (tcp_cork)
5222                         drbd_tcp_uncork(tconn->meta.socket);
5223
5224                 /* short circuit, recv_msg would return EINTR anyways. */
5225                 if (signal_pending(current))
5226                         continue;
5227
5228                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5229                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5230
5231                 flush_signals(current);
5232
5233                 /* Note:
5234                  * -EINTR        (on meta) we got a signal
5235                  * -EAGAIN       (on meta) rcvtimeo expired
5236                  * -ECONNRESET   other side closed the connection
5237                  * -ERESTARTSYS  (on data) we got a signal
5238                  * rv <  0       other than above: unexpected error!
5239                  * rv == expected: full header or command
5240                  * rv <  expected: "woken" by signal during receive
5241                  * rv == 0       : "connection shut down by peer"
5242                  */
5243                 if (likely(rv > 0)) {
5244                         received += rv;
5245                         buf      += rv;
5246                 } else if (rv == 0) {
5247                         conn_err(tconn, "meta connection shut down by peer.\n");
5248                         goto reconnect;
5249                 } else if (rv == -EAGAIN) {
5250                         /* If the data socket received something meanwhile,
5251                          * that is good enough: peer is still alive. */
5252                         if (time_after(tconn->last_received,
5253                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5254                                 continue;
5255                         if (ping_timeout_active) {
5256                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5257                                 goto reconnect;
5258                         }
5259                         set_bit(SEND_PING, &tconn->flags);
5260                         continue;
5261                 } else if (rv == -EINTR) {
5262                         continue;
5263                 } else {
5264                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5265                         goto reconnect;
5266                 }
5267
5268                 if (received == expect && cmd == NULL) {
5269                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5270                                 goto reconnect;
5271                         cmd = &asender_tbl[pi.cmd];
5272                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5273                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5274                                          cmdname(pi.cmd), pi.cmd);
5275                                 goto disconnect;
5276                         }
5277                         expect = header_size + cmd->pkt_size;
5278                         if (pi.size != expect - header_size) {
5279                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5280                                         pi.cmd, pi.size);
5281                                 goto reconnect;
5282                         }
5283                 }
5284                 if (received == expect) {
5285                         bool err;
5286
5287                         err = cmd->fn(tconn, &pi);
5288                         if (err) {
5289                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5290                                 goto reconnect;
5291                         }
5292
5293                         tconn->last_received = jiffies;
5294
5295                         if (cmd == &asender_tbl[P_PING_ACK]) {
5296                                 /* restore idle timeout */
5297                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5298                                 ping_timeout_active = false;
5299                         }
5300
5301                         buf      = tconn->meta.rbuf;
5302                         received = 0;
5303                         expect   = header_size;
5304                         cmd      = NULL;
5305                 }
5306         }
5307
5308         if (0) {
5309 reconnect:
5310                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5311         }
5312         if (0) {
5313 disconnect:
5314                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5315         }
5316         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5317
5318         conn_info(tconn, "asender terminated\n");
5319
5320         return 0;
5321 }