1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
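/* For example, a chain of three pages looks like
 *   head -> P1 -> P2 -> P3
 * with the "next" link stored via set_page_private()/page_private();
 * a private value of 0 terminates the chain (page_chain_del() below
 * sets that end marker on the last page it hands out). */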
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
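/* Release every page of the chain back to the system; returns the number
 * of pages that were freed. */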
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
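/* Prepend the chain chain_first..chain_last to *head.  Callers serialize
 * access to *head themselves (drbd_pp_lock for the global pool). */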
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
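/* Grab @number pages: first try the pre-allocated drbd_pp_pool, then fall
 * back to alloc_page(GFP_TRY).  Returns a page chain on success, or NULL
 * if not all pages were immediately available; any partial allocation is
 * handed back to the pool so drbd_alloc_pages() can retry later. */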
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
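/* Move peer requests whose pages are no longer in flight on the network
 * off mdev->net_ee and free them. */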
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
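/*
 * Typical use of the page pool, modeled on read_in_block() further down
 * (a sketch only; error handling and the NULL check are omitted):
 *
 *	struct page *chain = drbd_alloc_pages(mdev, nr_pages, true);
 *	struct page *page = chain;
 *
 *	page_chain_for_each(page) {
 *		void *data = kmap(page);
 *		...consume or fill up to PAGE_SIZE bytes per page...
 *		kunmap(page);
 *	}
 *	drbd_free_pages(mdev, chain, 0);
 */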
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write;
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept(), which is only present since 2.6.18.
465  * We also want to log exactly which part of it failed. */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
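/* One-shot receive into a kernel buffer.  Returns the number of bytes
 * received or a negative error code; without explicit @flags it blocks
 * until @size bytes have arrived (MSG_WAITALL). */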
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
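/* Receive exactly @size bytes from the data socket.  On a short read or
 * an error, the connection is forced to C_BROKEN_PIPE. */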
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
611
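/* Actively establish a TCP connection to the peer, bound to the locally
 * configured source address.  Returns the connected socket or NULL;
 * transient errors (timeout, peer not reachable yet) keep the current
 * connection state, anything unexpected forces C_DISCONNECTING. */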
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, try_connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629
630         sndbuf_size = nc->sndbuf_size;
631         rcvbuf_size = nc->rcvbuf_size;
632         try_connect_int = nc->try_connect_int;
633
634         my_addr_len = min_t(int, nc->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, nc->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)nc->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, nc->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, nc->peer_addr, peer_addr_len);
644
645         rcu_read_unlock();
646
647         what = "sock_create_kern";
648         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
649                                SOCK_STREAM, IPPROTO_TCP, &sock);
650         if (err < 0) {
651                 sock = NULL;
652                 goto out;
653         }
654
655         sock->sk->sk_rcvtimeo =
656         sock->sk->sk_sndtimeo = try_connect_int * HZ;
657         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
658
659        /* explicitly bind to the configured IP as source IP
660         *  for the outgoing connections.
661         *  This is needed for multihomed hosts and to be
662         *  able to use lo: interfaces for drbd.
663         * Make sure to use 0 as port number, so linux selects
664         *  a free one dynamically.
665         */
666         what = "bind before connect";
667         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
668         if (err < 0)
669                 goto out;
670
671         /* connect may fail, peer not yet available.
672          * stay C_WF_CONNECTION, don't go Disconnecting! */
673         disconnect_on_error = 0;
674         what = "connect";
675         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
676
677 out:
678         if (err < 0) {
679                 if (sock) {
680                         sock_release(sock);
681                         sock = NULL;
682                 }
683                 switch (-err) {
684                         /* timeout, busy, signal pending */
685                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
686                 case EINTR: case ERESTARTSYS:
687                         /* peer not (yet) available, network problem */
688                 case ECONNREFUSED: case ENETUNREACH:
689                 case EHOSTDOWN:    case EHOSTUNREACH:
690                         disconnect_on_error = 0;
691                         break;
692                 default:
693                         conn_err(tconn, "%s failed, err = %d\n", what, err);
694                 }
695                 if (disconnect_on_error)
696                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
697         }
698
699         return sock;
700 }
701
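/* Passive side of connection setup: bind, listen and accept one incoming
 * connection from the peer, with a randomly jittered accept timeout.
 * Returns the accepted socket, or NULL on error or timeout. */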
702 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
703 {
704         int timeo, err, my_addr_len;
705         int sndbuf_size, rcvbuf_size, try_connect_int;
706         struct socket *s_estab = NULL, *s_listen;
707         struct sockaddr_in6 my_addr;
708         struct net_conf *nc;
709         const char *what;
710
711         rcu_read_lock();
712         nc = rcu_dereference(tconn->net_conf);
713         if (!nc) {
714                 rcu_read_unlock();
715                 return NULL;
716         }
717
718         sndbuf_size = nc->sndbuf_size;
719         rcvbuf_size = nc->rcvbuf_size;
720         try_connect_int = nc->try_connect_int;
721
722         my_addr_len = min_t(int, nc->my_addr_len, sizeof(struct sockaddr_in6));
723         memcpy(&my_addr, nc->my_addr, my_addr_len);
724         rcu_read_unlock();
725
726         what = "sock_create_kern";
727         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
728                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
729         if (err) {
730                 s_listen = NULL;
731                 goto out;
732         }
733
734         timeo = try_connect_int * HZ;
735         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
736
737         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
738         s_listen->sk->sk_rcvtimeo = timeo;
739         s_listen->sk->sk_sndtimeo = timeo;
740         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
741
742         what = "bind before listen";
743         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
744         if (err < 0)
745                 goto out;
746
747         err = drbd_accept(&what, s_listen, &s_estab);
748
749 out:
750         if (s_listen)
751                 sock_release(s_listen);
752         if (err < 0) {
753                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754                         conn_err(tconn, "%s failed, err = %d\n", what, err);
755                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
756                 }
757         }
758
759         return s_estab;
760 }
761
762 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
763
764 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
765                              enum drbd_packet cmd)
766 {
767         if (!conn_prepare_command(tconn, sock))
768                 return -EIO;
769         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
770 }
771
772 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
773 {
774         unsigned int header_size = drbd_header_size(tconn);
775         struct packet_info pi;
776         int err;
777
778         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
779         if (err != header_size) {
780                 if (err >= 0)
781                         err = -EIO;
782                 return err;
783         }
784         err = decode_header(tconn, tconn->data.rbuf, &pi);
785         if (err)
786                 return err;
787         return pi.cmd;
788 }
789
790 /**
791  * drbd_socket_okay() - Free the socket if its connection is not okay
792  * @sock:       pointer to the pointer to the socket.
793  */
794 static int drbd_socket_okay(struct socket **sock)
795 {
796         int rr;
797         char tb[4];
798
799         if (!*sock)
800                 return false;
801
802         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
803
804         if (rr > 0 || rr == -EAGAIN) {
805                 return true;
806         } else {
807                 sock_release(*sock);
808                 *sock = NULL;
809                 return false;
810         }
811 }
812 /* Gets called when a connection is established, or when a new minor gets created
813    in an existing connection */
814 int drbd_connected(int vnr, void *p, void *data)
815 {
816         struct drbd_conf *mdev = (struct drbd_conf *)p;
817         int err;
818
819         atomic_set(&mdev->packet_seq, 0);
820         mdev->peer_seq = 0;
821
822         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
823                 &mdev->tconn->cstate_mutex :
824                 &mdev->own_state_mutex;
825
826         err = drbd_send_sync_param(mdev);
827         if (!err)
828                 err = drbd_send_sizes(mdev, 0, 0);
829         if (!err)
830                 err = drbd_send_uuids(mdev);
831         if (!err)
832                 err = drbd_send_state(mdev);
833         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
834         clear_bit(RESIZE_PENDING, &mdev->flags);
835         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
836         return err;
837 }
838
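/*
 * Connection setup uses two TCP sockets per peer: the "data" socket for
 * the bulk replication stream and the "meta" socket for acks and pings.
 * Both nodes alternate between actively connecting and accepting until
 * both sockets exist, then negotiate protocol features and, if a shared
 * secret is configured, authenticate each other.
 */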
839 /*
840  * return values:
841  *   1 yes, we have a valid connection
842  *   0 oops, did not work out, please try again
843  *  -1 peer talks different language,
844  *     no point in trying again, please go standalone.
845  *  -2 We do not have a network config...
846  */
847 static int drbd_connect(struct drbd_tconn *tconn)
848 {
849         struct socket *sock, *msock;
850         struct net_conf *nc;
851         int timeout, try, h, ok;
852
853         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
854                 return -2;
855
856         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
857
858         /* Assume that the peer only understands protocol 80 until we know better.  */
859         tconn->agreed_pro_version = 80;
860
861         do {
862                 struct socket *s;
863
864                 for (try = 0;;) {
865                         /* 3 tries, this should take less than a second! */
866                         s = drbd_try_connect(tconn);
867                         if (s || ++try >= 3)
868                                 break;
869                         /* give the other side time to call bind() & listen() */
870                         schedule_timeout_interruptible(HZ / 10);
871                 }
872
873                 if (s) {
874                         if (!tconn->data.socket) {
875                                 tconn->data.socket = s;
876                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
877                         } else if (!tconn->meta.socket) {
878                                 tconn->meta.socket = s;
879                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
880                         } else {
881                                 conn_err(tconn, "Logic error in drbd_connect()\n");
882                                 goto out_release_sockets;
883                         }
884                 }
885
886                 if (tconn->data.socket && tconn->meta.socket) {
887                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
888                         ok = drbd_socket_okay(&tconn->data.socket);
889                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
890                         if (ok)
891                                 break;
892                 }
893
894 retry:
895                 s = drbd_wait_for_connect(tconn);
896                 if (s) {
897                         try = receive_first_packet(tconn, s);
898                         drbd_socket_okay(&tconn->data.socket);
899                         drbd_socket_okay(&tconn->meta.socket);
900                         switch (try) {
901                         case P_INITIAL_DATA:
902                                 if (tconn->data.socket) {
903                                         conn_warn(tconn, "initial packet S crossed\n");
904                                         sock_release(tconn->data.socket);
905                                 }
906                                 tconn->data.socket = s;
907                                 break;
908                         case P_INITIAL_META:
909                                 if (tconn->meta.socket) {
910                                         conn_warn(tconn, "initial packet M crossed\n");
911                                         sock_release(tconn->meta.socket);
912                                 }
913                                 tconn->meta.socket = s;
914                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
915                                 break;
916                         default:
917                                 conn_warn(tconn, "Error receiving initial packet\n");
918                                 sock_release(s);
919                                 if (random32() & 1)
920                                         goto retry;
921                         }
922                 }
923
924                 if (tconn->cstate <= C_DISCONNECTING)
925                         goto out_release_sockets;
926                 if (signal_pending(current)) {
927                         flush_signals(current);
928                         smp_rmb();
929                         if (get_t_state(&tconn->receiver) == EXITING)
930                                 goto out_release_sockets;
931                 }
932
933                 if (tconn->data.socket && tconn->meta.socket) {
934                         ok = drbd_socket_okay(&tconn->data.socket);
935                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
936                         if (ok)
937                                 break;
938                 }
939         } while (1);
940
941         sock  = tconn->data.socket;
942         msock = tconn->meta.socket;
943
944         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
945         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
946
947         sock->sk->sk_allocation = GFP_NOIO;
948         msock->sk->sk_allocation = GFP_NOIO;
949
950         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
951         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
952
953         /* NOT YET ...
954          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
955          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
956          * first set it to the P_CONNECTION_FEATURES timeout,
957          * which we set to 4x the configured ping_timeout. */
958         rcu_read_lock();
959         nc = rcu_dereference(tconn->net_conf);
960
961         sock->sk->sk_sndtimeo =
962         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
963
964         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
965         timeout = nc->timeout * HZ / 10;
966         rcu_read_unlock();
967
968         msock->sk->sk_sndtimeo = timeout;
969
970         /* we don't want delays.
971          * we use TCP_CORK where appropriate, though */
972         drbd_tcp_nodelay(sock);
973         drbd_tcp_nodelay(msock);
974
975         tconn->last_received = jiffies;
976
977         h = drbd_do_features(tconn);
978         if (h <= 0)
979                 return h;
980
981         if (tconn->cram_hmac_tfm) {
982                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
983                 switch (drbd_do_auth(tconn)) {
984                 case -1:
985                         conn_err(tconn, "Authentication of peer failed\n");
986                         return -1;
987                 case 0:
988                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
989                         return 0;
990                 }
991         }
992
993         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
994                 return 0;
995
996         sock->sk->sk_sndtimeo = timeout;
997         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
998
999         drbd_thread_start(&tconn->asender);
1000
1001         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1002                 return -1;
1003
1004         down_read(&drbd_cfg_rwsem);
1005         h = !idr_for_each(&tconn->volumes, drbd_connected, tconn);
1006         up_read(&drbd_cfg_rwsem);
1007         return h;
1008
1009 out_release_sockets:
1010         if (tconn->data.socket) {
1011                 sock_release(tconn->data.socket);
1012                 tconn->data.socket = NULL;
1013         }
1014         if (tconn->meta.socket) {
1015                 sock_release(tconn->meta.socket);
1016                 tconn->meta.socket = NULL;
1017         }
1018         return -1;
1019 }
1020
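/* Decode one packet header into @pi.  The header format (protocol 100, 95
 * or 80) is identified by its magic value; the expected size follows from
 * the negotiated protocol version via drbd_header_size(). */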
1021 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1022 {
1023         unsigned int header_size = drbd_header_size(tconn);
1024
1025         if (header_size == sizeof(struct p_header100) &&
1026             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1027                 struct p_header100 *h = header;
1028                 if (h->pad != 0) {
1029                         conn_err(tconn, "Header padding is not zero\n");
1030                         return -EINVAL;
1031                 }
1032                 pi->vnr = be16_to_cpu(h->volume);
1033                 pi->cmd = be16_to_cpu(h->command);
1034                 pi->size = be32_to_cpu(h->length);
1035         } else if (header_size == sizeof(struct p_header95) &&
1036                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1037                 struct p_header95 *h = header;
1038                 pi->cmd = be16_to_cpu(h->command);
1039                 pi->size = be32_to_cpu(h->length);
1040                 pi->vnr = 0;
1041         } else if (header_size == sizeof(struct p_header80) &&
1042                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1043                 struct p_header80 *h = header;
1044                 pi->cmd = be16_to_cpu(h->command);
1045                 pi->size = be16_to_cpu(h->length);
1046                 pi->vnr = 0;
1047         } else {
1048                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1049                          be32_to_cpu(*(__be32 *)header),
1050                          tconn->agreed_pro_version);
1051                 return -EINVAL;
1052         }
1053         pi->data = header + header_size;
1054         return 0;
1055 }
1056
1057 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1058 {
1059         void *buffer = tconn->data.rbuf;
1060         int err;
1061
1062         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1063         if (err)
1064                 return err;
1065
1066         err = decode_header(tconn, buffer, pi);
1067         tconn->last_received = jiffies;
1068
1069         return err;
1070 }
1071
1072 static void drbd_flush(struct drbd_conf *mdev)
1073 {
1074         int rv;
1075
1076         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1077                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1078                                         NULL);
1079                 if (rv) {
1080                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1081                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1082                          * don't try again for ANY return value != 0
1083                          * if (rv == -EOPNOTSUPP) */
1084                         drbd_bump_write_ordering(mdev, WO_drain_io);
1085                 }
1086                 put_ldev(mdev);
1087         }
1088 }
1089
1090 /**
1091  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1092  * @mdev:       DRBD device.
1093  * @epoch:      Epoch object.
1094  * @ev:         Epoch event.
1095  */
1096 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1097                                                struct drbd_epoch *epoch,
1098                                                enum epoch_event ev)
1099 {
1100         int epoch_size;
1101         struct drbd_epoch *next_epoch;
1102         enum finish_epoch rv = FE_STILL_LIVE;
1103
1104         spin_lock(&mdev->epoch_lock);
1105         do {
1106                 next_epoch = NULL;
1107
1108                 epoch_size = atomic_read(&epoch->epoch_size);
1109
1110                 switch (ev & ~EV_CLEANUP) {
1111                 case EV_PUT:
1112                         atomic_dec(&epoch->active);
1113                         break;
1114                 case EV_GOT_BARRIER_NR:
1115                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1116                         break;
1117                 case EV_BECAME_LAST:
1118                         /* nothing to do */
1119                         break;
1120                 }
1121
1122                 if (epoch_size != 0 &&
1123                     atomic_read(&epoch->active) == 0 &&
1124                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1125                         if (!(ev & EV_CLEANUP)) {
1126                                 spin_unlock(&mdev->epoch_lock);
1127                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1128                                 spin_lock(&mdev->epoch_lock);
1129                         }
1130                         dec_unacked(mdev);
1131
1132                         if (mdev->current_epoch != epoch) {
1133                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1134                                 list_del(&epoch->list);
1135                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1136                                 mdev->epochs--;
1137                                 kfree(epoch);
1138
1139                                 if (rv == FE_STILL_LIVE)
1140                                         rv = FE_DESTROYED;
1141                         } else {
1142                                 epoch->flags = 0;
1143                                 atomic_set(&epoch->epoch_size, 0);
1144                                 /* atomic_set(&epoch->active, 0); is already zero */
1145                                 if (rv == FE_STILL_LIVE)
1146                                         rv = FE_RECYCLED;
1147                                 wake_up(&mdev->ee_wait);
1148                         }
1149                 }
1150
1151                 if (!next_epoch)
1152                         break;
1153
1154                 epoch = next_epoch;
1155         } while (1);
1156
1157         spin_unlock(&mdev->epoch_lock);
1158
1159         return rv;
1160 }
1161
1162 /**
1163  * drbd_bump_write_ordering() - Fall back to another write ordering method
1164  * @mdev:       DRBD device.
1165  * @wo:         Write ordering method to try.
1166  */
1167 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1168 {
1169         enum write_ordering_e pwo;
1170         static char *write_ordering_str[] = {
1171                 [WO_none] = "none",
1172                 [WO_drain_io] = "drain",
1173                 [WO_bdev_flush] = "flush",
1174         };
1175
1176         pwo = mdev->write_ordering;
1177         wo = min(pwo, wo);
1178         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1179                 wo = WO_drain_io;
1180         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1181                 wo = WO_none;
1182         mdev->write_ordering = wo;
1183         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1184                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1185 }
1186
1187 /**
1188  * drbd_submit_peer_request() - submit a peer request to the local backing device
1189  * @mdev:       DRBD device.
1190  * @peer_req:   peer request
1191  * @rw:         flag field, see bio->bi_rw
1192  *
1193  * May spread the pages to multiple bios,
1194  * depending on bio_add_page restrictions.
1195  *
1196  * Returns 0 if all bios have been submitted,
1197  * -ENOMEM if we could not allocate enough bios,
1198  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1199  *  single page to an empty bio (which should never happen and likely indicates
1200  *  that the lower level IO stack is in some way broken). This has been observed
1201  *  on certain Xen deployments.
1202  */
1203 /* TODO allocate from our own bio_set. */
1204 int drbd_submit_peer_request(struct drbd_conf *mdev,
1205                              struct drbd_peer_request *peer_req,
1206                              const unsigned rw, const int fault_type)
1207 {
1208         struct bio *bios = NULL;
1209         struct bio *bio;
1210         struct page *page = peer_req->pages;
1211         sector_t sector = peer_req->i.sector;
1212         unsigned ds = peer_req->i.size;
1213         unsigned n_bios = 0;
1214         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1215         int err = -ENOMEM;
1216
1217         /* In most cases, we will only need one bio.  But in case the lower
1218          * level restrictions happen to be different at this offset on this
1219          * side than those of the sending peer, we may need to submit the
1220          * request in more than one bio.
1221          *
1222          * Plain bio_alloc is good enough here, this is no DRBD internally
1223          * generated bio, but a bio allocated on behalf of the peer.
1224          */
1225 next_bio:
1226         bio = bio_alloc(GFP_NOIO, nr_pages);
1227         if (!bio) {
1228                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1229                 goto fail;
1230         }
1231         /* > peer_req->i.sector, unless this is the first bio */
1232         bio->bi_sector = sector;
1233         bio->bi_bdev = mdev->ldev->backing_bdev;
1234         bio->bi_rw = rw;
1235         bio->bi_private = peer_req;
1236         bio->bi_end_io = drbd_peer_request_endio;
1237
1238         bio->bi_next = bios;
1239         bios = bio;
1240         ++n_bios;
1241
1242         page_chain_for_each(page) {
1243                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1244                 if (!bio_add_page(bio, page, len, 0)) {
1245                         /* A single page must always be possible!
1246                          * But in case it fails anyways,
1247                          * we deal with it, and complain (below). */
1248                         if (bio->bi_vcnt == 0) {
1249                                 dev_err(DEV,
1250                                         "bio_add_page failed for len=%u, "
1251                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1252                                         len, (unsigned long long)bio->bi_sector);
1253                                 err = -ENOSPC;
1254                                 goto fail;
1255                         }
1256                         goto next_bio;
1257                 }
1258                 ds -= len;
1259                 sector += len >> 9;
1260                 --nr_pages;
1261         }
1262         D_ASSERT(page == NULL);
1263         D_ASSERT(ds == 0);
1264
1265         atomic_set(&peer_req->pending_bios, n_bios);
1266         do {
1267                 bio = bios;
1268                 bios = bios->bi_next;
1269                 bio->bi_next = NULL;
1270
1271                 drbd_generic_make_request(mdev, fault_type, bio);
1272         } while (bios);
1273         return 0;
1274
1275 fail:
1276         while (bios) {
1277                 bio = bios;
1278                 bios = bios->bi_next;
1279                 bio_put(bio);
1280         }
1281         return err;
1282 }
1283
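/* Drop the peer request's interval from the write_requests tree (which is
 * protected by req_lock) and wake up anyone waiting for it. */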
1284 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1285                                              struct drbd_peer_request *peer_req)
1286 {
1287         struct drbd_interval *i = &peer_req->i;
1288
1289         drbd_remove_interval(&mdev->write_requests, i);
1290         drbd_clear_interval(i);
1291
1292         /* Wake up any processes waiting for this peer request to complete.  */
1293         if (i->waiting)
1294                 wake_up(&mdev->misc_wait);
1295 }
1296
1297 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1298 {
1299         struct drbd_conf *mdev;
1300         int rv;
1301         struct p_barrier *p = pi->data;
1302         struct drbd_epoch *epoch;
1303
1304         mdev = vnr_to_mdev(tconn, pi->vnr);
1305         if (!mdev)
1306                 return -EIO;
1307
1308         inc_unacked(mdev);
1309
1310         mdev->current_epoch->barrier_nr = p->barrier;
1311         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1312
1313         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1314          * the activity log, which means it would not be resynced in case the
1315          * R_PRIMARY crashes now.
1316          * Therefore we must send the barrier_ack after the barrier request was
1317          * completed. */
1318         switch (mdev->write_ordering) {
1319         case WO_none:
1320                 if (rv == FE_RECYCLED)
1321                         return 0;
1322
1323                 /* receiver context, in the writeout path of the other node.
1324                  * avoid potential distributed deadlock */
1325                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1326                 if (epoch)
1327                         break;
1328                 else
1329                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1330                         /* Fall through */
1331
1332         case WO_bdev_flush:
1333         case WO_drain_io:
1334                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1335                 drbd_flush(mdev);
1336
1337                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1338                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1339                         if (epoch)
1340                                 break;
1341                 }
1342
1343                 epoch = mdev->current_epoch;
1344                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1345
1346                 D_ASSERT(atomic_read(&epoch->active) == 0);
1347                 D_ASSERT(epoch->flags == 0);
1348
1349                 return 0;
1350         default:
1351                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1352                 return -EIO;
1353         }
1354
1355         epoch->flags = 0;
1356         atomic_set(&epoch->epoch_size, 0);
1357         atomic_set(&epoch->active, 0);
1358
1359         spin_lock(&mdev->epoch_lock);
1360         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1361                 list_add(&epoch->list, &mdev->current_epoch->list);
1362                 mdev->current_epoch = epoch;
1363                 mdev->epochs++;
1364         } else {
1365                 /* The current_epoch got recycled while we allocated this one... */
1366                 kfree(epoch);
1367         }
1368         spin_unlock(&mdev->epoch_lock);
1369
1370         return 0;
1371 }
1372
1373 /* used from receive_RSDataReply (recv_resync_read)
1374  * and from receive_Data */
1375 static struct drbd_peer_request *
1376 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1377               int data_size) __must_hold(local)
1378 {
1379         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1380         struct drbd_peer_request *peer_req;
1381         struct page *page;
1382         int dgs, ds, err;
1383         void *dig_in = mdev->tconn->int_dig_in;
1384         void *dig_vv = mdev->tconn->int_dig_vv;
1385         unsigned long *data;
1386
1387         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->peer_integrity_tfm) ?
1388                 crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm) : 0;
1389
1390         if (dgs) {
1391                 /*
1392                  * FIXME: Receive the incoming digest into the receive buffer
1393                  *        here, together with its struct p_data?
1394                  */
1395                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1396                 if (err)
1397                         return NULL;
1398         }
1399
1400         data_size -= dgs;
1401
1402         if (!expect(data_size != 0))
1403                 return NULL;
1404         if (!expect(IS_ALIGNED(data_size, 512)))
1405                 return NULL;
1406         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1407                 return NULL;
1408
1409         /* even though we trust our peer,
1410          * we sometimes have to double check. */
1411         if (sector + (data_size>>9) > capacity) {
1412                 dev_err(DEV, "request from peer beyond end of local disk: "
1413                         "capacity: %llus < sector: %llus + size: %u\n",
1414                         (unsigned long long)capacity,
1415                         (unsigned long long)sector, data_size);
1416                 return NULL;
1417         }
1418
1419         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1420          * "criss-cross" setup, that might cause write-out on some other DRBD,
1421          * which in turn might block on the other node at this very place.  */
1422         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1423         if (!peer_req)
1424                 return NULL;
1425
1426         ds = data_size;
1427         page = peer_req->pages;
1428         page_chain_for_each(page) {
1429                 unsigned len = min_t(int, ds, PAGE_SIZE);
1430                 data = kmap(page);
1431                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1432                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1433                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1434                         data[0] = data[0] ^ (unsigned long)-1;
1435                 }
1436                 kunmap(page);
1437                 if (err) {
1438                         drbd_free_peer_req(mdev, peer_req);
1439                         return NULL;
1440                 }
1441                 ds -= len;
1442         }
1443
1444         if (dgs) {
1445                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1446                 if (memcmp(dig_in, dig_vv, dgs)) {
1447                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1448                                 (unsigned long long)sector, data_size);
1449                         drbd_free_peer_req(mdev, peer_req);
1450                         return NULL;
1451                 }
1452         }
1453         mdev->recv_cnt += data_size>>9;
1454         return peer_req;
1455 }
1456
1457 /* drbd_drain_block() just takes a data block
1458  * out of the socket input buffer, and discards it.
1459  */
1460 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1461 {
1462         struct page *page;
1463         int err = 0;
1464         void *data;
1465
1466         if (!data_size)
1467                 return 0;
1468
1469         page = drbd_alloc_pages(mdev, 1, 1);
1470
1471         data = kmap(page);
1472         while (data_size) {
1473                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1474
1475                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1476                 if (err)
1477                         break;
1478                 data_size -= len;
1479         }
1480         kunmap(page);
1481         drbd_free_pages(mdev, page, 0);
1482         return err;
1483 }
1484
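/* Receive the payload of a read that was answered by the peer directly
 * into the pages of the original request's bio ("diskless" read: we have
 * no usable local data for this block). */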
1485 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1486                            sector_t sector, int data_size)
1487 {
1488         struct bio_vec *bvec;
1489         struct bio *bio;
1490         int dgs, err, i, expect;
1491         void *dig_in = mdev->tconn->int_dig_in;
1492         void *dig_vv = mdev->tconn->int_dig_vv;
1493
1494         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->peer_integrity_tfm) ?
1495                 crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm) : 0;
1496
1497         if (dgs) {
1498                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1499                 if (err)
1500                         return err;
1501         }
1502
1503         data_size -= dgs;
1504
1505         /* optimistically update recv_cnt.  if receiving fails below,
1506          * we disconnect anyways, and counters will be reset. */
1507         mdev->recv_cnt += data_size>>9;
1508
1509         bio = req->master_bio;
1510         D_ASSERT(sector == bio->bi_sector);
1511
1512         bio_for_each_segment(bvec, bio, i) {
1513                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1514                 expect = min_t(int, data_size, bvec->bv_len);
1515                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1516                 kunmap(bvec->bv_page);
1517                 if (err)
1518                         return err;
1519                 data_size -= expect;
1520         }
1521
1522         if (dgs) {
1523                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1524                 if (memcmp(dig_in, dig_vv, dgs)) {
1525                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1526                         return -EINVAL;
1527                 }
1528         }
1529
1530         D_ASSERT(data_size == 0);
1531         return 0;
1532 }
1533
1534 /*
1535  * e_end_resync_block() is called in asender context via
1536  * drbd_finish_peer_reqs().
1537  */
1538 static int e_end_resync_block(struct drbd_work *w, int unused)
1539 {
1540         struct drbd_peer_request *peer_req =
1541                 container_of(w, struct drbd_peer_request, w);
1542         struct drbd_conf *mdev = w->mdev;
1543         sector_t sector = peer_req->i.sector;
1544         int err;
1545
1546         D_ASSERT(drbd_interval_empty(&peer_req->i));
1547
1548         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1549                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1550                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1551         } else {
1552                 /* Record failure to sync */
1553                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1554
1555                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1556         }
1557         dec_unacked(mdev);
1558
1559         return err;
1560 }
1561
1562 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1563 {
1564         struct drbd_peer_request *peer_req;
1565
1566         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1567         if (!peer_req)
1568                 goto fail;
1569
1570         dec_rs_pending(mdev);
1571
1572         inc_unacked(mdev);
1573         /* corresponding dec_unacked() in e_end_resync_block()
1574          * respective _drbd_clear_done_ee */
1575
1576         peer_req->w.cb = e_end_resync_block;
1577
1578         spin_lock_irq(&mdev->tconn->req_lock);
1579         list_add(&peer_req->w.list, &mdev->sync_ee);
1580         spin_unlock_irq(&mdev->tconn->req_lock);
1581
1582         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1583         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1584                 return 0;
1585
1586         /* don't care for the reason here */
1587         dev_err(DEV, "submit failed, triggering re-connect\n");
1588         spin_lock_irq(&mdev->tconn->req_lock);
1589         list_del(&peer_req->w.list);
1590         spin_unlock_irq(&mdev->tconn->req_lock);
1591
1592         drbd_free_peer_req(mdev, peer_req);
1593 fail:
1594         put_ldev(mdev);
1595         return -EIO;
1596 }
1597
1598 static struct drbd_request *
1599 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1600              sector_t sector, bool missing_ok, const char *func)
1601 {
1602         struct drbd_request *req;
1603
1604         /* Request object according to our peer */
1605         req = (struct drbd_request *)(unsigned long)id;
1606         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1607                 return req;
1608         if (!missing_ok) {
1609                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1610                         (unsigned long)id, (unsigned long long)sector);
1611         }
1612         return NULL;
1613 }
1614
1615 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1616 {
1617         struct drbd_conf *mdev;
1618         struct drbd_request *req;
1619         sector_t sector;
1620         int err;
1621         struct p_data *p = pi->data;
1622
1623         mdev = vnr_to_mdev(tconn, pi->vnr);
1624         if (!mdev)
1625                 return -EIO;
1626
1627         sector = be64_to_cpu(p->sector);
1628
1629         spin_lock_irq(&mdev->tconn->req_lock);
1630         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1631         spin_unlock_irq(&mdev->tconn->req_lock);
1632         if (unlikely(!req))
1633                 return -EIO;
1634
1635         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1636          * special casing it there for the various failure cases.
1637          * still no race with drbd_fail_pending_reads */
1638         err = recv_dless_read(mdev, req, sector, pi->size);
1639         if (!err)
1640                 req_mod(req, DATA_RECEIVED);
1641         /* else: nothing. handled from drbd_disconnect...
1642          * we may not complete this just yet,
1643          * in case we are "on-disconnect: freeze" */
1644
1645         return err;
1646 }
1647
1648 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1649 {
1650         struct drbd_conf *mdev;
1651         sector_t sector;
1652         int err;
1653         struct p_data *p = pi->data;
1654
1655         mdev = vnr_to_mdev(tconn, pi->vnr);
1656         if (!mdev)
1657                 return -EIO;
1658
1659         sector = be64_to_cpu(p->sector);
1660         D_ASSERT(p->block_id == ID_SYNCER);
1661
1662         if (get_ldev(mdev)) {
1663                 /* data is submitted to disk within recv_resync_read.
1664                  * corresponding put_ldev done below on error,
1665                  * or in drbd_peer_request_endio. */
1666                 err = recv_resync_read(mdev, sector, pi->size);
1667         } else {
1668                 if (__ratelimit(&drbd_ratelimit_state))
1669                         dev_err(DEV, "Can not write resync data to local disk.\n");
1670
1671                 err = drbd_drain_block(mdev, pi->size);
1672
1673                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1674         }
1675
1676         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1677
1678         return err;
1679 }
1680
1681 static int w_restart_write(struct drbd_work *w, int cancel)
1682 {
1683         struct drbd_request *req = container_of(w, struct drbd_request, w);
1684         struct drbd_conf *mdev = w->mdev;
1685         struct bio *bio;
1686         unsigned long start_time;
1687         unsigned long flags;
1688
1689         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1690         if (!expect(req->rq_state & RQ_POSTPONED)) {
1691                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1692                 return -EIO;
1693         }
1694         bio = req->master_bio;
1695         start_time = req->start_time;
1696         /* Postponed requests will not have their master_bio completed!  */
1697         __req_mod(req, DISCARD_WRITE, NULL);
1698         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1699
1700         while (__drbd_make_request(mdev, bio, start_time))
1701                 /* retry */ ;
1702         return 0;
1703 }
1704
1705 static void restart_conflicting_writes(struct drbd_conf *mdev,
1706                                        sector_t sector, int size)
1707 {
1708         struct drbd_interval *i;
1709         struct drbd_request *req;
1710
1711         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1712                 if (!i->local)
1713                         continue;
1714                 req = container_of(i, struct drbd_request, i);
1715                 if (req->rq_state & RQ_LOCAL_PENDING ||
1716                     !(req->rq_state & RQ_POSTPONED))
1717                         continue;
1718                 if (expect(list_empty(&req->w.list))) {
1719                         req->w.mdev = mdev;
1720                         req->w.cb = w_restart_write;
1721                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1722                 }
1723         }
1724 }
1725
1726 /*
1727  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1728  */
1729 static int e_end_block(struct drbd_work *w, int cancel)
1730 {
1731         struct drbd_peer_request *peer_req =
1732                 container_of(w, struct drbd_peer_request, w);
1733         struct drbd_conf *mdev = w->mdev;
1734         sector_t sector = peer_req->i.sector;
1735         int err = 0, pcmd;
1736
1737         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1738                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1739                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1740                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1741                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1742                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1743                         err = drbd_send_ack(mdev, pcmd, peer_req);
1744                         if (pcmd == P_RS_WRITE_ACK)
1745                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1746                 } else {
1747                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1748                         /* we expect it to be marked out of sync anyways...
1749                          * maybe assert this?  */
1750                 }
1751                 dec_unacked(mdev);
1752         }
1753         /* we delete from the conflict detection hash _after_ we sent out the
1754          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1755         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1756                 spin_lock_irq(&mdev->tconn->req_lock);
1757                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1758                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1759                 if (peer_req->flags & EE_RESTART_REQUESTS)
1760                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1761                 spin_unlock_irq(&mdev->tconn->req_lock);
1762         } else
1763                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1764
1765         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1766
1767         return err;
1768 }
1769
1770 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1771 {
1772         struct drbd_conf *mdev = w->mdev;
1773         struct drbd_peer_request *peer_req =
1774                 container_of(w, struct drbd_peer_request, w);
1775         int err;
1776
1777         err = drbd_send_ack(mdev, ack, peer_req);
1778         dec_unacked(mdev);
1779
1780         return err;
1781 }
1782
1783 static int e_send_discard_write(struct drbd_work *w, int unused)
1784 {
1785         return e_send_ack(w, P_DISCARD_WRITE);
1786 }
1787
1788 static int e_send_retry_write(struct drbd_work *w, int unused)
1789 {
1790         struct drbd_tconn *tconn = w->mdev->tconn;
1791
1792         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1793                              P_RETRY_WRITE : P_DISCARD_WRITE);
1794 }
1795
1796 static bool seq_greater(u32 a, u32 b)
1797 {
1798         /*
1799          * We assume 32-bit wrap-around here.
1800          * For 24-bit wrap-around, we would have to shift:
1801          *  a <<= 8; b <<= 8;
1802          */
1803         return (s32)a - (s32)b > 0;
1804 }
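
/* A small worked example of the wrap-around comparison above (illustrative
 * only): with 32-bit sequence numbers,
 *
 *	seq_greater(5, 0xffffffffU)
 *		== ((s32)5 - (s32)0xffffffffU > 0)
 *		== (5 - (-1) > 0)
 *		== true
 *
 * i.e. 5 counts as "newer" than 0xffffffff even though it is numerically
 * smaller, because the counter is assumed to have wrapped in between. */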
1805
1806 static u32 seq_max(u32 a, u32 b)
1807 {
1808         return seq_greater(a, b) ? a : b;
1809 }
1810
1811 static bool need_peer_seq(struct drbd_conf *mdev)
1812 {
1813         struct drbd_tconn *tconn = mdev->tconn;
1814         int tp;
1815
1816         /*
1817          * We only need to keep track of the last packet_seq number of our peer
1818          * if we are in dual-primary mode and we have the discard flag set; see
1819          * handle_write_conflicts().
1820          */
1821
1822         rcu_read_lock();
1823         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1824         rcu_read_unlock();
1825
1826         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1827 }
1828
1829 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1830 {
1831         unsigned int newest_peer_seq;
1832
1833         if (need_peer_seq(mdev)) {
1834                 spin_lock(&mdev->peer_seq_lock);
1835                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1836                 mdev->peer_seq = newest_peer_seq;
1837                 spin_unlock(&mdev->peer_seq_lock);
1838                 /* wake up only if we actually changed mdev->peer_seq */
1839                 if (peer_seq == newest_peer_seq)
1840                         wake_up(&mdev->seq_wait);
1841         }
1842 }
1843
1844 /* Called from receive_Data.
1845  * Synchronize packets on sock with packets on msock.
1846  *
1847  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1848  * packet traveling on msock, they are still processed in the order they have
1849  * been sent.
1850  *
1851  * Note: we don't care for Ack packets overtaking P_DATA packets.
1852  *
1853  * In case packet_seq is larger than mdev->peer_seq number, there are
1854  * outstanding packets on the msock. We wait for them to arrive.
1855  * In case we are the logically next packet, we update mdev->peer_seq
1856  * ourselves. Correctly handles 32bit wrap around.
1857  *
1858  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1859  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1860  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1861  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1862  *
1863  * returns 0 if we may process the packet,
1864  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1865 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1866 {
1867         DEFINE_WAIT(wait);
1868         long timeout;
1869         int ret;
1870
1871         if (!need_peer_seq(mdev))
1872                 return 0;
1873
1874         spin_lock(&mdev->peer_seq_lock);
1875         for (;;) {
1876                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1877                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1878                         ret = 0;
1879                         break;
1880                 }
1881                 if (signal_pending(current)) {
1882                         ret = -ERESTARTSYS;
1883                         break;
1884                 }
1885                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1886                 spin_unlock(&mdev->peer_seq_lock);
1887                 rcu_read_lock();
1888                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1889                 rcu_read_unlock();
1890                 timeout = schedule_timeout(timeout);
1891                 spin_lock(&mdev->peer_seq_lock);
1892                 if (!timeout) {
1893                         ret = -ETIMEDOUT;
1894                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1895                         break;
1896                 }
1897         }
1898         spin_unlock(&mdev->peer_seq_lock);
1899         finish_wait(&mdev->seq_wait, &wait);
1900         return ret;
1901 }
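
/* Worked example for the wait condition above (illustrative only): assume
 * mdev->peer_seq == 7 and a packet with peer_seq == 9 arrives on the data
 * socket.  seq_greater(9 - 1, 7) is true, so a packet carrying sequence
 * number 8 is still outstanding on the meta socket and we sleep on seq_wait.
 * Once that packet has been processed and mdev->peer_seq has become 8,
 * seq_greater(9 - 1, 8) is false; we set mdev->peer_seq = seq_max(8, 9) == 9
 * and return 0, so packet 9 is processed in order. */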
1902
1903 /* see also bio_flags_to_wire()
1904  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1905  * flags and back. We may replicate to other kernel versions. */
1906 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1907 {
1908         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1909                 (dpf & DP_FUA ? REQ_FUA : 0) |
1910                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1911                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1912 }
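
/* Example (illustrative): a peer write sent with dp_flags == (DP_RW_SYNC | DP_FUA)
 * maps to the bio flags (REQ_SYNC | REQ_FUA); a plain write with dp_flags == 0
 * maps to 0 and is submitted as an ordinary WRITE. */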
1913
1914 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1915                                     unsigned int size)
1916 {
1917         struct drbd_interval *i;
1918
1919     repeat:
1920         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1921                 struct drbd_request *req;
1922                 struct bio_and_error m;
1923
1924                 if (!i->local)
1925                         continue;
1926                 req = container_of(i, struct drbd_request, i);
1927                 if (!(req->rq_state & RQ_POSTPONED))
1928                         continue;
1929                 req->rq_state &= ~RQ_POSTPONED;
1930                 __req_mod(req, NEG_ACKED, &m);
1931                 spin_unlock_irq(&mdev->tconn->req_lock);
1932                 if (m.bio)
1933                         complete_master_bio(mdev, &m);
1934                 spin_lock_irq(&mdev->tconn->req_lock);
1935                 goto repeat;
1936         }
1937 }
1938
1939 static int handle_write_conflicts(struct drbd_conf *mdev,
1940                                   struct drbd_peer_request *peer_req)
1941 {
1942         struct drbd_tconn *tconn = mdev->tconn;
1943         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1944         sector_t sector = peer_req->i.sector;
1945         const unsigned int size = peer_req->i.size;
1946         struct drbd_interval *i;
1947         bool equal;
1948         int err;
1949
1950         /*
1951          * Inserting the peer request into the write_requests tree will prevent
1952          * new conflicting local requests from being added.
1953          */
1954         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1955
1956     repeat:
1957         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1958                 if (i == &peer_req->i)
1959                         continue;
1960
1961                 if (!i->local) {
1962                         /*
1963                          * Our peer has sent a conflicting remote request; this
1964                          * should not happen in a two-node setup.  Wait for the
1965                          * earlier peer request to complete.
1966                          */
1967                         err = drbd_wait_misc(mdev, i);
1968                         if (err)
1969                                 goto out;
1970                         goto repeat;
1971                 }
1972
1973                 equal = i->sector == sector && i->size == size;
1974                 if (resolve_conflicts) {
1975                         /*
1976                          * If the peer request is fully contained within the
1977                          * overlapping request, it can be discarded; otherwise,
1978                          * it will be retried once all overlapping requests
1979                          * have completed.
1980                          */
1981                         bool discard = i->sector <= sector && i->sector +
1982                                        (i->size >> 9) >= sector + (size >> 9);
1983
1984                         if (!equal)
1985                                 dev_alert(DEV, "Concurrent writes detected: "
1986                                                "local=%llus +%u, remote=%llus +%u, "
1987                                                "assuming %s came first\n",
1988                                           (unsigned long long)i->sector, i->size,
1989                                           (unsigned long long)sector, size,
1990                                           discard ? "local" : "remote");
1991
1992                         inc_unacked(mdev);
1993                         peer_req->w.cb = discard ? e_send_discard_write :
1994                                                    e_send_retry_write;
1995                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1996                         wake_asender(mdev->tconn);
1997
1998                         err = -ENOENT;
1999                         goto out;
2000                 } else {
2001                         struct drbd_request *req =
2002                                 container_of(i, struct drbd_request, i);
2003
2004                         if (!equal)
2005                                 dev_alert(DEV, "Concurrent writes detected: "
2006                                                "local=%llus +%u, remote=%llus +%u\n",
2007                                           (unsigned long long)i->sector, i->size,
2008                                           (unsigned long long)sector, size);
2009
2010                         if (req->rq_state & RQ_LOCAL_PENDING ||
2011                             !(req->rq_state & RQ_POSTPONED)) {
2012                                 /*
2013                                  * Wait for the node with the discard flag to
2014                                  * decide if this request will be discarded or
2015                                  * retried.  Requests that are discarded will
2016                                  * disappear from the write_requests tree.
2017                                  *
2018                                  * In addition, wait for the conflicting
2019                                  * request to finish locally before submitting
2020                                  * the conflicting peer request.
2021                                  */
2022                                 err = drbd_wait_misc(mdev, &req->i);
2023                                 if (err) {
2024                                         _conn_request_state(mdev->tconn,
2025                                                             NS(conn, C_TIMEOUT),
2026                                                             CS_HARD);
2027                                         fail_postponed_requests(mdev, sector, size);
2028                                         goto out;
2029                                 }
2030                                 goto repeat;
2031                         }
2032                         /*
2033                          * Remember to restart the conflicting requests after
2034                          * the new peer request has completed.
2035                          */
2036                         peer_req->flags |= EE_RESTART_REQUESTS;
2037                 }
2038         }
2039         err = 0;
2040
2041     out:
2042         if (err)
2043                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2044         return err;
2045 }
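
/* Illustrative example for the conflict handling above: if a local request
 * covers sectors 8..23 (8 KiB) and a conflicting peer request covers sectors
 * 8..15 (4 KiB), the peer request is fully contained in the local one, so on
 * the node with the discard flag it is answered with P_DISCARD_WRITE and not
 * written; a peer request that only partially overlaps is answered with
 * P_RETRY_WRITE instead (P_DISCARD_WRITE before protocol 100), telling the
 * peer to retry it once its conflicting requests have completed. */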
2046
2047 /* mirrored write */
2048 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2049 {
2050         struct drbd_conf *mdev;
2051         sector_t sector;
2052         struct drbd_peer_request *peer_req;
2053         struct p_data *p = pi->data;
2054         u32 peer_seq = be32_to_cpu(p->seq_num);
2055         int rw = WRITE;
2056         u32 dp_flags;
2057         int err, tp;
2058
2059         mdev = vnr_to_mdev(tconn, pi->vnr);
2060         if (!mdev)
2061                 return -EIO;
2062
2063         if (!get_ldev(mdev)) {
2064                 int err2;
2065
2066                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2067                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2068                 atomic_inc(&mdev->current_epoch->epoch_size);
2069                 err2 = drbd_drain_block(mdev, pi->size);
2070                 if (!err)
2071                         err = err2;
2072                 return err;
2073         }
2074
2075         /*
2076          * Corresponding put_ldev done either below (on various errors), or in
2077          * drbd_peer_request_endio, if we successfully submit the data at the
2078          * end of this function.
2079          */
2080
2081         sector = be64_to_cpu(p->sector);
2082         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2083         if (!peer_req) {
2084                 put_ldev(mdev);
2085                 return -EIO;
2086         }
2087
2088         peer_req->w.cb = e_end_block;
2089
2090         dp_flags = be32_to_cpu(p->dp_flags);
2091         rw |= wire_flags_to_bio(mdev, dp_flags);
2092
2093         if (dp_flags & DP_MAY_SET_IN_SYNC)
2094                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2095
2096         spin_lock(&mdev->epoch_lock);
2097         peer_req->epoch = mdev->current_epoch;
2098         atomic_inc(&peer_req->epoch->epoch_size);
2099         atomic_inc(&peer_req->epoch->active);
2100         spin_unlock(&mdev->epoch_lock);
2101
2102         rcu_read_lock();
2103         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2104         rcu_read_unlock();
2105         if (tp) {
2106                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2107                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2108                 if (err)
2109                         goto out_interrupted;
2110                 spin_lock_irq(&mdev->tconn->req_lock);
2111                 err = handle_write_conflicts(mdev, peer_req);
2112                 if (err) {
2113                         spin_unlock_irq(&mdev->tconn->req_lock);
2114                         if (err == -ENOENT) {
2115                                 put_ldev(mdev);
2116                                 return 0;
2117                         }
2118                         goto out_interrupted;
2119                 }
2120         } else
2121                 spin_lock_irq(&mdev->tconn->req_lock);
2122         list_add(&peer_req->w.list, &mdev->active_ee);
2123         spin_unlock_irq(&mdev->tconn->req_lock);
2124
2125         if (mdev->tconn->agreed_pro_version < 100) {
2126                 rcu_read_lock();
2127                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2128                 case DRBD_PROT_C:
2129                         dp_flags |= DP_SEND_WRITE_ACK;
2130                         break;
2131                 case DRBD_PROT_B:
2132                         dp_flags |= DP_SEND_RECEIVE_ACK;
2133                         break;
2134                 }
2135                 rcu_read_unlock();
2136         }
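
        /* Summary (illustrative): protocol C expects a P_WRITE_ACK once the
         * write has completed on our disk, protocol B a P_RECV_ACK as soon as
         * the data has been received, and protocol A no per-request ack at
         * all; from protocol version 100 on, the peer requests the desired
         * behaviour directly via dp_flags. */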
2137
2138         if (dp_flags & DP_SEND_WRITE_ACK) {
2139                 peer_req->flags |= EE_SEND_WRITE_ACK;
2140                 inc_unacked(mdev);
2141                 /* corresponding dec_unacked() in e_end_block()
2142                  * respective _drbd_clear_done_ee */
2143         }
2144
2145         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2146                 /* I really don't like it that the receiver thread
2147                  * sends on the msock, but anyways */
2148                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2149         }
2150
2151         if (mdev->state.pdsk < D_INCONSISTENT) {
2152                 /* In case we have the only disk of the cluster, mark this block out of sync. */
2153                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2154                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2155                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2156                 drbd_al_begin_io(mdev, &peer_req->i);
2157         }
2158
2159         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2160         if (!err)
2161                 return 0;
2162
2163         /* don't care for the reason here */
2164         dev_err(DEV, "submit failed, triggering re-connect\n");
2165         spin_lock_irq(&mdev->tconn->req_lock);
2166         list_del(&peer_req->w.list);
2167         drbd_remove_epoch_entry_interval(mdev, peer_req);
2168         spin_unlock_irq(&mdev->tconn->req_lock);
2169         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2170                 drbd_al_complete_io(mdev, &peer_req->i);
2171
2172 out_interrupted:
2173         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2174         put_ldev(mdev);
2175         drbd_free_peer_req(mdev, peer_req);
2176         return err;
2177 }
2178
2179 /* We may throttle resync, if the lower device seems to be busy,
2180  * and current sync rate is above c_min_rate.
2181  *
2182  * To decide whether or not the lower device is busy, we use a scheme similar
2183  * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
2184  * amount (more than 64 sectors) of activity that we cannot account for with our
2185  * own resync activity, it obviously is "busy".
2186  *
2187  * The current sync rate used here uses only the most recent two step marks,
2188  * to have a short time average so we can react faster.
2189  */
2190 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2191 {
2192         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2193         unsigned long db, dt, dbdt;
2194         struct lc_element *tmp;
2195         int curr_events;
2196         int throttle = 0;
2197
2198         /* feature disabled? */
2199         if (mdev->ldev->dc.c_min_rate == 0)
2200                 return 0;
2201
2202         spin_lock_irq(&mdev->al_lock);
2203         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2204         if (tmp) {
2205                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2206                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2207                         spin_unlock_irq(&mdev->al_lock);
2208                         return 0;
2209                 }
2210                 /* Do not slow down if app IO is already waiting for this extent */
2211         }
2212         spin_unlock_irq(&mdev->al_lock);
2213
2214         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2215                       (int)part_stat_read(&disk->part0, sectors[1]) -
2216                         atomic_read(&mdev->rs_sect_ev);
2217
2218         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2219                 unsigned long rs_left;
2220                 int i;
2221
2222                 mdev->rs_last_events = curr_events;
2223
2224                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2225                  * approx. */
2226                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2227
2228                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2229                         rs_left = mdev->ov_left;
2230                 else
2231                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2232
2233                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2234                 if (!dt)
2235                         dt++;
2236                 db = mdev->rs_mark_left[i] - rs_left;
2237                 dbdt = Bit2KB(db/dt);
2238
2239                 if (dbdt > mdev->ldev->dc.c_min_rate)
2240                         throttle = 1;
2241         }
2242         return throttle;
2243 }
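
/* Rough worked example for the throttle decision above (illustrative only,
 * assuming the usual 4 KiB bitmap granularity, i.e. Bit2KB(x) == x << 2):
 * if 2000 bitmap bits were cleared since sync mark i and dt == 2 seconds,
 * then db/dt == 1000 bits/s and dbdt == 4000 KiB/s.  With c_min_rate
 * configured to 250 (KiB/s), dbdt > c_min_rate, so resync reads are
 * throttled whenever the backing device also shows more than 64 sectors of
 * activity that our own resync cannot account for. */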
2244
2245
2246 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2247 {
2248         struct drbd_conf *mdev;
2249         sector_t sector;
2250         sector_t capacity;
2251         struct drbd_peer_request *peer_req;
2252         struct digest_info *di = NULL;
2253         int size, verb;
2254         unsigned int fault_type;
2255         struct p_block_req *p = pi->data;
2256
2257         mdev = vnr_to_mdev(tconn, pi->vnr);
2258         if (!mdev)
2259                 return -EIO;
2260         capacity = drbd_get_capacity(mdev->this_bdev);
2261
2262         sector = be64_to_cpu(p->sector);
2263         size   = be32_to_cpu(p->blksize);
2264
2265         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2266                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2267                                 (unsigned long long)sector, size);
2268                 return -EINVAL;
2269         }
2270         if (sector + (size>>9) > capacity) {
2271                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2272                                 (unsigned long long)sector, size);
2273                 return -EINVAL;
2274         }
2275
2276         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2277                 verb = 1;
2278                 switch (pi->cmd) {
2279                 case P_DATA_REQUEST:
2280                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2281                         break;
2282                 case P_RS_DATA_REQUEST:
2283                 case P_CSUM_RS_REQUEST:
2284                 case P_OV_REQUEST:
2285                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2286                         break;
2287                 case P_OV_REPLY:
2288                         verb = 0;
2289                         dec_rs_pending(mdev);
2290                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2291                         break;
2292                 default:
2293                         BUG();
2294                 }
2295                 if (verb && __ratelimit(&drbd_ratelimit_state))
2296                         dev_err(DEV, "Can not satisfy peer's read request, "
2297                             "no local data.\n");
2298
2299                 /* drain possible payload */
2300                 return drbd_drain_block(mdev, pi->size);
2301         }
2302
2303         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2304          * "criss-cross" setup, that might cause write-out on some other DRBD,
2305          * which in turn might block on the other node at this very place.  */
2306         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2307         if (!peer_req) {
2308                 put_ldev(mdev);
2309                 return -ENOMEM;
2310         }
2311
2312         switch (pi->cmd) {
2313         case P_DATA_REQUEST:
2314                 peer_req->w.cb = w_e_end_data_req;
2315                 fault_type = DRBD_FAULT_DT_RD;
2316                 /* application IO, don't drbd_rs_begin_io */
2317                 goto submit;
2318
2319         case P_RS_DATA_REQUEST:
2320                 peer_req->w.cb = w_e_end_rsdata_req;
2321                 fault_type = DRBD_FAULT_RS_RD;
2322                 /* used in the sector offset progress display */
2323                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2324                 break;
2325
2326         case P_OV_REPLY:
2327         case P_CSUM_RS_REQUEST:
2328                 fault_type = DRBD_FAULT_RS_RD;
2329                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2330                 if (!di)
2331                         goto out_free_e;
2332
2333                 di->digest_size = pi->size;
2334                 di->digest = (((char *)di)+sizeof(struct digest_info));
2335
2336                 peer_req->digest = di;
2337                 peer_req->flags |= EE_HAS_DIGEST;
2338
2339                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2340                         goto out_free_e;
2341
2342                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2343                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2344                         peer_req->w.cb = w_e_end_csum_rs_req;
2345                         /* used in the sector offset progress display */
2346                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2347                 } else if (pi->cmd == P_OV_REPLY) {
2348                         /* track progress, we may need to throttle */
2349                         atomic_add(size >> 9, &mdev->rs_sect_in);
2350                         peer_req->w.cb = w_e_end_ov_reply;
2351                         dec_rs_pending(mdev);
2352                         /* drbd_rs_begin_io done when we sent this request,
2353                          * but accounting still needs to be done. */
2354                         goto submit_for_resync;
2355                 }
2356                 break;
2357
2358         case P_OV_REQUEST:
2359                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2360                     mdev->tconn->agreed_pro_version >= 90) {
2361                         unsigned long now = jiffies;
2362                         int i;
2363                         mdev->ov_start_sector = sector;
2364                         mdev->ov_position = sector;
2365                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2366                         mdev->rs_total = mdev->ov_left;
2367                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2368                                 mdev->rs_mark_left[i] = mdev->ov_left;
2369                                 mdev->rs_mark_time[i] = now;
2370                         }
2371                         dev_info(DEV, "Online Verify start sector: %llu\n",
2372                                         (unsigned long long)sector);
2373                 }
2374                 peer_req->w.cb = w_e_end_ov_req;
2375                 fault_type = DRBD_FAULT_RS_RD;
2376                 break;
2377
2378         default:
2379                 BUG();
2380         }
2381
2382         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2383          * wrt the receiver, but it is not as straightforward as it may seem.
2384          * Various places in the resync start and stop logic assume resync
2385          * requests are processed in order; requeuing this on the worker thread
2386          * would introduce a bunch of new code for synchronization between threads.
2387          *
2388          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2389          * "forever", throttling after drbd_rs_begin_io will lock that extent
2390          * for application writes for the same time.  For now, just throttle
2391          * here, where the rest of the code expects the receiver to sleep for
2392          * a while, anyways.
2393          */
2394
2395         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2396          * this defers syncer requests for some time, before letting at least
2397          * one request through.  The resync controller on the receiving side
2398          * will adapt to the incoming rate accordingly.
2399          *
2400          * We cannot throttle here if remote is Primary/SyncTarget:
2401          * we would also throttle its application reads.
2402          * In that case, throttling is done on the SyncTarget only.
2403          */
2404         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2405                 schedule_timeout_uninterruptible(HZ/10);
2406         if (drbd_rs_begin_io(mdev, sector))
2407                 goto out_free_e;
2408
2409 submit_for_resync:
2410         atomic_add(size >> 9, &mdev->rs_sect_ev);
2411
2412 submit:
2413         inc_unacked(mdev);
2414         spin_lock_irq(&mdev->tconn->req_lock);
2415         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2416         spin_unlock_irq(&mdev->tconn->req_lock);
2417
2418         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2419                 return 0;
2420
2421         /* don't care for the reason here */
2422         dev_err(DEV, "submit failed, triggering re-connect\n");
2423         spin_lock_irq(&mdev->tconn->req_lock);
2424         list_del(&peer_req->w.list);
2425         spin_unlock_irq(&mdev->tconn->req_lock);
2426         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2427
2428 out_free_e:
2429         put_ldev(mdev);
2430         drbd_free_peer_req(mdev, peer_req);
2431         return -EIO;
2432 }
2433
2434 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2435 {
2436         int self, peer, rv = -100;
2437         unsigned long ch_self, ch_peer;
2438         enum drbd_after_sb_p after_sb_0p;
2439
2440         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2441         peer = mdev->p_uuid[UI_BITMAP] & 1;
2442
2443         ch_peer = mdev->p_uuid[UI_SIZE];
2444         ch_self = mdev->comm_bm_set;
2445
2446         rcu_read_lock();
2447         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2448         rcu_read_unlock();
2449         switch (after_sb_0p) {
2450         case ASB_CONSENSUS:
2451         case ASB_DISCARD_SECONDARY:
2452         case ASB_CALL_HELPER:
2453         case ASB_VIOLENTLY:
2454                 dev_err(DEV, "Configuration error.\n");
2455                 break;
2456         case ASB_DISCONNECT:
2457                 break;
2458         case ASB_DISCARD_YOUNGER_PRI:
2459                 if (self == 0 && peer == 1) {
2460                         rv = -1;
2461                         break;
2462                 }
2463                 if (self == 1 && peer == 0) {
2464                         rv =  1;
2465                         break;
2466                 }
2467                 /* Else fall through to one of the other strategies... */
2468         case ASB_DISCARD_OLDER_PRI:
2469                 if (self == 0 && peer == 1) {
2470                         rv = 1;
2471                         break;
2472                 }
2473                 if (self == 1 && peer == 0) {
2474                         rv = -1;
2475                         break;
2476                 }
2477                 /* Else fall through to one of the other strategies... */
2478                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2479                      "Using discard-least-changes instead\n");
2480         case ASB_DISCARD_ZERO_CHG:
2481                 if (ch_peer == 0 && ch_self == 0) {
2482                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2483                                 ? -1 : 1;
2484                         break;
2485                 } else {
2486                         if (ch_peer == 0) { rv =  1; break; }
2487                         if (ch_self == 0) { rv = -1; break; }
2488                 }
2489                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2490                         break;
2491         case ASB_DISCARD_LEAST_CHG:
2492                 if      (ch_self < ch_peer)
2493                         rv = -1;
2494                 else if (ch_self > ch_peer)
2495                         rv =  1;
2496                 else /* ( ch_self == ch_peer ) */
2497                      /* Well, then use something else. */
2498                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2499                                 ? -1 : 1;
2500                 break;
2501         case ASB_DISCARD_LOCAL:
2502                 rv = -1;
2503                 break;
2504         case ASB_DISCARD_REMOTE:
2505                 rv =  1;
2506         }
2507
2508         return rv;
2509 }
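
/* Example of the 0-primaries policies above (illustrative only): with
 * after-sb-0pri set to discard-least-changes, the node that changed fewer
 * blocks since the split brain gives up its changes: ch_self < ch_peer
 * yields rv == -1 (we become sync target), ch_self > ch_peer yields rv == 1.
 * On a tie, the DISCARD_CONCURRENT flag, which is set on only one of the two
 * nodes, breaks it so that both sides reach a consistent decision. */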
2510
2511 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2512 {
2513         int hg, rv = -100;
2514         enum drbd_after_sb_p after_sb_1p;
2515
2516         rcu_read_lock();
2517         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2518         rcu_read_unlock();
2519         switch (after_sb_1p) {
2520         case ASB_DISCARD_YOUNGER_PRI:
2521         case ASB_DISCARD_OLDER_PRI:
2522         case ASB_DISCARD_LEAST_CHG:
2523         case ASB_DISCARD_LOCAL:
2524         case ASB_DISCARD_REMOTE:
2525         case ASB_DISCARD_ZERO_CHG:
2526                 dev_err(DEV, "Configuration error.\n");
2527                 break;
2528         case ASB_DISCONNECT:
2529                 break;
2530         case ASB_CONSENSUS:
2531                 hg = drbd_asb_recover_0p(mdev);
2532                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2533                         rv = hg;
2534                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2535                         rv = hg;
2536                 break;
2537         case ASB_VIOLENTLY:
2538                 rv = drbd_asb_recover_0p(mdev);
2539                 break;
2540         case ASB_DISCARD_SECONDARY:
2541                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2542         case ASB_CALL_HELPER:
2543                 hg = drbd_asb_recover_0p(mdev);
2544                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2545                         enum drbd_state_rv rv2;
2546
2547                         drbd_set_role(mdev, R_SECONDARY, 0);
2548                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2549                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2550                           * we do not need to wait for the after state change work either. */
2551                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2552                         if (rv2 != SS_SUCCESS) {
2553                                 drbd_khelper(mdev, "pri-lost-after-sb");
2554                         } else {
2555                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2556                                 rv = hg;
2557                         }
2558                 } else
2559                         rv = hg;
2560         }
2561
2562         return rv;
2563 }
2564
2565 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2566 {
2567         int hg, rv = -100;
2568         enum drbd_after_sb_p after_sb_2p;
2569
2570         rcu_read_lock();
2571         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2572         rcu_read_unlock();
2573         switch (after_sb_2p) {
2574         case ASB_DISCARD_YOUNGER_PRI:
2575         case ASB_DISCARD_OLDER_PRI:
2576         case ASB_DISCARD_LEAST_CHG:
2577         case ASB_DISCARD_LOCAL:
2578         case ASB_DISCARD_REMOTE:
2579         case ASB_CONSENSUS:
2580         case ASB_DISCARD_SECONDARY:
2581         case ASB_DISCARD_ZERO_CHG:
2582                 dev_err(DEV, "Configuration error.\n");
2583                 break;
2584         case ASB_VIOLENTLY:
2585                 rv = drbd_asb_recover_0p(mdev);
2586                 break;
2587         case ASB_DISCONNECT:
2588                 break;
2589         case ASB_CALL_HELPER:
2590                 hg = drbd_asb_recover_0p(mdev);
2591                 if (hg == -1) {
2592                         enum drbd_state_rv rv2;
2593
2594                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2595                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2596                           * we do not need to wait for the after state change work either. */
2597                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2598                         if (rv2 != SS_SUCCESS) {
2599                                 drbd_khelper(mdev, "pri-lost-after-sb");
2600                         } else {
2601                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2602                                 rv = hg;
2603                         }
2604                 } else
2605                         rv = hg;
2606         }
2607
2608         return rv;
2609 }
2610
2611 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2612                            u64 bits, u64 flags)
2613 {
2614         if (!uuid) {
2615                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2616                 return;
2617         }
2618         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2619              text,
2620              (unsigned long long)uuid[UI_CURRENT],
2621              (unsigned long long)uuid[UI_BITMAP],
2622              (unsigned long long)uuid[UI_HISTORY_START],
2623              (unsigned long long)uuid[UI_HISTORY_END],
2624              (unsigned long long)bits,
2625              (unsigned long long)flags);
2626 }
2627
2628 /*
2629   100   after split brain try auto recover
2630     2   C_SYNC_SOURCE set BitMap
2631     1   C_SYNC_SOURCE use BitMap
2632     0   no Sync
2633    -1   C_SYNC_TARGET use BitMap
2634    -2   C_SYNC_TARGET set BitMap
2635  -100   after split brain, disconnect
2636 -1000   unrelated data
2637 -1091   requires proto 91
2638 -1096   requires proto 96
2639  */
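/* Reading the table above by example (illustrative): if our current UUID
 * equals the peer's bitmap UUID, rule 50 applies and we return -1, i.e. we
 * become C_SYNC_TARGET using the peer's bitmap; if our bitmap UUID equals
 * the peer's current UUID, rule 70 applies and we return 1 (C_SYNC_SOURCE).
 * Two freshly created nodes both report UUID_JUST_CREATED, match rule 10,
 * and need no resync at all. */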
2640 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2641 {
2642         u64 self, peer;
2643         int i, j;
2644
2645         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2646         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2647
2648         *rule_nr = 10;
2649         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2650                 return 0;
2651
2652         *rule_nr = 20;
2653         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2654              peer != UUID_JUST_CREATED)
2655                 return -2;
2656
2657         *rule_nr = 30;
2658         if (self != UUID_JUST_CREATED &&
2659             (peer == UUID_JUST_CREATED || peer == (u64)0))
2660                 return 2;
2661
2662         if (self == peer) {
2663                 int rct, dc; /* roles at crash time */
2664
2665                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2666
2667                         if (mdev->tconn->agreed_pro_version < 91)
2668                                 return -1091;
2669
2670                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2671                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2672                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2673                                 drbd_uuid_set_bm(mdev, 0UL);
2674
2675                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2676                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2677                                 *rule_nr = 34;
2678                         } else {
2679                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2680                                 *rule_nr = 36;
2681                         }
2682
2683                         return 1;
2684                 }
2685
2686                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2687
2688                         if (mdev->tconn->agreed_pro_version < 91)
2689                                 return -1091;
2690
2691                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2692                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2693                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2694
2695                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2696                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2697                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2698
2699                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2700                                 *rule_nr = 35;
2701                         } else {
2702                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2703                                 *rule_nr = 37;
2704                         }
2705
2706                         return -1;
2707                 }
2708
2709                 /* Common power [off|failure] */
2710                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2711                         (mdev->p_uuid[UI_FLAGS] & 2);
2712                 /* lowest bit is set when we were primary,
2713                  * next bit (weight 2) is set when peer was primary */
2714                 *rule_nr = 40;
2715
2716                 switch (rct) {
2717                 case 0: /* !self_pri && !peer_pri */ return 0;
2718                 case 1: /*  self_pri && !peer_pri */ return 1;
2719                 case 2: /* !self_pri &&  peer_pri */ return -1;
2720                 case 3: /*  self_pri &&  peer_pri */
2721                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2722                         return dc ? -1 : 1;
2723                 }
2724         }
2725
2726         *rule_nr = 50;
2727         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2728         if (self == peer)
2729                 return -1;
2730
2731         *rule_nr = 51;
2732         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2733         if (self == peer) {
2734                 if (mdev->tconn->agreed_pro_version < 96 ?
2735                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2736                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2737                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2738                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2739                            the peer made to its UUIDs when it last started a resync as sync source. */
2740
2741                         if (mdev->tconn->agreed_pro_version < 91)
2742                                 return -1091;
2743
2744                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2745                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2746
2747                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2748                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2749
2750                         return -1;
2751                 }
2752         }
2753
2754         *rule_nr = 60;
2755         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2756         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2757                 peer = mdev->p_uuid[i] & ~((u64)1);
2758                 if (self == peer)
2759                         return -2;
2760         }
2761
2762         *rule_nr = 70;
2763         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2764         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2765         if (self == peer)
2766                 return 1;
2767
2768         *rule_nr = 71;
2769         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2770         if (self == peer) {
2771                 if (mdev->tconn->agreed_pro_version < 96 ?
2772                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2773                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2774                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2775                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2776                            we made to our UUIDs when we last started a resync as sync source. */
2777
2778                         if (mdev->tconn->agreed_pro_version < 91)
2779                                 return -1091;
2780
2781                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2782                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2783
2784                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2785                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2786                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2787
2788                         return 1;
2789                 }
2790         }
2791
2792
2793         *rule_nr = 80;
2794         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2795         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2796                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2797                 if (self == peer)
2798                         return 2;
2799         }
2800
2801         *rule_nr = 90;
2802         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2803         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2804         if (self == peer && self != ((u64)0))
2805                 return 100;
2806
2807         *rule_nr = 100;
2808         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2809                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2810                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2811                         peer = mdev->p_uuid[j] & ~((u64)1);
2812                         if (self == peer)
2813                                 return -100;
2814                 }
2815         }
2816
2817         return -1000;
2818 }
2819
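/* Rough summary of how drbd_sync_handshake() below interprets the return
 * value of drbd_uuid_compare() ("hg"):
 *    hg == 0          no resync necessary
 *    hg > 0           this node becomes sync source
 *    hg < 0           this node becomes sync target
 *    |hg| >= 2        a full sync (whole bitmap set) is required
 *    hg == +-100      split brain, try the after-split-brain policies
 *    hg == -1000      unrelated data, drop the connection
 *    hg < -1000       both sides need at least protocol version -hg - 1000
 */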
2820 /* drbd_sync_handshake() returns the new conn state on success, or
2821    C_MASK (-1) on failure.
2822  */
2823 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2824                                            enum drbd_disk_state peer_disk) __must_hold(local)
2825 {
2826         enum drbd_conns rv = C_MASK;
2827         enum drbd_disk_state mydisk;
2828         struct net_conf *nc;
2829         int hg, rule_nr, rr_conflict, dry_run;
2830
2831         mydisk = mdev->state.disk;
2832         if (mydisk == D_NEGOTIATING)
2833                 mydisk = mdev->new_state_tmp.disk;
2834
2835         dev_info(DEV, "drbd_sync_handshake:\n");
2836         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2837         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2838                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2839
2840         hg = drbd_uuid_compare(mdev, &rule_nr);
2841
2842         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2843
2844         if (hg == -1000) {
2845                 dev_alert(DEV, "Unrelated data, aborting!\n");
2846                 return C_MASK;
2847         }
2848         if (hg < -1000) {
2849                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2850                 return C_MASK;
2851         }
2852
2853         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2854             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2855                 int f = (hg == -100) || abs(hg) == 2;
2856                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2857                 if (f)
2858                         hg = hg*2;
2859                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2860                      hg > 0 ? "source" : "target");
2861         }
2862
2863         if (abs(hg) == 100)
2864                 drbd_khelper(mdev, "initial-split-brain");
2865
2866         rcu_read_lock();
2867         nc = rcu_dereference(mdev->tconn->net_conf);
2868
2869         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2870                 int pcount = (mdev->state.role == R_PRIMARY)
2871                            + (peer_role == R_PRIMARY);
2872                 int forced = (hg == -100);
2873
2874                 switch (pcount) {
2875                 case 0:
2876                         hg = drbd_asb_recover_0p(mdev);
2877                         break;
2878                 case 1:
2879                         hg = drbd_asb_recover_1p(mdev);
2880                         break;
2881                 case 2:
2882                         hg = drbd_asb_recover_2p(mdev);
2883                         break;
2884                 }
2885                 if (abs(hg) < 100) {
2886                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2887                              "automatically solved. Sync from %s node\n",
2888                              pcount, (hg < 0) ? "peer" : "this");
2889                         if (forced) {
2890                                 dev_warn(DEV, "Doing a full sync, since"
2891                                      " UUIDs were ambiguous.\n");
2892                                 hg = hg*2;
2893                         }
2894                 }
2895         }
2896
2897         if (hg == -100) {
2898                 if (nc->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2899                         hg = -1;
2900                 if (!nc->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2901                         hg = 1;
2902
2903                 if (abs(hg) < 100)
2904                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2905                              "Sync from %s node\n",
2906                              (hg < 0) ? "peer" : "this");
2907         }
2908         rr_conflict = nc->rr_conflict;
2909         dry_run = nc->dry_run;
2910         rcu_read_unlock();
2911
2912         if (hg == -100) {
2913                 /* FIXME this log message is not correct if we end up here
2914                  * after an attempted attach on a diskless node.
2915                  * We just refuse to attach -- well, we drop the "connection"
2916                  * to that disk, in a way... */
2917                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2918                 drbd_khelper(mdev, "split-brain");
2919                 return C_MASK;
2920         }
2921
2922         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2923                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2924                 return C_MASK;
2925         }
2926
2927         if (hg < 0 && /* by intention we do not use mydisk here. */
2928             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2929                 switch (rr_conflict) {
2930                 case ASB_CALL_HELPER:
2931                         drbd_khelper(mdev, "pri-lost");
2932                         /* fall through */
2933                 case ASB_DISCONNECT:
2934                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2935                         return C_MASK;
2936                 case ASB_VIOLENTLY:
2937                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2938                              " assumption\n");
2939                 }
2940         }
2941
2942         if (dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2943                 if (hg == 0)
2944                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2945                 else
2946                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2947                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2948                                  abs(hg) >= 2 ? "full" : "bit-map based");
2949                 return C_MASK;
2950         }
2951
2952         if (abs(hg) >= 2) {
2953                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2954                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2955                                         BM_LOCKED_SET_ALLOWED))
2956                         return C_MASK;
2957         }
2958
2959         if (hg > 0) { /* become sync source. */
2960                 rv = C_WF_BITMAP_S;
2961         } else if (hg < 0) { /* become sync target */
2962                 rv = C_WF_BITMAP_T;
2963         } else {
2964                 rv = C_CONNECTED;
2965                 if (drbd_bm_total_weight(mdev)) {
2966                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2967                              drbd_bm_total_weight(mdev));
2968                 }
2969         }
2970
2971         return rv;
2972 }
2973
2974 /* returns 1 if invalid */
2975 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2976 {
2977         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2978         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2979             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2980                 return 0;
2981
2982         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2983         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2984             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2985                 return 1;
2986
2987         /* everything else is valid if they are equal on both sides. */
2988         if (peer == self)
2989                 return 0;
2990
2991         /* everything else is invalid. */
2992         return 1;
2993 }
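/* For illustration: the only valid asymmetric pair is discard-remote on one
 * side combined with discard-local on the other.  Two nodes that both use
 * discard-remote (or both discard-local), or that mix a discard-* policy
 * with any other policy, are rejected, as is any other mismatch. */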
2994
2995 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2996 {
2997         struct p_protocol *p = pi->data;
2998         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2999         int p_want_lose, p_two_primaries, cf;
3000         char p_integrity_alg[SHARED_SECRET_MAX] = "";
3001         struct net_conf *nc;
3002
3003         p_proto         = be32_to_cpu(p->protocol);
3004         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3005         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3006         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3007         p_two_primaries = be32_to_cpu(p->two_primaries);
3008         cf              = be32_to_cpu(p->conn_flags);
3009         p_want_lose = cf & CF_WANT_LOSE;
3010
3011         if (tconn->agreed_pro_version >= 87) {
3012                 int err;
3013
3014                 if (pi->size > sizeof(p_integrity_alg))
3015                         return -EIO;
3016                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
3017                 if (err)
3018                         return err;
3019
3020                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
3021         }
3022
3023         clear_bit(CONN_DRY_RUN, &tconn->flags);
3024
3025         if (cf & CF_DRY_RUN)
3026                 set_bit(CONN_DRY_RUN, &tconn->flags);
3027
3028         rcu_read_lock();
3029         nc = rcu_dereference(tconn->net_conf);
3030
3031         if (p_proto != nc->wire_protocol && tconn->agreed_pro_version < 100) {
3032                 conn_err(tconn, "incompatible communication protocols\n");
3033                 goto disconnect_rcu_unlock;
3034         }
3035
3036         if (cmp_after_sb(p_after_sb_0p, nc->after_sb_0p)) {
3037                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
3038                 goto disconnect_rcu_unlock;
3039         }
3040
3041         if (cmp_after_sb(p_after_sb_1p, nc->after_sb_1p)) {
3042                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
3043                 goto disconnect_rcu_unlock;
3044         }
3045
3046         if (cmp_after_sb(p_after_sb_2p, nc->after_sb_2p)) {
3047                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
3048                 goto disconnect_rcu_unlock;
3049         }
3050
3051         if (p_want_lose && nc->want_lose) {
3052                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
3053                 goto disconnect_rcu_unlock;
3054         }
3055
3056         if (p_two_primaries != nc->two_primaries) {
3057                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
3058                 goto disconnect_rcu_unlock;
3059         }
3060
3061         if (tconn->agreed_pro_version >= 87) {
3062                 if (strcmp(p_integrity_alg, nc->integrity_alg)) {
3063                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
3064                         goto disconnect;
3065                 }
3066         }
3067
3068         rcu_read_unlock();
3069
3070         if (tconn->agreed_pro_version >= 87) {
3071                 conn_info(tconn, "data-integrity-alg: %s\n",
3072                           nc->integrity_alg[0] ? nc->integrity_alg : (unsigned char *)"<not-used>");
3073         }
3074
3075         return 0;
3076
3077 disconnect_rcu_unlock:
3078         rcu_read_unlock();
3079 disconnect:
3080         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3081         return -EIO;
3082 }
3083
3084 /* helper function
3085  * input: alg name, feature name
3086  * return: NULL (alg name was "")
3087  *         ERR_PTR(error) if something goes wrong
3088  *         or the crypto hash ptr, if it worked out ok. */
3089 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3090                 const char *alg, const char *name)
3091 {
3092         struct crypto_hash *tfm;
3093
3094         if (!alg[0])
3095                 return NULL;
3096
3097         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3098         if (IS_ERR(tfm)) {
3099                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3100                         alg, name, PTR_ERR(tfm));
3101                 return tfm;
3102         }
3103         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3104                 crypto_free_hash(tfm);
3105                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3106                 return ERR_PTR(-EINVAL);
3107         }
3108         return tfm;
3109 }
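/* Typical caller pattern, sketched for illustration only (compare
 * receive_SyncParam() below):
 *
 *      tfm = drbd_crypto_alloc_digest_safe(mdev, p->verify_alg, "verify-alg");
 *      if (IS_ERR(tfm))
 *              goto disconnect;
 *
 * A NULL return simply means "no algorithm configured" and is not an error. */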
3110
3111 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3112 {
3113         void *buffer = tconn->data.rbuf;
3114         int size = pi->size;
3115
3116         while (size) {
3117                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3118                 s = drbd_recv(tconn, buffer, s);
3119                 if (s <= 0) {
3120                         if (s < 0)
3121                                 return s;
3122                         break;
3123                 }
3124                 size -= s;
3125         }
3126         if (size)
3127                 return -EIO;
3128         return 0;
3129 }
3130
3131 /*
3132  * config_unknown_volume  -  device configuration command for unknown volume
3133  *
3134  * When a device is added to an existing connection, the node on which the
3135  * device is added first will send configuration commands to its peer but the
3136  * peer will not know about the device yet.  It will warn and ignore these
3137  * commands.  Once the device is added on the second node, the second node will
3138  * send the same device configuration commands, but in the other direction.
3139  *
3140  * (We can also end up here if drbd is misconfigured.)
3141  */
3142 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3143 {
3144         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3145                   pi->vnr, cmdname(pi->cmd));
3146         return ignore_remaining_packet(tconn, pi);
3147 }
3148
3149 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3150 {
3151         struct drbd_conf *mdev;
3152         struct p_rs_param_95 *p;
3153         unsigned int header_size, data_size, exp_max_sz;
3154         struct crypto_hash *verify_tfm = NULL;
3155         struct crypto_hash *csums_tfm = NULL;
3156         struct net_conf *old_conf, *new_conf = NULL;
3157         const int apv = tconn->agreed_pro_version;
3158         int *rs_plan_s = NULL;
3159         int fifo_size = 0;
3160         int err;
3161
3162         mdev = vnr_to_mdev(tconn, pi->vnr);
3163         if (!mdev)
3164                 return config_unknown_volume(tconn, pi);
3165
3166         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3167                     : apv == 88 ? sizeof(struct p_rs_param)
3168                                         + SHARED_SECRET_MAX
3169                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3170                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3171
3172         if (pi->size > exp_max_sz) {
3173                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3174                     pi->size, exp_max_sz);
3175                 return -EIO;
3176         }
3177
3178         if (apv <= 88) {
3179                 header_size = sizeof(struct p_rs_param);
3180                 data_size = pi->size - header_size;
3181         } else if (apv <= 94) {
3182                 header_size = sizeof(struct p_rs_param_89);
3183                 data_size = pi->size - header_size;
3184                 D_ASSERT(data_size == 0);
3185         } else {
3186                 header_size = sizeof(struct p_rs_param_95);
3187                 data_size = pi->size - header_size;
3188                 D_ASSERT(data_size == 0);
3189         }
3190
3191         /* initialize verify_alg and csums_alg */
3192         p = pi->data;
3193         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3194
3195         err = drbd_recv_all(mdev->tconn, p, header_size);
3196         if (err)
3197                 return err;
3198
3199         if (get_ldev(mdev)) {
3200                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3201                 put_ldev(mdev);
3202         }
3203
3204         if (apv >= 88) {
3205                 if (apv == 88) {
3206                         if (data_size > SHARED_SECRET_MAX) {
3207                                 dev_err(DEV, "verify-alg too long, "
3208                                     "peer wants %u, accepting only %u bytes\n",
3209                                                 data_size, SHARED_SECRET_MAX);
3210                                 return -EIO;
3211                         }
3212
3213                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3214                         if (err)
3215                                 return err;
3216
3217                         /* we expect NUL terminated string */
3218                         /* but just in case someone tries to be evil */
3219                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3220                         p->verify_alg[data_size-1] = 0;
3221
3222                 } else /* apv >= 89 */ {
3223                         /* we still expect NUL terminated strings */
3224                         /* but just in case someone tries to be evil */
3225                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3226                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3227                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3228                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3229                 }
3230
3231                 mutex_lock(&mdev->tconn->net_conf_update);
3232                 old_conf = mdev->tconn->net_conf;
3233
3234                 if (strcmp(old_conf->verify_alg, p->verify_alg)) {
3235                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3236                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3237                                     old_conf->verify_alg, p->verify_alg);
3238                                 goto disconnect;
3239                         }
3240                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3241                                         p->verify_alg, "verify-alg");
3242                         if (IS_ERR(verify_tfm)) {
3243                                 verify_tfm = NULL;
3244                                 goto disconnect;
3245                         }
3246                 }
3247
3248                 if (apv >= 89 && strcmp(old_conf->csums_alg, p->csums_alg)) {
3249                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3250                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3251                                     old_conf->csums_alg, p->csums_alg);
3252                                 goto disconnect;
3253                         }
3254                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3255                                         p->csums_alg, "csums-alg");
3256                         if (IS_ERR(csums_tfm)) {
3257                                 csums_tfm = NULL;
3258                                 goto disconnect;
3259                         }
3260                 }
3261
3262                 if (apv > 94 && get_ldev(mdev)) {
3263                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3264                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3265                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3266                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3267                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3268
3269                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3270                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3271                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3272                                 if (!rs_plan_s) {
3273                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3274                                         put_ldev(mdev);
3275                                         goto disconnect;
3276                                 }
3277                         }
3278                         put_ldev(mdev);
3279                 }
3280
3281                 if (verify_tfm || csums_tfm) {
3282                         new_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3283                         if (!new_conf) {
3284                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3285                                 goto disconnect;
3286                         }
3287
3288                         *new_conf = *old_conf;
3289
3290                         if (verify_tfm) {
3291                                 strcpy(new_conf->verify_alg, p->verify_alg);
3292                                 new_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3293                                 crypto_free_hash(mdev->tconn->verify_tfm);
3294                                 mdev->tconn->verify_tfm = verify_tfm;
3295                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3296                         }
3297                         if (csums_tfm) {
3298                                 strcpy(new_conf->csums_alg, p->csums_alg);
3299                                 new_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3300                                 crypto_free_hash(mdev->tconn->csums_tfm);
3301                                 mdev->tconn->csums_tfm = csums_tfm;
3302                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3303                         }
3304                         rcu_assign_pointer(tconn->net_conf, new_conf);
3305                 }
3306                 mutex_unlock(&mdev->tconn->net_conf_update);
3307                 if (new_conf) {
3308                         synchronize_rcu();
3309                         kfree(old_conf);
3310                 }
3311
3312                 spin_lock(&mdev->peer_seq_lock);
3313                 if (fifo_size != mdev->rs_plan_s.size) {
3314                         kfree(mdev->rs_plan_s.values);
3315                         mdev->rs_plan_s.values = rs_plan_s;
3316                         mdev->rs_plan_s.size   = fifo_size;
3317                         mdev->rs_planed = 0;
3318                 }
3319                 spin_unlock(&mdev->peer_seq_lock);
3320         }
3321         return 0;
3322
3323 disconnect:
3324         mutex_unlock(&mdev->tconn->net_conf_update);
3325         /* just for completeness: actually not needed,
3326          * as this is not reached if csums_tfm was ok. */
3327         crypto_free_hash(csums_tfm);
3328         /* but free the verify_tfm again, if csums_tfm did not work out */
3329         crypto_free_hash(verify_tfm);
3330         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3331         return -EIO;
3332 }
3333
3334 /* warn if the arguments differ by more than 12.5% */
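/* e.g. a = 1000 and b = 850 sectors: d = 150 > a>>3 = 125, so we warn;
   with b = 900 (d = 100) neither threshold is exceeded and we stay quiet. */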
3335 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3336         const char *s, sector_t a, sector_t b)
3337 {
3338         sector_t d;
3339         if (a == 0 || b == 0)
3340                 return;
3341         d = (a > b) ? (a - b) : (b - a);
3342         if (d > (a>>3) || d > (b>>3))
3343                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3344                      (unsigned long long)a, (unsigned long long)b);
3345 }
3346
3347 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3348 {
3349         struct drbd_conf *mdev;
3350         struct p_sizes *p = pi->data;
3351         enum determine_dev_size dd = unchanged;
3352         sector_t p_size, p_usize, my_usize;
3353         int ldsc = 0; /* local disk size changed */
3354         enum dds_flags ddsf;
3355
3356         mdev = vnr_to_mdev(tconn, pi->vnr);
3357         if (!mdev)
3358                 return config_unknown_volume(tconn, pi);
3359
3360         p_size = be64_to_cpu(p->d_size);
3361         p_usize = be64_to_cpu(p->u_size);
3362
3363         /* just store the peer's disk size for now.
3364          * we still need to figure out whether we accept that. */
3365         mdev->p_size = p_size;
3366
3367         if (get_ldev(mdev)) {
3368                 warn_if_differ_considerably(mdev, "lower level device sizes",
3369                            p_size, drbd_get_max_capacity(mdev->ldev));
3370                 warn_if_differ_considerably(mdev, "user requested size",
3371                                             p_usize, mdev->ldev->dc.disk_size);
3372
3373                 /* if this is the first connect, or an otherwise expected
3374                  * param exchange, choose the minimum */
3375                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3376                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3377                                              p_usize);
3378
3379                 my_usize = mdev->ldev->dc.disk_size;
3380
3381                 if (mdev->ldev->dc.disk_size != p_usize) {
3382                         mdev->ldev->dc.disk_size = p_usize;
3383                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3384                              (unsigned long)mdev->ldev->dc.disk_size);
3385                 }
3386
3387                 /* Never shrink a device with usable data during connect.
3388                    But allow online shrinking if we are connected. */
3389                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3390                    drbd_get_capacity(mdev->this_bdev) &&
3391                    mdev->state.disk >= D_OUTDATED &&
3392                    mdev->state.conn < C_CONNECTED) {
3393                         dev_err(DEV, "The peer's disk size is too small!\n");
3394                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3395                         mdev->ldev->dc.disk_size = my_usize;
3396                         put_ldev(mdev);
3397                         return -EIO;
3398                 }
3399                 put_ldev(mdev);
3400         }
3401
3402         ddsf = be16_to_cpu(p->dds_flags);
3403         if (get_ldev(mdev)) {
3404                 dd = drbd_determine_dev_size(mdev, ddsf);
3405                 put_ldev(mdev);
3406                 if (dd == dev_size_error)
3407                         return -EIO;
3408                 drbd_md_sync(mdev);
3409         } else {
3410                 /* I am diskless, need to accept the peer's size. */
3411                 drbd_set_my_capacity(mdev, p_size);
3412         }
3413
3414         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3415         drbd_reconsider_max_bio_size(mdev);
3416
3417         if (get_ldev(mdev)) {
3418                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3419                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3420                         ldsc = 1;
3421                 }
3422
3423                 put_ldev(mdev);
3424         }
3425
3426         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3427                 if (be64_to_cpu(p->c_size) !=
3428                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3429                         /* we have different sizes, probably peer
3430                          * needs to know my new size... */
3431                         drbd_send_sizes(mdev, 0, ddsf);
3432                 }
3433                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3434                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3435                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3436                             mdev->state.disk >= D_INCONSISTENT) {
3437                                 if (ddsf & DDSF_NO_RESYNC)
3438                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3439                                 else
3440                                         resync_after_online_grow(mdev);
3441                         } else
3442                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3443                 }
3444         }
3445
3446         return 0;
3447 }
3448
3449 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3450 {
3451         struct drbd_conf *mdev;
3452         struct p_uuids *p = pi->data;
3453         u64 *p_uuid;
3454         int i, updated_uuids = 0;
3455
3456         mdev = vnr_to_mdev(tconn, pi->vnr);
3457         if (!mdev)
3458                 return config_unknown_volume(tconn, pi);
3459
3460         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return -ENOMEM;
             }
3461
3462         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3463                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3464
3465         kfree(mdev->p_uuid);
3466         mdev->p_uuid = p_uuid;
3467
3468         if (mdev->state.conn < C_CONNECTED &&
3469             mdev->state.disk < D_INCONSISTENT &&
3470             mdev->state.role == R_PRIMARY &&
3471             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3472                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3473                     (unsigned long long)mdev->ed_uuid);
3474                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3475                 return -EIO;
3476         }
3477
3478         if (get_ldev(mdev)) {
3479                 int skip_initial_sync =
3480                         mdev->state.conn == C_CONNECTED &&
3481                         mdev->tconn->agreed_pro_version >= 90 &&
3482                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3483                         (p_uuid[UI_FLAGS] & 8);
3484                 if (skip_initial_sync) {
3485                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3486                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3487                                         "clear_n_write from receive_uuids",
3488                                         BM_LOCKED_TEST_ALLOWED);
3489                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3490                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3491                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3492                                         CS_VERBOSE, NULL);
3493                         drbd_md_sync(mdev);
3494                         updated_uuids = 1;
3495                 }
3496                 put_ldev(mdev);
3497         } else if (mdev->state.disk < D_INCONSISTENT &&
3498                    mdev->state.role == R_PRIMARY) {
3499                 /* I am a diskless primary, the peer just created a new current UUID
3500                    for me. */
3501                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3502         }
3503
3504         /* Before we test for the disk state, we should wait until a possibly
3505            ongoing cluster-wide state change is finished. That is important if
3506            we are primary and are detaching from our disk. We need to see the
3507            new disk state... */
3508         mutex_lock(mdev->state_mutex);
3509         mutex_unlock(mdev->state_mutex);
3510         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3511                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3512
3513         if (updated_uuids)
3514                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3515
3516         return 0;
3517 }
3518
3519 /**
3520  * convert_state() - Converts the peer's view of the cluster state to our point of view
3521  * @ps:         The state as seen by the peer.
3522  */
3523 static union drbd_state convert_state(union drbd_state ps)
3524 {
3525         union drbd_state ms;
3526
3527         static enum drbd_conns c_tab[] = {
3528                 [C_CONNECTED] = C_CONNECTED,
3529
3530                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3531                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3532                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3533                 [C_VERIFY_S]       = C_VERIFY_T,
3534                 [C_MASK]   = C_MASK,
3535         };
3536
3537         ms.i = ps.i;
3538
3539         ms.conn = c_tab[ps.conn];
3540         ms.peer = ps.role;
3541         ms.role = ps.peer;
3542         ms.pdsk = ps.disk;
3543         ms.disk = ps.pdsk;
3544         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3545
3546         return ms;
3547 }
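/* Worked example: if the peer reports { role=Primary, peer=Secondary,
 * disk=UpToDate, pdsk=Inconsistent, conn=StartingSyncS }, then from our
 * point of view this becomes { role=Secondary, peer=Primary,
 * disk=Inconsistent, pdsk=UpToDate, conn=StartingSyncT }. */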
3548
3549 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3550 {
3551         struct drbd_conf *mdev;
3552         struct p_req_state *p = pi->data;
3553         union drbd_state mask, val;
3554         enum drbd_state_rv rv;
3555
3556         mdev = vnr_to_mdev(tconn, pi->vnr);
3557         if (!mdev)
3558                 return -EIO;
3559
3560         mask.i = be32_to_cpu(p->mask);
3561         val.i = be32_to_cpu(p->val);
3562
3563         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3564             mutex_is_locked(mdev->state_mutex)) {
3565                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3566                 return 0;
3567         }
3568
3569         mask = convert_state(mask);
3570         val = convert_state(val);
3571
3572         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3573         drbd_send_sr_reply(mdev, rv);
3574
3575         drbd_md_sync(mdev);
3576
3577         return 0;
3578 }
3579
3580 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3581 {
3582         struct p_req_state *p = pi->data;
3583         union drbd_state mask, val;
3584         enum drbd_state_rv rv;
3585
3586         mask.i = be32_to_cpu(p->mask);
3587         val.i = be32_to_cpu(p->val);
3588
3589         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3590             mutex_is_locked(&tconn->cstate_mutex)) {
3591                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3592                 return 0;
3593         }
3594
3595         mask = convert_state(mask);
3596         val = convert_state(val);
3597
3598         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3599         conn_send_sr_reply(tconn, rv);
3600
3601         return 0;
3602 }
3603
3604 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3605 {
3606         struct drbd_conf *mdev;
3607         struct p_state *p = pi->data;
3608         union drbd_state os, ns, peer_state;
3609         enum drbd_disk_state real_peer_disk;
3610         enum chg_state_flags cs_flags;
3611         int rv;
3612
3613         mdev = vnr_to_mdev(tconn, pi->vnr);
3614         if (!mdev)
3615                 return config_unknown_volume(tconn, pi);
3616
3617         peer_state.i = be32_to_cpu(p->state);
3618
3619         real_peer_disk = peer_state.disk;
3620         if (peer_state.disk == D_NEGOTIATING) {
3621                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3622                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3623         }
3624
3625         spin_lock_irq(&mdev->tconn->req_lock);
3626  retry:
3627         os = ns = drbd_read_state(mdev);
3628         spin_unlock_irq(&mdev->tconn->req_lock);
3629
3630         /* peer says his disk is uptodate, while we think it is inconsistent,
3631          * and this happens while we think we have a sync going on. */
3632         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3633             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3634                 /* If we are (becoming) SyncSource, but peer is still in sync
3635                  * preparation, ignore its uptodate-ness to avoid flapping, it
3636                  * will change to inconsistent once the peer reaches active
3637                  * syncing states.
3638                  * It may have changed syncer-paused flags, however, so we
3639                  * cannot ignore this completely. */
3640                 if (peer_state.conn > C_CONNECTED &&
3641                     peer_state.conn < C_SYNC_SOURCE)
3642                         real_peer_disk = D_INCONSISTENT;
3643
3644                 /* if peer_state changes to connected at the same time,
3645                  * it explicitly notifies us that it finished resync.
3646                  * Maybe we should finish it up, too? */
3647                 else if (os.conn >= C_SYNC_SOURCE &&
3648                          peer_state.conn == C_CONNECTED) {
3649                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3650                                 drbd_resync_finished(mdev);
3651                         return 0;
3652                 }
3653         }
3654
3655         /* peer says his disk is inconsistent, while we think it is uptodate,
3656          * and this happens while the peer still thinks we have a sync going on,
3657          * but we think we are already done with the sync.
3658          * We ignore this to avoid flapping pdsk.
3659          * This should not happen, if the peer is a recent version of drbd. */
3660         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3661             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3662                 real_peer_disk = D_UP_TO_DATE;
3663
3664         if (ns.conn == C_WF_REPORT_PARAMS)
3665                 ns.conn = C_CONNECTED;
3666
3667         if (peer_state.conn == C_AHEAD)
3668                 ns.conn = C_BEHIND;
3669
3670         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3671             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3672                 int cr; /* consider resync */
3673
3674                 /* if we established a new connection */
3675                 cr  = (os.conn < C_CONNECTED);
3676                 /* if we had an established connection
3677                  * and one of the nodes newly attaches a disk */
3678                 cr |= (os.conn == C_CONNECTED &&
3679                        (peer_state.disk == D_NEGOTIATING ||
3680                         os.disk == D_NEGOTIATING));
3681                 /* if we have both been inconsistent, and the peer has been
3682                  * forced to be UpToDate with --overwrite-data */
3683                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3684                 /* if we had been plain connected, and the admin requested to
3685                  * start a sync by "invalidate" or "invalidate-remote" */
3686                 cr |= (os.conn == C_CONNECTED &&
3687                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3688                                  peer_state.conn <= C_WF_BITMAP_T));
3689
3690                 if (cr)
3691                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3692
3693                 put_ldev(mdev);
3694                 if (ns.conn == C_MASK) {
3695                         ns.conn = C_CONNECTED;
3696                         if (mdev->state.disk == D_NEGOTIATING) {
3697                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3698                         } else if (peer_state.disk == D_NEGOTIATING) {
3699                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3700                                 peer_state.disk = D_DISKLESS;
3701                                 real_peer_disk = D_DISKLESS;
3702                         } else {
3703                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3704                                         return -EIO;
3705                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3706                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3707                                 return -EIO;
3708                         }
3709                 }
3710         }
3711
3712         spin_lock_irq(&mdev->tconn->req_lock);
3713         if (os.i != drbd_read_state(mdev).i)
3714                 goto retry;
3715         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3716         ns.peer = peer_state.role;
3717         ns.pdsk = real_peer_disk;
3718         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3719         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3720                 ns.disk = mdev->new_state_tmp.disk;
3721         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3722         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3723             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3724                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3725                    for temporary network outages! */
3726                 spin_unlock_irq(&mdev->tconn->req_lock);
3727                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3728                 tl_clear(mdev->tconn);
3729                 drbd_uuid_new_current(mdev);
3730                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3731                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3732                 return -EIO;
3733         }
3734         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3735         ns = drbd_read_state(mdev);
3736         spin_unlock_irq(&mdev->tconn->req_lock);
3737
3738         if (rv < SS_SUCCESS) {
3739                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3740                 return -EIO;
3741         }
3742
3743         if (os.conn > C_WF_REPORT_PARAMS) {
3744                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3745                     peer_state.disk != D_NEGOTIATING ) {
3746                         /* we want resync, peer has not yet decided to sync... */
3747                         /* Nowadays only used when forcing a node into primary role and
3748                            setting its disk to UpToDate with that */
3749                         drbd_send_uuids(mdev);
3750                         drbd_send_state(mdev);
3751                 }
3752         }
3753
3754         mutex_lock(&mdev->tconn->net_conf_update);
3755         mdev->tconn->net_conf->want_lose = 0; /* without copy; single bit op is atomic */
3756         mutex_unlock(&mdev->tconn->net_conf_update);
3757
3758         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3759
3760         return 0;
3761 }
3762
3763 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3764 {
3765         struct drbd_conf *mdev;
3766         struct p_rs_uuid *p = pi->data;
3767
3768         mdev = vnr_to_mdev(tconn, pi->vnr);
3769         if (!mdev)
3770                 return -EIO;
3771
3772         wait_event(mdev->misc_wait,
3773                    mdev->state.conn == C_WF_SYNC_UUID ||
3774                    mdev->state.conn == C_BEHIND ||
3775                    mdev->state.conn < C_CONNECTED ||
3776                    mdev->state.disk < D_NEGOTIATING);
3777
3778         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3779
3780         /* Here the _drbd_uuid_ functions are right, current should
3781            _not_ be rotated into the history */
3782         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3783                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3784                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3785
3786                 drbd_print_uuids(mdev, "updated sync uuid");
3787                 drbd_start_resync(mdev, C_SYNC_TARGET);
3788
3789                 put_ldev(mdev);
3790         } else
3791                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3792
3793         return 0;
3794 }
3795
3796 /**
3797  * receive_bitmap_plain
3798  *
3799  * Return 0 when done, 1 when another iteration is needed, and a negative error
3800  * code upon failure.
3801  */
3802 static int
3803 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3804                      unsigned long *p, struct bm_xfer_ctx *c)
3805 {
3806         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3807                                  drbd_header_size(mdev->tconn);
3808         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3809                                        c->bm_words - c->word_offset);
3810         unsigned int want = num_words * sizeof(*p);
3811         int err;
3812
3813         if (want != size) {
3814                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3815                 return -EIO;
3816         }
3817         if (want == 0)
3818                 return 0;
3819         err = drbd_recv_all(mdev->tconn, p, want);
3820         if (err)
3821                 return err;
3822
3823         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3824
3825         c->word_offset += num_words;
3826         c->bit_offset = c->word_offset * BITS_PER_LONG;
3827         if (c->bit_offset > c->bm_bits)
3828                 c->bit_offset = c->bm_bits;
3829
3830         return 1;
3831 }
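/* For a feel of the numbers (assuming the usual 4096 byte DRBD_SOCKET_BUFFER_SIZE
 * and sizeof(long) == 8): data_size is the buffer minus the header, which leaves
 * room for roughly 500 longs, i.e. on the order of 32000 bitmap bits per plain
 * bitmap packet; the final packet of a transfer is usually shorter. */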
3832
3833 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3834 {
3835         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3836 }
3837
3838 static int dcbp_get_start(struct p_compressed_bm *p)
3839 {
3840         return (p->encoding & 0x80) != 0;
3841 }
3842
3843 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3844 {
3845         return (p->encoding >> 4) & 0x7;
3846 }
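/* Together these helpers decode the p_compressed_bm "encoding" byte:
 * bits 0-3 hold the bitmap code (dcbp_get_code), bits 4-6 the number of
 * pad bits at the end of the bitstream (dcbp_get_pad_bits), and bit 7
 * tells whether the first run-length describes set bits (dcbp_get_start). */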
3847
3848 /**
3849  * recv_bm_rle_bits
3850  *
3851  * Return 0 when done, 1 when another iteration is needed, and a negative error
3852  * code upon failure.
3853  */
3854 static int
3855 recv_bm_rle_bits(struct drbd_conf *mdev,
3856                 struct p_compressed_bm *p,
3857                  struct bm_xfer_ctx *c,
3858                  unsigned int len)
3859 {
3860         struct bitstream bs;
3861         u64 look_ahead;
3862         u64 rl;
3863         u64 tmp;
3864         unsigned long s = c->bit_offset;
3865         unsigned long e;
3866         int toggle = dcbp_get_start(p);
3867         int have;
3868         int bits;
3869
3870         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3871
3872         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3873         if (bits < 0)
3874                 return -EIO;
3875
3876         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3877                 bits = vli_decode_bits(&rl, look_ahead);
3878                 if (bits <= 0)
3879                         return -EIO;
3880
3881                 if (toggle) {
3882                         e = s + rl -1;
3883                         if (e >= c->bm_bits) {
3884                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3885                                 return -EIO;
3886                         }
3887                         _drbd_bm_set_bits(mdev, s, e);
3888                 }
3889
3890                 if (have < bits) {
3891                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3892                                 have, bits, look_ahead,
3893                                 (unsigned int)(bs.cur.b - p->code),
3894                                 (unsigned int)bs.buf_len);
3895                         return -EIO;
3896                 }
3897                 look_ahead >>= bits;
3898                 have -= bits;
3899
3900                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3901                 if (bits < 0)
3902                         return -EIO;
3903                 look_ahead |= tmp << have;
3904                 have += bits;
3905         }
3906
3907         c->bit_offset = s;
3908         bm_xfer_ctx_bit_to_word_offset(c);
3909
3910         return (s != c->bm_bits);
3911 }
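/* Rough illustration of the decoding above: with start toggle 0 and decoded
 * run lengths 5, 3, 7 the first 5 bits starting at c->bit_offset stay clear,
 * the next 3 are set via _drbd_bm_set_bits(), and the following 7 stay clear
 * again; runs keep alternating until the bitstream is exhausted. */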
3912
3913 /**
3914  * decode_bitmap_c
3915  *
3916  * Return 0 when done, 1 when another iteration is needed, and a negative error
3917  * code upon failure.
3918  */
3919 static int
3920 decode_bitmap_c(struct drbd_conf *mdev,
3921                 struct p_compressed_bm *p,
3922                 struct bm_xfer_ctx *c,
3923                 unsigned int len)
3924 {
3925         if (dcbp_get_code(p) == RLE_VLI_Bits)
3926                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3927
3928         /* other variants had been implemented for evaluation,
3929          * but have been dropped as this one turned out to be "best"
3930          * during all our tests. */
3931
3932         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3933         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3934         return -EIO;
3935 }
3936
3937 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3938                 const char *direction, struct bm_xfer_ctx *c)
3939 {
3940         /* what would it take to transfer it "plaintext" */
3941         unsigned int header_size = drbd_header_size(mdev->tconn);
3942         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3943         unsigned int plain =
3944                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3945                 c->bm_words * sizeof(unsigned long);
3946         unsigned int total = c->bytes[0] + c->bytes[1];
3947         unsigned int r;
3948
3949         /* total can not be zero. but just in case: */
3950         /* total cannot be zero, but just in case: */
3951                 return;
3952
3953         /* don't report if not compressed */
3954         if (total >= plain)
3955                 return;
3956
3957         /* total < plain. check for overflow, still */
3958         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3959                                     : (1000 * total / plain);
3960
3961         if (r > 1000)
3962                 r = 1000;
3963
3964         r = 1000 - r;
3965         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3966              "total %u; compression: %u.%u%%\n",
3967                         direction,
3968                         c->bytes[1], c->packets[1],
3969                         c->bytes[0], c->packets[0],
3970                         total, r/10, r % 10);
3971 }
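/* Example of the ratio computed above: plain = 4000 bytes and total = 500
 * bytes give r = 1000 * 500 / 4000 = 125, hence 1000 - 125 = 875, which is
 * printed as r/10 "." r%10, i.e. "compression: 87.5%". */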
3972
3973 /* Since we are processing the bitfield from lower addresses to higher,
3974    it does not matter whether we process it in 32 bit or 64 bit chunks,
3975    as long as it is little endian. (Understand it as a byte stream,
3976    beginning with the lowest byte...) If we used big endian, we would
3977    need to process it from the highest address to the lowest, in order
3978    to be agnostic to the 32 vs 64 bit issue.
3979
3980    Returns 0 on success, a negative error code otherwise. */
3981 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3982 {
3983         struct drbd_conf *mdev;
3984         struct bm_xfer_ctx c;
3985         int err;
3986
3987         mdev = vnr_to_mdev(tconn, pi->vnr);
3988         if (!mdev)
3989                 return -EIO;
3990
3991         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3992         /* you are supposed to send additional out-of-sync information
3993          * if you actually set bits during this phase */
3994
3995         c = (struct bm_xfer_ctx) {
3996                 .bm_bits = drbd_bm_bits(mdev),
3997                 .bm_words = drbd_bm_words(mdev),
3998         };
3999
4000         for(;;) {
4001                 if (pi->cmd == P_BITMAP)
4002                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4003                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4004                         /* MAYBE: sanity check that we speak proto >= 90,
4005                          * and the feature is enabled! */
4006                         struct p_compressed_bm *p = pi->data;
4007
4008                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4009                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4010                                 err = -EIO;
4011                                 goto out;
4012                         }
4013                         if (pi->size <= sizeof(*p)) {
4014                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4015                                 err = -EIO;
4016                                 goto out;
4017                         }
4018                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4019                         if (err)
4020                                goto out;
4021                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4022                 } else {
4023                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4024                         err = -EIO;
4025                         goto out;
4026                 }
4027
4028                 c.packets[pi->cmd == P_BITMAP]++;
4029                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4030
4031                 if (err <= 0) {
4032                         if (err < 0)
4033                                 goto out;
4034                         break;
4035                 }
4036                 err = drbd_recv_header(mdev->tconn, pi);
4037                 if (err)
4038                         goto out;
4039         }
4040
4041         INFO_bm_xfer_stats(mdev, "receive", &c);
4042
4043         if (mdev->state.conn == C_WF_BITMAP_T) {
4044                 enum drbd_state_rv rv;
4045
4046                 err = drbd_send_bitmap(mdev);
4047                 if (err)
4048                         goto out;
4049                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4050                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4051                 D_ASSERT(rv == SS_SUCCESS);
4052         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4053                 /* admin may have requested C_DISCONNECTING,
4054                  * other threads may have noticed network errors */
4055                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4056                     drbd_conn_str(mdev->state.conn));
4057         }
4058         err = 0;
4059
4060  out:
4061         drbd_bm_unlock(mdev);
4062         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4063                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4064         return err;
4065 }
4066
4067 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4068 {
4069         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4070                  pi->cmd, pi->size);
4071
4072         return ignore_remaining_packet(tconn, pi);
4073 }
4074
4075 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4076 {
4077         /* Make sure we've acked all the TCP data associated
4078          * with the data requests being unplugged */
4079         drbd_tcp_quickack(tconn->data.socket);
4080
4081         return 0;
4082 }
4083
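/* Peer reports a block as out of sync; record it in the local bitmap.
 * Only expected while in WFSyncUUID, WFBitMapT or Behind. */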
4084 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4085 {
4086         struct drbd_conf *mdev;
4087         struct p_block_desc *p = pi->data;
4088
4089         mdev = vnr_to_mdev(tconn, pi->vnr);
4090         if (!mdev)
4091                 return -EIO;
4092
4093         switch (mdev->state.conn) {
4094         case C_WF_SYNC_UUID:
4095         case C_WF_BITMAP_T:
4096         case C_BEHIND:
4097                 break;
4098         default:
4099                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4100                                 drbd_conn_str(mdev->state.conn));
4101         }
4102
4103         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4104
4105         return 0;
4106 }
4107
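/* Dispatch table for packets on the data socket: pkt_size is the size of
 * the fixed sub header to read up front, expect_payload says whether more
 * payload may follow it, fn is the per-packet handler. */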
4108 struct data_cmd {
4109         int expect_payload;
4110         size_t pkt_size;
4111         int (*fn)(struct drbd_tconn *, struct packet_info *);
4112 };
4113
4114 static struct data_cmd drbd_cmd_handler[] = {
4115         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4116         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4117         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4118         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4119         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4120         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4121         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4122         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4123         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4124         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4125         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4126         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4127         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4128         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4129         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4130         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4131         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4132         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4133         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4134         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4135         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4136         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4137         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4138 };
4139
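/* Receiver main loop: read a packet header, look up the handler in
 * drbd_cmd_handler[], read the fixed-size sub header, then let the handler
 * consume the remaining payload. Any failure ends in C_PROTOCOL_ERROR. */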
4140 static void drbdd(struct drbd_tconn *tconn)
4141 {
4142         struct packet_info pi;
4143         size_t shs; /* sub header size */
4144         int err;
4145
4146         while (get_t_state(&tconn->receiver) == RUNNING) {
4147                 struct data_cmd *cmd;
4148
4149                 drbd_thread_current_set_cpu(&tconn->receiver);
4150                 if (drbd_recv_header(tconn, &pi))
4151                         goto err_out;
4152
4153                 cmd = &drbd_cmd_handler[pi.cmd];
4154                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4155                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4156                         goto err_out;
4157                 }
4158
4159                 shs = cmd->pkt_size;
4160                 if (pi.size > shs && !cmd->expect_payload) {
4161                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4162                         goto err_out;
4163                 }
4164
4165                 if (shs) {
4166                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4167                         if (err)
4168                                 goto err_out;
4169                         pi.size -= shs;
4170                 }
4171
4172                 err = cmd->fn(tconn, &pi);
4173                 if (err) {
4174                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4175                                  cmdname(pi.cmd), err, pi.size);
4176                         goto err_out;
4177                 }
4178         }
4179         return;
4180
4181     err_out:
4182         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4183 }
4184
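/* Queue a barrier work item on the connection's worker and wait until it
 * has been processed, i.e. until all previously queued work is done. */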
4185 void conn_flush_workqueue(struct drbd_tconn *tconn)
4186 {
4187         struct drbd_wq_barrier barr;
4188
4189         barr.w.cb = w_prev_work_done;
4190         barr.w.tconn = tconn;
4191         init_completion(&barr.done);
4192         drbd_queue_work(&tconn->data.work, &barr.w);
4193         wait_for_completion(&barr.done);
4194 }
4195
4196 static void drbd_disconnect(struct drbd_tconn *tconn)
4197 {
4198         enum drbd_conns oc;
4199         int rv = SS_UNKNOWN_ERROR;
4200
4201         if (tconn->cstate == C_STANDALONE)
4202                 return;
4203
4204         /* asender does not clean up anything. it must not interfere, either */
4205         drbd_thread_stop(&tconn->asender);
4206         drbd_free_sock(tconn);
4207
4208         down_read(&drbd_cfg_rwsem);
4209         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4210         up_read(&drbd_cfg_rwsem);
4211         conn_info(tconn, "Connection closed\n");
4212
4213         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4214                 conn_try_outdate_peer_async(tconn);
4215
4216         spin_lock_irq(&tconn->req_lock);
4217         oc = tconn->cstate;
4218         if (oc >= C_UNCONNECTED)
4219                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4220
4221         spin_unlock_irq(&tconn->req_lock);
4222
4223         if (oc == C_DISCONNECTING) {
4224                 struct net_conf *old_conf;
4225
4226                 mutex_lock(&tconn->net_conf_update);
4227                 old_conf = tconn->net_conf;
4228                 rcu_assign_pointer(tconn->net_conf, NULL);
4229                 conn_free_crypto(tconn);
4230                 mutex_unlock(&tconn->net_conf_update);
4231
4232                 synchronize_rcu();
4233                 kfree(old_conf);
4234
4235                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4236         }
4237 }
4238
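/* Per-volume cleanup after the connection was lost; called for each volume
 * via idr_for_each() from drbd_disconnect(). */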
4239 static int drbd_disconnected(int vnr, void *p, void *data)
4240 {
4241         struct drbd_conf *mdev = (struct drbd_conf *)p;
4242         enum drbd_fencing_p fp;
4243         unsigned int i;
4244
4245         /* wait for current activity to cease. */
4246         spin_lock_irq(&mdev->tconn->req_lock);
4247         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4248         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4249         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4250         spin_unlock_irq(&mdev->tconn->req_lock);
4251
4252         /* We do not have data structures that would allow us to
4253          * get the rs_pending_cnt down to 0 again.
4254          *  * On C_SYNC_TARGET we do not have any data structures describing
4255          *    the pending RSDataRequest's we have sent.
4256          *  * On C_SYNC_SOURCE there is no data structure that tracks
4257          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4258          *  And no, it is not the sum of the reference counts in the
4259          *  resync_LRU. The resync_LRU tracks the whole operation including
4260          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4261          *  on the fly. */
4262         drbd_rs_cancel_all(mdev);
4263         mdev->rs_total = 0;
4264         mdev->rs_failed = 0;
4265         atomic_set(&mdev->rs_pending_cnt, 0);
4266         wake_up(&mdev->misc_wait);
4267
4268         del_timer(&mdev->request_timer);
4269
4270         del_timer_sync(&mdev->resync_timer);
4271         resync_timer_fn((unsigned long)mdev);
4272
4273         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4274          * w_make_resync_request etc. which may still be on the worker queue
4275          * to be "canceled" */
4276         drbd_flush_workqueue(mdev);
4277
4278         drbd_finish_peer_reqs(mdev);
4279
4280         kfree(mdev->p_uuid);
4281         mdev->p_uuid = NULL;
4282
4283         if (!drbd_suspended(mdev))
4284                 tl_clear(mdev->tconn);
4285
4286         drbd_md_sync(mdev);
4287
4288         fp = FP_DONT_CARE;
4289         if (get_ldev(mdev)) {
4290                 fp = mdev->ldev->dc.fencing;
4291                 put_ldev(mdev);
4292         }
4293
4294         /* serialize with bitmap writeout triggered by the state change,
4295          * if any. */
4296         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4297
4298         /* tcp_close and release of sendpage pages can be deferred.  I don't
4299          * want to use SO_LINGER, because apparently it can be deferred for
4300          * more than 20 seconds (longest time I checked).
4301          *
4302          * Actually we don't care for exactly when the network stack does its
4303          * put_page(), but release our reference on these pages right here.
4304          */
4305         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4306         if (i)
4307                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4308         i = atomic_read(&mdev->pp_in_use_by_net);
4309         if (i)
4310                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4311         i = atomic_read(&mdev->pp_in_use);
4312         if (i)
4313                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4314
4315         D_ASSERT(list_empty(&mdev->read_ee));
4316         D_ASSERT(list_empty(&mdev->active_ee));
4317         D_ASSERT(list_empty(&mdev->sync_ee));
4318         D_ASSERT(list_empty(&mdev->done_ee));
4319
4320         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4321         atomic_set(&mdev->current_epoch->epoch_size, 0);
4322         D_ASSERT(list_empty(&mdev->current_epoch->list));
4323
4324         return 0;
4325 }
4326
4327 /*
4328  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4329  * we can agree on is stored in agreed_pro_version.
4330  *
4331  * The feature flags and the reserved array should leave enough room for
4332  * future enhancements of the handshake protocol, and for possible plugins...
4333  *
4334  * for now, they are expected to be zero, but ignored.
4335  */
4336 static int drbd_send_features(struct drbd_tconn *tconn)
4337 {
4338         struct drbd_socket *sock;
4339         struct p_connection_features *p;
4340
4341         sock = &tconn->data;
4342         p = conn_prepare_command(tconn, sock);
4343         if (!p)
4344                 return -EIO;
4345         memset(p, 0, sizeof(*p));
4346         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4347         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4348         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4349 }
4350
4351 /*
4352  * return values:
4353  *   1 yes, we have a valid connection
4354  *   0 oops, did not work out, please try again
4355  *  -1 peer talks different language,
4356  *     no point in trying again, please go standalone.
4357  */
4358 static int drbd_do_features(struct drbd_tconn *tconn)
4359 {
4360         /* ASSERT current == tconn->receiver ... */
4361         struct p_connection_features *p;
4362         const int expect = sizeof(struct p_connection_features);
4363         struct packet_info pi;
4364         int err;
4365
4366         err = drbd_send_features(tconn);
4367         if (err)
4368                 return 0;
4369
4370         err = drbd_recv_header(tconn, &pi);
4371         if (err)
4372                 return 0;
4373
4374         if (pi.cmd != P_CONNECTION_FEATURES) {
4375                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4376                      cmdname(pi.cmd), pi.cmd);
4377                 return -1;
4378         }
4379
4380         if (pi.size != expect) {
4381                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4382                      expect, pi.size);
4383                 return -1;
4384         }
4385
4386         p = pi.data;
4387         err = drbd_recv_all_warn(tconn, p, expect);
4388         if (err)
4389                 return 0;
4390
4391         p->protocol_min = be32_to_cpu(p->protocol_min);
4392         p->protocol_max = be32_to_cpu(p->protocol_max);
4393         if (p->protocol_max == 0)
4394                 p->protocol_max = p->protocol_min;
4395
4396         if (PRO_VERSION_MAX < p->protocol_min ||
4397             PRO_VERSION_MIN > p->protocol_max)
4398                 goto incompat;
4399
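        /* Use the highest protocol version both sides support; e.g. if we
         * announce up to 100 and the peer announces up to 96, we agree on 96. */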
4400         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4401
4402         conn_info(tconn, "Handshake successful: "
4403              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4404
4405         return 1;
4406
4407  incompat:
4408         conn_err(tconn, "incompatible DRBD dialects: "
4409             "I support %d-%d, peer supports %d-%d\n",
4410             PRO_VERSION_MIN, PRO_VERSION_MAX,
4411             p->protocol_min, p->protocol_max);
4412         return -1;
4413 }
4414
4415 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4416 static int drbd_do_auth(struct drbd_tconn *tconn)
4417 {
4418         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4419         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4420         return -1;
4421 }
4422 #else
4423 #define CHALLENGE_LEN 64
4424
4425 /* Return value:
4426         1 - auth succeeded,
4427         0 - failed, try again (network error),
4428         -1 - auth failed, don't try again.
4429 */
4430
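/* Challenge-response handshake over the data socket: send our random
 * challenge (P_AUTH_CHALLENGE), receive the peer's challenge, HMAC it with
 * the shared secret and send that back (P_AUTH_RESPONSE), then verify the
 * peer's response against the HMAC of our own challenge. */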
4431 static int drbd_do_auth(struct drbd_tconn *tconn)
4432 {
4433         struct drbd_socket *sock;
4434         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4435         struct scatterlist sg;
4436         char *response = NULL;
4437         char *right_response = NULL;
4438         char *peers_ch = NULL;
4439         unsigned int key_len;
4440         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4441         unsigned int resp_size;
4442         struct hash_desc desc;
4443         struct packet_info pi;
4444         struct net_conf *nc;
4445         int err, rv;
4446
4447         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4448
4449         rcu_read_lock();
4450         nc = rcu_dereference(tconn->net_conf);
4451         key_len = strlen(nc->shared_secret);
4452         memcpy(secret, nc->shared_secret, key_len);
4453         rcu_read_unlock();
4454
4455         desc.tfm = tconn->cram_hmac_tfm;
4456         desc.flags = 0;
4457
4458         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4459         if (rv) {
4460                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4461                 rv = -1;
4462                 goto fail;
4463         }
4464
4465         get_random_bytes(my_challenge, CHALLENGE_LEN);
4466
4467         sock = &tconn->data;
4468         if (!conn_prepare_command(tconn, sock)) {
4469                 rv = 0;
4470                 goto fail;
4471         }
4472         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4473                                 my_challenge, CHALLENGE_LEN);
4474         if (!rv)
4475                 goto fail;
4476
4477         err = drbd_recv_header(tconn, &pi);
4478         if (err) {
4479                 rv = 0;
4480                 goto fail;
4481         }
4482
4483         if (pi.cmd != P_AUTH_CHALLENGE) {
4484                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4485                     cmdname(pi.cmd), pi.cmd);
4486                 rv = 0;
4487                 goto fail;
4488         }
4489
4490         if (pi.size > CHALLENGE_LEN * 2) {
4491                 conn_err(tconn, "AuthChallenge payload too big (l:%u)\n", pi.size);
4492                 rv = -1;
4493                 goto fail;
4494         }
4495
4496         peers_ch = kmalloc(pi.size, GFP_NOIO);
4497         if (peers_ch == NULL) {
4498                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4499                 rv = -1;
4500                 goto fail;
4501         }
4502
4503         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4504         if (err) {
4505                 rv = 0;
4506                 goto fail;
4507         }
4508
4509         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4510         response = kmalloc(resp_size, GFP_NOIO);
4511         if (response == NULL) {
4512                 conn_err(tconn, "kmalloc of response failed\n");
4513                 rv = -1;
4514                 goto fail;
4515         }
4516
4517         sg_init_table(&sg, 1);
4518         sg_set_buf(&sg, peers_ch, pi.size);
4519
4520         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4521         if (rv) {
4522                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4523                 rv = -1;
4524                 goto fail;
4525         }
4526
4527         if (!conn_prepare_command(tconn, sock)) {
4528                 rv = 0;
4529                 goto fail;
4530         }
4531         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4532                                 response, resp_size);
4533         if (!rv)
4534                 goto fail;
4535
4536         err = drbd_recv_header(tconn, &pi);
4537         if (err) {
4538                 rv = 0;
4539                 goto fail;
4540         }
4541
4542         if (pi.cmd != P_AUTH_RESPONSE) {
4543                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4544                         cmdname(pi.cmd), pi.cmd);
4545                 rv = 0;
4546                 goto fail;
4547         }
4548
4549         if (pi.size != resp_size) {
4550                 conn_err(tconn, "AuthResponse payload has wrong size (l:%u)\n", pi.size);
4551                 rv = 0;
4552                 goto fail;
4553         }
4554
4555         err = drbd_recv_all_warn(tconn, response, resp_size);
4556         if (err) {
4557                 rv = 0;
4558                 goto fail;
4559         }
4560
4561         right_response = kmalloc(resp_size, GFP_NOIO);
4562         if (right_response == NULL) {
4563                 conn_err(tconn, "kmalloc of right_response failed\n");
4564                 rv = -1;
4565                 goto fail;
4566         }
4567
4568         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4569
4570         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4571         if (rv) {
4572                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4573                 rv = -1;
4574                 goto fail;
4575         }
4576
4577         rv = !memcmp(response, right_response, resp_size);
4578
4579         if (rv)
4580                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4581                      resp_size);
4582         else
4583                 rv = -1;
4584
4585  fail:
4586         kfree(peers_ch);
4587         kfree(response);
4588         kfree(right_response);
4589
4590         return rv;
4591 }
4592 #endif
4593
4594 int drbdd_init(struct drbd_thread *thi)
4595 {
4596         struct drbd_tconn *tconn = thi->tconn;
4597         int h;
4598
4599         conn_info(tconn, "receiver (re)started\n");
4600
4601         do {
4602                 h = drbd_connect(tconn);
4603                 if (h == 0) {
4604                         drbd_disconnect(tconn);
4605                         schedule_timeout_interruptible(HZ);
4606                 }
4607                 if (h == -1) {
4608                         conn_warn(tconn, "Discarding network configuration.\n");
4609                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4610                 }
4611         } while (h == 0);
4612
4613         if (h > 0)
4614                 drbdd(tconn);
4615
4616         drbd_disconnect(tconn);
4617
4618         conn_info(tconn, "receiver terminated\n");
4619         return 0;
4620 }
4621
4622 /* ********* acknowledge sender ******** */
4623
4624 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4625 {
4626         struct p_req_state_reply *p = pi->data;
4627         int retcode = be32_to_cpu(p->retcode);
4628
4629         if (retcode >= SS_SUCCESS) {
4630                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4631         } else {
4632                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4633                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4634                          drbd_set_st_err_str(retcode), retcode);
4635         }
4636         wake_up(&tconn->ping_wait);
4637
4638         return 0;
4639 }
4640
4641 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4642 {
4643         struct drbd_conf *mdev;
4644         struct p_req_state_reply *p = pi->data;
4645         int retcode = be32_to_cpu(p->retcode);
4646
4647         mdev = vnr_to_mdev(tconn, pi->vnr);
4648         if (!mdev)
4649                 return -EIO;
4650
4651         if (retcode >= SS_SUCCESS) {
4652                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4653         } else {
4654                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4655                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4656                         drbd_set_st_err_str(retcode), retcode);
4657         }
4658         wake_up(&mdev->state_wait);
4659
4660         return 0;
4661 }
4662
4663 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4664 {
4665         return drbd_send_ping_ack(tconn);
4667 }
4668
4669 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4670 {
4671         /* restore idle timeout */
4672         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4673         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4674                 wake_up(&tconn->ping_wait);
4675
4676         return 0;
4677 }
4678
4679 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4680 {
4681         struct drbd_conf *mdev;
4682         struct p_block_ack *p = pi->data;
4683         sector_t sector = be64_to_cpu(p->sector);
4684         int blksize = be32_to_cpu(p->blksize);
4685
4686         mdev = vnr_to_mdev(tconn, pi->vnr);
4687         if (!mdev)
4688                 return -EIO;
4689
4690         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4691
4692         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4693
4694         if (get_ldev(mdev)) {
4695                 drbd_rs_complete_io(mdev, sector);
4696                 drbd_set_in_sync(mdev, sector, blksize);
4697                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4698                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4699                 put_ldev(mdev);
4700         }
4701         dec_rs_pending(mdev);
4702         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4703
4704         return 0;
4705 }
4706
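/* Look up the request identified by (id, sector) in the given tree and
 * apply the request state transition 'what'; complete the master bio if
 * that transition finished the request. */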
4707 static int
4708 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4709                               struct rb_root *root, const char *func,
4710                               enum drbd_req_event what, bool missing_ok)
4711 {
4712         struct drbd_request *req;
4713         struct bio_and_error m;
4714
4715         spin_lock_irq(&mdev->tconn->req_lock);
4716         req = find_request(mdev, root, id, sector, missing_ok, func);
4717         if (unlikely(!req)) {
4718                 spin_unlock_irq(&mdev->tconn->req_lock);
4719                 return -EIO;
4720         }
4721         __req_mod(req, what, &m);
4722         spin_unlock_irq(&mdev->tconn->req_lock);
4723
4724         if (m.bio)
4725                 complete_master_bio(mdev, &m);
4726         return 0;
4727 }
4728
4729 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4730 {
4731         struct drbd_conf *mdev;
4732         struct p_block_ack *p = pi->data;
4733         sector_t sector = be64_to_cpu(p->sector);
4734         int blksize = be32_to_cpu(p->blksize);
4735         enum drbd_req_event what;
4736
4737         mdev = vnr_to_mdev(tconn, pi->vnr);
4738         if (!mdev)
4739                 return -EIO;
4740
4741         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4742
4743         if (p->block_id == ID_SYNCER) {
4744                 drbd_set_in_sync(mdev, sector, blksize);
4745                 dec_rs_pending(mdev);
4746                 return 0;
4747         }
4748         switch (pi->cmd) {
4749         case P_RS_WRITE_ACK:
4750                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4751                 break;
4752         case P_WRITE_ACK:
4753                 what = WRITE_ACKED_BY_PEER;
4754                 break;
4755         case P_RECV_ACK:
4756                 what = RECV_ACKED_BY_PEER;
4757                 break;
4758         case P_DISCARD_WRITE:
4759                 what = DISCARD_WRITE;
4760                 break;
4761         case P_RETRY_WRITE:
4762                 what = POSTPONE_WRITE;
4763                 break;
4764         default:
4765                 BUG();
4766         }
4767
4768         return validate_req_change_req_state(mdev, p->block_id, sector,
4769                                              &mdev->write_requests, __func__,
4770                                              what, false);
4771 }
4772
4773 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4774 {
4775         struct drbd_conf *mdev;
4776         struct p_block_ack *p = pi->data;
4777         sector_t sector = be64_to_cpu(p->sector);
4778         int size = be32_to_cpu(p->blksize);
4779         int err;
4780
4781         mdev = vnr_to_mdev(tconn, pi->vnr);
4782         if (!mdev)
4783                 return -EIO;
4784
4785         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4786
4787         if (p->block_id == ID_SYNCER) {
4788                 dec_rs_pending(mdev);
4789                 drbd_rs_failed_io(mdev, sector, size);
4790                 return 0;
4791         }
4792
4793         err = validate_req_change_req_state(mdev, p->block_id, sector,
4794                                             &mdev->write_requests, __func__,
4795                                             NEG_ACKED, true);
4796         if (err) {
4797                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4798                    The master bio might already be completed, therefore the
4799                    request is no longer in the collision hash. */
4800                 /* In Protocol B we might already have got a P_RECV_ACK
4801                    but then get a P_NEG_ACK afterwards. */
4802                 drbd_set_out_of_sync(mdev, sector, size);
4803         }
4804         return 0;
4805 }
4806
4807 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4808 {
4809         struct drbd_conf *mdev;
4810         struct p_block_ack *p = pi->data;
4811         sector_t sector = be64_to_cpu(p->sector);
4812
4813         mdev = vnr_to_mdev(tconn, pi->vnr);
4814         if (!mdev)
4815                 return -EIO;
4816
4817         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4818
4819         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4820             (unsigned long long)sector, be32_to_cpu(p->blksize));
4821
4822         return validate_req_change_req_state(mdev, p->block_id, sector,
4823                                              &mdev->read_requests, __func__,
4824                                              NEG_ACKED, false);
4825 }
4826
4827 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4828 {
4829         struct drbd_conf *mdev;
4830         sector_t sector;
4831         int size;
4832         struct p_block_ack *p = pi->data;
4833
4834         mdev = vnr_to_mdev(tconn, pi->vnr);
4835         if (!mdev)
4836                 return -EIO;
4837
4838         sector = be64_to_cpu(p->sector);
4839         size = be32_to_cpu(p->blksize);
4840
4841         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4842
4843         dec_rs_pending(mdev);
4844
4845         if (get_ldev_if_state(mdev, D_FAILED)) {
4846                 drbd_rs_complete_io(mdev, sector);
4847                 switch (pi->cmd) {
4848                 case P_NEG_RS_DREPLY:
4849                         drbd_rs_failed_io(mdev, sector, size);
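                        /* fall through */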
4850                 case P_RS_CANCEL:
4851                         break;
4852                 default:
4853                         BUG();
4854                 }
4855                 put_ldev(mdev);
4856         }
4857
4858         return 0;
4859 }
4860
4861 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4862 {
4863         struct drbd_conf *mdev;
4864         struct p_barrier_ack *p = pi->data;
4865
4866         mdev = vnr_to_mdev(tconn, pi->vnr);
4867         if (!mdev)
4868                 return -EIO;
4869
4870         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4871
4872         if (mdev->state.conn == C_AHEAD &&
4873             atomic_read(&mdev->ap_in_flight) == 0 &&
4874             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4875                 mdev->start_resync_timer.expires = jiffies + HZ;
4876                 add_timer(&mdev->start_resync_timer);
4877         }
4878
4879         return 0;
4880 }
4881
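/* Online verify result from the peer: note out-of-sync sectors and, once
 * the last reply is in, queue w_ov_finished on the worker. */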
4882 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4883 {
4884         struct drbd_conf *mdev;
4885         struct p_block_ack *p = pi->data;
4886         struct drbd_work *w;
4887         sector_t sector;
4888         int size;
4889
4890         mdev = vnr_to_mdev(tconn, pi->vnr);
4891         if (!mdev)
4892                 return -EIO;
4893
4894         sector = be64_to_cpu(p->sector);
4895         size = be32_to_cpu(p->blksize);
4896
4897         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4898
4899         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4900                 drbd_ov_out_of_sync_found(mdev, sector, size);
4901         else
4902                 ov_out_of_sync_print(mdev);
4903
4904         if (!get_ldev(mdev))
4905                 return 0;
4906
4907         drbd_rs_complete_io(mdev, sector);
4908         dec_rs_pending(mdev);
4909
4910         --mdev->ov_left;
4911
4912         /* let's advance progress step marks only for every other megabyte */
4913         if ((mdev->ov_left & 0x200) == 0x200)
4914                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4915
4916         if (mdev->ov_left == 0) {
4917                 w = kmalloc(sizeof(*w), GFP_NOIO);
4918                 if (w) {
4919                         w->cb = w_ov_finished;
4920                         w->mdev = mdev;
4921                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4922                 } else {
4923                         dev_err(DEV, "kmalloc(w) failed.");
4924                         ov_out_of_sync_print(mdev);
4925                         drbd_resync_finished(mdev);
4926                 }
4927         }
4928         put_ldev(mdev);
4929         return 0;
4930 }
4931
4932 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4933 {
4934         return 0;
4935 }
4936
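/* Drain the done_ee lists of all volumes; repeat until no volume has newly
 * completed peer requests pending. Returns non-zero on error. */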
4937 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4938 {
4939         struct drbd_conf *mdev;
4940         int i, not_empty = 0;
4941
4942         do {
4943                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4944                 flush_signals(current);
4945                 down_read(&drbd_cfg_rwsem);
4946                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4947                         if (drbd_finish_peer_reqs(mdev)) {
4948                                 up_read(&drbd_cfg_rwsem);
4949                                 return 1; /* error */
4950                         }
4951                 }
4952                 up_read(&drbd_cfg_rwsem);
4953                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4954
4955                 spin_lock_irq(&tconn->req_lock);
4956                 rcu_read_lock();
4957                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4958                         not_empty = !list_empty(&mdev->done_ee);
4959                         if (not_empty)
4960                                 break;
4961                 }
4962                 rcu_read_unlock();
4963                 spin_unlock_irq(&tconn->req_lock);
4964         } while (not_empty);
4965
4966         return 0;
4967 }
4968
4969 struct asender_cmd {
4970         size_t pkt_size;
4971         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4972 };
4973
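/* Dispatch table for packets arriving on the meta socket. */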
4974 static struct asender_cmd asender_tbl[] = {
4975         [P_PING]            = { 0, got_Ping },
4976         [P_PING_ACK]        = { 0, got_PingAck },
4977         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4978         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4979         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4980         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4981         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4982         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4983         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4984         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4985         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4986         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4987         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4988         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4989         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4990         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
4991         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4992 };
4993
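/* The asender thread: sends pings, processes completed peer requests and
 * dispatches meta socket packets via asender_tbl[]. */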
4994 int drbd_asender(struct drbd_thread *thi)
4995 {
4996         struct drbd_tconn *tconn = thi->tconn;
4997         struct asender_cmd *cmd = NULL;
4998         struct packet_info pi;
4999         int rv;
5000         void *buf    = tconn->meta.rbuf;
5001         int received = 0;
5002         unsigned int header_size = drbd_header_size(tconn);
5003         int expect   = header_size;
5004         bool ping_timeout_active = false;
5005         struct net_conf *nc;
5006         int ping_timeo, no_cork, ping_int;
5007
5008         current->policy = SCHED_RR;  /* Make this a realtime task! */
5009         current->rt_priority = 2;    /* more important than all other tasks */
5010
5011         while (get_t_state(thi) == RUNNING) {
5012                 drbd_thread_current_set_cpu(thi);
5013
5014                 rcu_read_lock();
5015                 nc = rcu_dereference(tconn->net_conf);
5016                 ping_timeo = nc->ping_timeo;
5017                 no_cork = nc->no_cork;
5018                 ping_int = nc->ping_int;
5019                 rcu_read_unlock();
5020
5021                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5022                         if (drbd_send_ping(tconn)) {
5023                                 conn_err(tconn, "drbd_send_ping has failed\n");
5024                                 goto reconnect;
5025                         }
5026                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5027                         ping_timeout_active = true;
5028                 }
5029
5030                 /* TODO: conditionally cork; it may hurt latency if we cork without
5031                    much to send */
5032                 if (!no_cork)
5033                         drbd_tcp_cork(tconn->meta.socket);
5034                 if (tconn_finish_peer_reqs(tconn)) {
5035                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5036                         goto reconnect;
5037                 }
5038                 /* but always uncork again afterwards, unless corking is disabled */
5039                 if (!no_cork)
5040                         drbd_tcp_uncork(tconn->meta.socket);
5041
5042                 /* short circuit, recv_msg would return EINTR anyway. */
5043                 if (signal_pending(current))
5044                         continue;
5045
5046                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5047                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5048
5049                 flush_signals(current);
5050
5051                 /* Note:
5052                  * -EINTR        (on meta) we got a signal
5053                  * -EAGAIN       (on meta) rcvtimeo expired
5054                  * -ECONNRESET   other side closed the connection
5055                  * -ERESTARTSYS  (on data) we got a signal
5056                  * rv <  0       other than above: unexpected error!
5057                  * rv == expected: full header or command
5058                  * rv <  expected: "woken" by signal during receive
5059                  * rv == 0       : "connection shut down by peer"
5060                  */
5061                 if (likely(rv > 0)) {
5062                         received += rv;
5063                         buf      += rv;
5064                 } else if (rv == 0) {
5065                         conn_err(tconn, "meta connection shut down by peer.\n");
5066                         goto reconnect;
5067                 } else if (rv == -EAGAIN) {
5068                         /* If the data socket received something meanwhile,
5069                          * that is good enough: peer is still alive. */
5070                         if (time_after(tconn->last_received,
5071                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5072                                 continue;
5073                         if (ping_timeout_active) {
5074                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5075                                 goto reconnect;
5076                         }
5077                         set_bit(SEND_PING, &tconn->flags);
5078                         continue;
5079                 } else if (rv == -EINTR) {
5080                         continue;
5081                 } else {
5082                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5083                         goto reconnect;
5084                 }
5085
5086                 if (received == expect && cmd == NULL) {
5087                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5088                                 goto reconnect;
5089                         cmd = &asender_tbl[pi.cmd];
5090                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5091                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
5092                                         pi.cmd, pi.size);
5093                                 goto disconnect;
5094                         }
5095                         expect = header_size + cmd->pkt_size;
5096                         if (pi.size != expect - header_size) {
5097                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5098                                         pi.cmd, pi.size);
5099                                 goto reconnect;
5100                         }
5101                 }
5102                 if (received == expect) {
5103                         bool err;
5104
5105                         err = cmd->fn(tconn, &pi);
5106                         if (err) {
5107                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5108                                 goto reconnect;
5109                         }
5110
5111                         tconn->last_received = jiffies;
5112
5113                         if (cmd == &asender_tbl[P_PING_ACK]) {
5114                                 /* restore idle timeout */
5115                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5116                                 ping_timeout_active = false;
5117                         }
5118
5119                         buf      = tconn->meta.rbuf;
5120                         received = 0;
5121                         expect   = header_size;
5122                         cmd      = NULL;
5123                 }
5124         }
5125
5126         if (0) {
5127 reconnect:
5128                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5129         }
5130         if (0) {
5131 disconnect:
5132                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5133         }
5134         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5135
5136         conn_info(tconn, "asender terminated\n");
5137
5138         return 0;
5139 }