drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
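/*
 * The chain helpers used below (page_chain_next(), page_chain_for_each(),
 * page_chain_for_each_safe()) come from drbd_int.h.  A minimal sketch of
 * what they boil down to, assuming page->private simply holds the next page
 * and 0 terminates the chain:
 *
 *	static inline struct page *page_chain_next(struct page *page)
 *	{
 *		return (struct page *)page_private(page);
 *	}
 *
 *	#define page_chain_for_each(page) \
 *		for (; page; page = page_chain_next(page))
 *
 *	#define page_chain_for_each_safe(page, n) \
 *		for (; page && ((n = page_chain_next(page)), 1); page = n)
 */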
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
154 {
155         struct page *page = NULL;
156         struct page *tmp = NULL;
157         int i = 0;
158
159         /* Yes, testing drbd_pp_vacant outside the lock is racy.
160          * So what. It saves a spin_lock. */
161         if (drbd_pp_vacant >= number) {
162                 spin_lock(&drbd_pp_lock);
163                 page = page_chain_del(&drbd_pp_pool, number);
164                 if (page)
165                         drbd_pp_vacant -= number;
166                 spin_unlock(&drbd_pp_lock);
167                 if (page)
168                         return page;
169         }
170
171         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
172          * "criss-cross" setup, that might cause write-out on some other DRBD,
173          * which in turn might block on the other node at this very place.  */
174         for (i = 0; i < number; i++) {
175                 tmp = alloc_page(GFP_TRY);
176                 if (!tmp)
177                         break;
178                 set_page_private(tmp, (unsigned long)page);
179                 page = tmp;
180         }
181
182         if (i == number)
183                 return page;
184
185         /* Not enough pages immediately available this time.
186          * No need to jump around here, drbd_pp_alloc will retry this
187          * function "soon". */
188         if (page) {
189                 tmp = page_chain_tail(page, NULL);
190                 spin_lock(&drbd_pp_lock);
191                 page_chain_add(&drbd_pp_pool, page, tmp);
192                 drbd_pp_vacant += i;
193                 spin_unlock(&drbd_pp_lock);
194         }
195         return NULL;
196 }
197
198 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
199 {
200         struct drbd_peer_request *peer_req;
201         struct list_head *le, *tle;
202
203         /* The EEs are always appended to the end of the list. Since
204            they are sent in order over the wire, they have to finish
205            in order. As soon as we see the first unfinished one, we can
206            stop examining the list... */
207
208         list_for_each_safe(le, tle, &mdev->net_ee) {
209                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
210                 if (drbd_ee_has_active_page(peer_req))
211                         break;
212                 list_move(le, to_be_freed);
213         }
214 }
215
216 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
217 {
218         LIST_HEAD(reclaimed);
219         struct drbd_peer_request *peer_req, *t;
220
221         spin_lock_irq(&mdev->tconn->req_lock);
222         reclaim_net_ee(mdev, &reclaimed);
223         spin_unlock_irq(&mdev->tconn->req_lock);
224
225         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
226                 drbd_free_net_ee(mdev, peer_req);
227 }
228
229 /**
230  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
231  * @mdev:       DRBD device.
232  * @number:     number of pages requested
233  * @retry:      whether to retry, if not enough pages are available right now
234  *
235  * Tries to allocate number pages, first from our own page pool, then from
236  * the kernel, unless this allocation would exceed the max_buffers setting.
237  * Possibly retry until DRBD frees sufficient pages somewhere else.
238  *
239  * Returns a page chain linked via page->private.
240  */
241 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
242 {
243         struct page *page = NULL;
244         DEFINE_WAIT(wait);
245
246         /* Yes, we may run up to @number over max_buffers. If we
247          * follow it strictly, the admin will get it wrong anyways. */
248         if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
249                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250
251         while (page == NULL) {
252                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
253
254                 drbd_kick_lo_and_reclaim_net(mdev);
255
256                 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
257                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
258                         if (page)
259                                 break;
260                 }
261
262                 if (!retry)
263                         break;
264
265                 if (signal_pending(current)) {
266                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
267                         break;
268                 }
269
270                 schedule();
271         }
272         finish_wait(&drbd_pp_wait, &wait);
273
274         if (page)
275                 atomic_add(number, &mdev->pp_in_use);
276         return page;
277 }
278
279 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
280  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
281  * Either links the page chain back to the global pool,
282  * or returns all pages to the system. */
283 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
284 {
285         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
286         int i;
287
288         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
289                 i = page_chain_free(page);
290         else {
291                 struct page *tmp;
292                 tmp = page_chain_tail(page, &i);
293                 spin_lock(&drbd_pp_lock);
294                 page_chain_add(&drbd_pp_pool, page, tmp);
295                 drbd_pp_vacant += i;
296                 spin_unlock(&drbd_pp_lock);
297         }
298         i = atomic_sub_return(i, a);
299         if (i < 0)
300                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
301                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
302         wake_up(&drbd_pp_wait);
303 }
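/*
 * Typical life cycle of a page chain from the pool above, as a minimal
 * hypothetical sketch (the head pointer is kept so the whole chain can be
 * handed back in one go):
 *
 *	struct page *chain = drbd_pp_alloc(mdev, nr_pages, true);
 *	struct page *page;
 *
 *	if (!chain)
 *		return;				(signalled, or no retry)
 *	page = chain;
 *	page_chain_for_each(page) {
 *		void *data = kmap(page);
 *		... use up to PAGE_SIZE bytes at data ...
 *		kunmap(page);
 *	}
 *	drbd_pp_free(mdev, chain, 0);		(0: accounted in pp_in_use)
 */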
304
305 /*
306 You need to hold the req_lock:
307  _drbd_wait_ee_list_empty()
308
309 You must not have the req_lock:
310  drbd_free_ee()
311  drbd_alloc_ee()
312  drbd_init_ee()
313  drbd_release_ee()
314  drbd_ee_fix_bhs()
315  drbd_process_done_ee()
316  drbd_clear_done_ee()
317  drbd_wait_ee_list_empty()
318 */
319
320 struct drbd_peer_request *
321 drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
322               unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
323 {
324         struct drbd_peer_request *peer_req;
325         struct page *page;
326         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
327
328         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
329                 return NULL;
330
331         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
332         if (!peer_req) {
333                 if (!(gfp_mask & __GFP_NOWARN))
334                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
335                 return NULL;
336         }
337
338         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
339         if (!page)
340                 goto fail;
341
342         drbd_clear_interval(&peer_req->i);
343         peer_req->i.size = data_size;
344         peer_req->i.sector = sector;
345         peer_req->i.local = false;
346         peer_req->i.waiting = false;
347
348         peer_req->epoch = NULL;
349         peer_req->w.mdev = mdev;
350         peer_req->pages = page;
351         atomic_set(&peer_req->pending_bios, 0);
352         peer_req->flags = 0;
353         /*
354          * The block_id is opaque to the receiver.  It is not endianness
355          * converted, and sent back to the sender unchanged.
356          */
357         peer_req->block_id = id;
358
359         return peer_req;
360
361  fail:
362         mempool_free(peer_req, drbd_ee_mempool);
363         return NULL;
364 }
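/*
 * The opaque block_id above is worth a short illustration: when the data
 * originates from a drbd_request on the peer, the peer typically uses the
 * request pointer itself as the id, and find_request() further down in this
 * file recovers it with nothing more than a cast.  Sketch of the round trip
 * (only the receiving half is shown in this file):
 *
 *	peer:    block_id = (unsigned long)req          (opaque to us)
 *	here:    peer_req->block_id = id                (drbd_alloc_ee() above)
 *	ack:     block_id is echoed back unchanged
 *	peer:    req = (struct drbd_request *)(unsigned long)id
 */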
365
366 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
367                        int is_net)
368 {
369         if (peer_req->flags & EE_HAS_DIGEST)
370                 kfree(peer_req->digest);
371         drbd_pp_free(mdev, peer_req->pages, is_net);
372         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
373         D_ASSERT(drbd_interval_empty(&peer_req->i));
374         mempool_free(peer_req, drbd_ee_mempool);
375 }
376
377 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
378 {
379         LIST_HEAD(work_list);
380         struct drbd_peer_request *peer_req, *t;
381         int count = 0;
382         int is_net = list == &mdev->net_ee;
383
384         spin_lock_irq(&mdev->tconn->req_lock);
385         list_splice_init(list, &work_list);
386         spin_unlock_irq(&mdev->tconn->req_lock);
387
388         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
389                 drbd_free_some_ee(mdev, peer_req, is_net);
390                 count++;
391         }
392         return count;
393 }
394
395
396 /* See also comments in _req_mod(,BARRIER_ACKED)
397  * and receive_Barrier.
398  *
399  * Move entries from net_ee to done_ee, if ready.
400  * Grab done_ee, call all callbacks, free the entries.
401  * The callbacks typically send out ACKs.
402  */
403 static int drbd_process_done_ee(struct drbd_conf *mdev)
404 {
405         LIST_HEAD(work_list);
406         LIST_HEAD(reclaimed);
407         struct drbd_peer_request *peer_req, *t;
408         int err = 0;
409
410         spin_lock_irq(&mdev->tconn->req_lock);
411         reclaim_net_ee(mdev, &reclaimed);
412         list_splice_init(&mdev->done_ee, &work_list);
413         spin_unlock_irq(&mdev->tconn->req_lock);
414
415         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
416                 drbd_free_net_ee(mdev, peer_req);
417
418         /* possible callbacks here:
419          * e_end_block, e_end_resync_block, and e_send_discard_write;
420          * all ignore the last argument.
421          */
422         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
423                 int err2;
424
425                 /* list_del not necessary, next/prev members not touched */
426                 err2 = peer_req->w.cb(&peer_req->w, !!err);
427                 if (!err)
428                         err = err2;
429                 drbd_free_ee(mdev, peer_req);
430         }
431         wake_up(&mdev->ee_wait);
432
433         return err;
434 }
435
436 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
437 {
438         DEFINE_WAIT(wait);
439
440         /* avoids spin_lock/unlock
441          * and calling prepare_to_wait in the fast path */
442         while (!list_empty(head)) {
443                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
444                 spin_unlock_irq(&mdev->tconn->req_lock);
445                 io_schedule();
446                 finish_wait(&mdev->ee_wait, &wait);
447                 spin_lock_irq(&mdev->tconn->req_lock);
448         }
449 }
450
451 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
452 {
453         spin_lock_irq(&mdev->tconn->req_lock);
454         _drbd_wait_ee_list_empty(mdev, head);
455         spin_unlock_irq(&mdev->tconn->req_lock);
456 }
457
458 /* see also kernel_accept, which is only present since 2.6.18.
459  * also, we want to log exactly which part of it failed */
460 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
461 {
462         struct sock *sk = sock->sk;
463         int err = 0;
464
465         *what = "listen";
466         err = sock->ops->listen(sock, 5);
467         if (err < 0)
468                 goto out;
469
470         *what = "sock_create_lite";
471         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
472                                newsock);
473         if (err < 0)
474                 goto out;
475
476         *what = "accept";
477         err = sock->ops->accept(sock, *newsock, 0);
478         if (err < 0) {
479                 sock_release(*newsock);
480                 *newsock = NULL;
481                 goto out;
482         }
483         (*newsock)->ops  = sock->ops;
484
485 out:
486         return err;
487 }
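/*
 * On kernels that provide it, the listen + accept part above is roughly what
 * kernel_accept() does; a sketch of the shorter variant (which would lose the
 * per-step "what" label used for error reporting):
 *
 *	err = sock->ops->listen(sock, 5);
 *	if (!err)
 *		err = kernel_accept(sock, newsock, 0);
 */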
488
489 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
490 {
491         mm_segment_t oldfs;
492         struct kvec iov = {
493                 .iov_base = buf,
494                 .iov_len = size,
495         };
496         struct msghdr msg = {
497                 .msg_iovlen = 1,
498                 .msg_iov = (struct iovec *)&iov,
499                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
500         };
501         int rv;
502
503         oldfs = get_fs();
504         set_fs(KERNEL_DS);
505         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
506         set_fs(oldfs);
507
508         return rv;
509 }
510
511 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
512 {
513         mm_segment_t oldfs;
514         struct kvec iov = {
515                 .iov_base = buf,
516                 .iov_len = size,
517         };
518         struct msghdr msg = {
519                 .msg_iovlen = 1,
520                 .msg_iov = (struct iovec *)&iov,
521                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
522         };
523         int rv;
524
525         oldfs = get_fs();
526         set_fs(KERNEL_DS);
527
528         for (;;) {
529                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
530                 if (rv == size)
531                         break;
532
533                 /* Note:
534                  * ECONNRESET   other side closed the connection
535                  * ERESTARTSYS  (on  sock) we got a signal
536                  */
537
538                 if (rv < 0) {
539                         if (rv == -ECONNRESET)
540                                 conn_info(tconn, "sock was reset by peer\n");
541                         else if (rv != -ERESTARTSYS)
542                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
543                         break;
544                 } else if (rv == 0) {
545                         conn_info(tconn, "sock was shut down by peer\n");
546                         break;
547                 } else  {
548                         /* signal came in, or peer/link went down,
549                          * after we read a partial message
550                          */
551                         /* D_ASSERT(signal_pending(current)); */
552                         break;
553                 }
554         }
555
556         set_fs(oldfs);
557
558         if (rv != size)
559                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
560
561         return rv;
562 }
563
564 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
565 {
566         int err;
567
568         err = drbd_recv(tconn, buf, size);
569         if (err != size) {
570                 if (err >= 0)
571                         err = -EIO;
572         } else
573                 err = 0;
574         return err;
575 }
576
577 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
578 {
579         int err;
580
581         err = drbd_recv_all(tconn, buf, size);
582         if (err && !signal_pending(current))
583                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
584         return err;
585 }
586
587 /* quoting tcp(7):
588  *   On individual connections, the socket buffer size must be set prior to the
589  *   listen(2) or connect(2) calls in order to have it take effect.
590  * This is our wrapper to do so.
591  */
592 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
593                 unsigned int rcv)
594 {
595         /* open coded SO_SNDBUF, SO_RCVBUF */
596         if (snd) {
597                 sock->sk->sk_sndbuf = snd;
598                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
599         }
600         if (rcv) {
601                 sock->sk->sk_rcvbuf = rcv;
602                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
603         }
604 }
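/*
 * For reference, the tcp(7) rule quoted above corresponds to the following
 * userspace sequence (buffer sizes are hypothetical); drbd_setbufsize() does
 * the same thing in-kernel by setting sk_sndbuf/sk_rcvbuf and the
 * SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK flags directly:
 *
 *	int snd = 128 * 1024, rcv = 128 * 1024;
 *	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &snd, sizeof(snd));
 *	setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcv, sizeof(rcv));
 *	connect(fd, addr, addrlen);		(or bind() + listen())
 */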
605
606 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
607 {
608         const char *what;
609         struct socket *sock;
610         struct sockaddr_in6 src_in6;
611         int err;
612         int disconnect_on_error = 1;
613
614         if (!get_net_conf(tconn))
615                 return NULL;
616
617         what = "sock_create_kern";
618         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
619                 SOCK_STREAM, IPPROTO_TCP, &sock);
620         if (err < 0) {
621                 sock = NULL;
622                 goto out;
623         }
624
625         sock->sk->sk_rcvtimeo =
626         sock->sk->sk_sndtimeo =  tconn->net_conf->try_connect_int*HZ;
627         drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
628                         tconn->net_conf->rcvbuf_size);
629
630         /* explicitly bind to the configured IP as source IP
631          * for the outgoing connections.
632          * This is needed for multihomed hosts and to be
633          * able to use lo: interfaces for drbd.
634          * Make sure to use 0 as port number, so linux selects
635          * a free one dynamically.
636          */
637         memcpy(&src_in6, tconn->net_conf->my_addr,
638                min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
639         if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
640                 src_in6.sin6_port = 0;
641         else
642                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
643
644         what = "bind before connect";
645         err = sock->ops->bind(sock,
646                               (struct sockaddr *) &src_in6,
647                               tconn->net_conf->my_addr_len);
648         if (err < 0)
649                 goto out;
650
651         /* connect may fail, peer not yet available.
652          * stay C_WF_CONNECTION, don't go Disconnecting! */
653         disconnect_on_error = 0;
654         what = "connect";
655         err = sock->ops->connect(sock,
656                                  (struct sockaddr *)tconn->net_conf->peer_addr,
657                                  tconn->net_conf->peer_addr_len, 0);
658
659 out:
660         if (err < 0) {
661                 if (sock) {
662                         sock_release(sock);
663                         sock = NULL;
664                 }
665                 switch (-err) {
666                         /* timeout, busy, signal pending */
667                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
668                 case EINTR: case ERESTARTSYS:
669                         /* peer not (yet) available, network problem */
670                 case ECONNREFUSED: case ENETUNREACH:
671                 case EHOSTDOWN:    case EHOSTUNREACH:
672                         disconnect_on_error = 0;
673                         break;
674                 default:
675                         conn_err(tconn, "%s failed, err = %d\n", what, err);
676                 }
677                 if (disconnect_on_error)
678                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
679         }
680         put_net_conf(tconn);
681         return sock;
682 }
683
684 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
685 {
686         int timeo, err;
687         struct socket *s_estab = NULL, *s_listen;
688         const char *what;
689
690         if (!get_net_conf(tconn))
691                 return NULL;
692
693         what = "sock_create_kern";
694         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
695                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
696         if (err) {
697                 s_listen = NULL;
698                 goto out;
699         }
700
701         timeo = tconn->net_conf->try_connect_int * HZ;
702         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
703
704         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
705         s_listen->sk->sk_rcvtimeo = timeo;
706         s_listen->sk->sk_sndtimeo = timeo;
707         drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
708                         tconn->net_conf->rcvbuf_size);
709
710         what = "bind before listen";
711         err = s_listen->ops->bind(s_listen,
712                               (struct sockaddr *) tconn->net_conf->my_addr,
713                               tconn->net_conf->my_addr_len);
714         if (err < 0)
715                 goto out;
716
717         err = drbd_accept(&what, s_listen, &s_estab);
718
719 out:
720         if (s_listen)
721                 sock_release(s_listen);
722         if (err < 0) {
723                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
724                         conn_err(tconn, "%s failed, err = %d\n", what, err);
725                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
726                 }
727         }
728         put_net_conf(tconn);
729
730         return s_estab;
731 }
732
733 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
734
735 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
736                              enum drbd_packet cmd)
737 {
738         if (!conn_prepare_command(tconn, sock))
739                 return -EIO;
740         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
741 }
742
743 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
744 {
745         unsigned int header_size = drbd_header_size(tconn);
746         struct packet_info pi;
747         int err;
748
749         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
750         if (err != header_size) {
751                 if (err >= 0)
752                         err = -EIO;
753                 return err;
754         }
755         err = decode_header(tconn, tconn->data.rbuf, &pi);
756         if (err)
757                 return err;
758         return pi.cmd;
759 }
760
761 /**
762  * drbd_socket_okay() - Free the socket if its connection is not okay
763  * @sock:       pointer to the pointer to the socket.
764  */
765 static int drbd_socket_okay(struct socket **sock)
766 {
767         int rr;
768         char tb[4];
769
770         if (!*sock)
771                 return false;
772
773         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
774
775         if (rr > 0 || rr == -EAGAIN) {
776                 return true;
777         } else {
778                 sock_release(*sock);
779                 *sock = NULL;
780                 return false;
781         }
782 }
783 /* Gets called if a connection is established, or if a new minor gets created
784    in a connection */
785 int drbd_connected(int vnr, void *p, void *data)
786 {
787         struct drbd_conf *mdev = (struct drbd_conf *)p;
788         int err;
789
790         atomic_set(&mdev->packet_seq, 0);
791         mdev->peer_seq = 0;
792
793         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
794                 &mdev->tconn->cstate_mutex :
795                 &mdev->own_state_mutex;
796
797         err = drbd_send_sync_param(mdev);
798         if (!err)
799                 err = drbd_send_sizes(mdev, 0, 0);
800         if (!err)
801                 err = drbd_send_uuids(mdev);
802         if (!err)
803                 err = drbd_send_state(mdev);
804         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
805         clear_bit(RESIZE_PENDING, &mdev->flags);
806         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
807         return err;
808 }
809
810 /*
811  * return values:
812  *   1 yes, we have a valid connection
813  *   0 oops, did not work out, please try again
814  *  -1 peer talks different language,
815  *     no point in trying again, please go standalone.
816  *  -2 We do not have a network config...
817  */
818 static int drbd_connect(struct drbd_tconn *tconn)
819 {
820         struct socket *sock, *msock;
821         int try, h, ok;
822
823         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
824                 return -2;
825
826         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
827
828         /* Assume that the peer only understands protocol 80 until we know better.  */
829         tconn->agreed_pro_version = 80;
830
831         do {
832                 struct socket *s;
833
834                 for (try = 0;;) {
835                         /* 3 tries, this should take less than a second! */
836                         s = drbd_try_connect(tconn);
837                         if (s || ++try >= 3)
838                                 break;
839                         /* give the other side time to call bind() & listen() */
840                         schedule_timeout_interruptible(HZ / 10);
841                 }
842
843                 if (s) {
844                         if (!tconn->data.socket) {
845                                 tconn->data.socket = s;
846                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
847                         } else if (!tconn->meta.socket) {
848                                 tconn->meta.socket = s;
849                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
850                         } else {
851                                 conn_err(tconn, "Logic error in drbd_connect()\n");
852                                 goto out_release_sockets;
853                         }
854                 }
855
856                 if (tconn->data.socket && tconn->meta.socket) {
857                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
858                         ok = drbd_socket_okay(&tconn->data.socket);
859                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
860                         if (ok)
861                                 break;
862                 }
863
864 retry:
865                 s = drbd_wait_for_connect(tconn);
866                 if (s) {
867                         try = receive_first_packet(tconn, s);
868                         drbd_socket_okay(&tconn->data.socket);
869                         drbd_socket_okay(&tconn->meta.socket);
870                         switch (try) {
871                         case P_INITIAL_DATA:
872                                 if (tconn->data.socket) {
873                                         conn_warn(tconn, "initial packet S crossed\n");
874                                         sock_release(tconn->data.socket);
875                                 }
876                                 tconn->data.socket = s;
877                                 break;
878                         case P_INITIAL_META:
879                                 if (tconn->meta.socket) {
880                                         conn_warn(tconn, "initial packet M crossed\n");
881                                         sock_release(tconn->meta.socket);
882                                 }
883                                 tconn->meta.socket = s;
884                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
885                                 break;
886                         default:
887                                 conn_warn(tconn, "Error receiving initial packet\n");
888                                 sock_release(s);
889                                 if (random32() & 1)
890                                         goto retry;
891                         }
892                 }
893
894                 if (tconn->cstate <= C_DISCONNECTING)
895                         goto out_release_sockets;
896                 if (signal_pending(current)) {
897                         flush_signals(current);
898                         smp_rmb();
899                         if (get_t_state(&tconn->receiver) == EXITING)
900                                 goto out_release_sockets;
901                 }
902
903                 if (tconn->data.socket && tconn->meta.socket) {
904                         ok = drbd_socket_okay(&tconn->data.socket);
905                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
906                         if (ok)
907                                 break;
908                 }
909         } while (1);
910
911         sock  = tconn->data.socket;
912         msock = tconn->meta.socket;
913
914         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
915         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
916
917         sock->sk->sk_allocation = GFP_NOIO;
918         msock->sk->sk_allocation = GFP_NOIO;
919
920         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
921         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
922
923         /* NOT YET ...
924          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
925          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
926          * first set it to the P_CONNECTION_FEATURES timeout,
927          * which we set to 4x the configured ping_timeout. */
928         sock->sk->sk_sndtimeo =
929         sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
930
931         msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
932         msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
933
934         /* we don't want delays.
935          * we use TCP_CORK where appropriate, though */
936         drbd_tcp_nodelay(sock);
937         drbd_tcp_nodelay(msock);
938
939         tconn->last_received = jiffies;
940
941         h = drbd_do_features(tconn);
942         if (h <= 0)
943                 return h;
944
945         if (tconn->cram_hmac_tfm) {
946                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
947                 switch (drbd_do_auth(tconn)) {
948                 case -1:
949                         conn_err(tconn, "Authentication of peer failed\n");
950                         return -1;
951                 case 0:
952                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
953                         return 0;
954                 }
955         }
956
957         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
958                 return 0;
959
960         sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
961         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
962
963         drbd_thread_start(&tconn->asender);
964
965         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
966                 return -1;
967
968         return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
969
970 out_release_sockets:
971         if (tconn->data.socket) {
972                 sock_release(tconn->data.socket);
973                 tconn->data.socket = NULL;
974         }
975         if (tconn->meta.socket) {
976                 sock_release(tconn->meta.socket);
977                 tconn->meta.socket = NULL;
978         }
979         return -1;
980 }
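/*
 * A caller is expected to act on the return codes documented above
 * drbd_connect(): retry on 0, give up on a negative value.  A minimal,
 * hypothetical sketch of such a retry loop (the real caller is the receiver
 * thread):
 *
 *	int h;
 *
 *	do {
 *		h = drbd_connect(tconn);
 *		if (h == 0)
 *			schedule_timeout_interruptible(HZ);
 *	} while (h == 0);
 *
 *	if (h < 0)
 *		conn_err(tconn, "giving up on this connection\n");
 */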
981
982 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
983 {
984         unsigned int header_size = drbd_header_size(tconn);
985
986         if (header_size == sizeof(struct p_header100) &&
987             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
988                 struct p_header100 *h = header;
989                 if (h->pad != 0) {
990                         conn_err(tconn, "Header padding is not zero\n");
991                         return -EINVAL;
992                 }
993                 pi->vnr = be16_to_cpu(h->volume);
994                 pi->cmd = be16_to_cpu(h->command);
995                 pi->size = be32_to_cpu(h->length);
996         } else if (header_size == sizeof(struct p_header95) &&
997                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
998                 struct p_header95 *h = header;
999                 pi->cmd = be16_to_cpu(h->command);
1000                 pi->size = be32_to_cpu(h->length);
1001                 pi->vnr = 0;
1002         } else if (header_size == sizeof(struct p_header80) &&
1003                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1004                 struct p_header80 *h = header;
1005                 pi->cmd = be16_to_cpu(h->command);
1006                 pi->size = be16_to_cpu(h->length);
1007                 pi->vnr = 0;
1008         } else {
1009                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1010                          be32_to_cpu(*(__be32 *)header),
1011                          tconn->agreed_pro_version);
1012                 return -EINVAL;
1013         }
1014         pi->data = header + header_size;
1015         return 0;
1016 }
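/*
 * For orientation, the three wire headers that decode_header() tells apart,
 * as can be read off the code above (the authoritative struct definitions
 * live in the protocol headers; field order is only sketched here):
 *
 *	p_header80:  __be32 magic (DRBD_MAGIC),     __be16 command, __be16 length
 *	p_header95:  __be16 magic (DRBD_MAGIC_BIG), __be16 command, __be32 length
 *	p_header100: __be32 magic (DRBD_MAGIC_100), __be16 volume,  __be16 command,
 *	             __be32 length, __be32 pad (must be zero)
 *
 * drbd_header_size() decides which layout is expected (based on the agreed
 * protocol version), and pi->data is left pointing just past the header.
 */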
1017
1018 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1019 {
1020         void *buffer = tconn->data.rbuf;
1021         int err;
1022
1023         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1024         if (err)
1025                 return err;
1026
1027         err = decode_header(tconn, buffer, pi);
1028         tconn->last_received = jiffies;
1029
1030         return err;
1031 }
1032
1033 static void drbd_flush(struct drbd_conf *mdev)
1034 {
1035         int rv;
1036
1037         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1038                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1039                                         NULL);
1040                 if (rv) {
1041                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1042                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1043                          * don't try again for ANY return value != 0
1044                          * if (rv == -EOPNOTSUPP) */
1045                         drbd_bump_write_ordering(mdev, WO_drain_io);
1046                 }
1047                 put_ldev(mdev);
1048         }
1049 }
1050
1051 /**
1052  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1053  * @mdev:       DRBD device.
1054  * @epoch:      Epoch object.
1055  * @ev:         Epoch event.
1056  */
1057 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1058                                                struct drbd_epoch *epoch,
1059                                                enum epoch_event ev)
1060 {
1061         int epoch_size;
1062         struct drbd_epoch *next_epoch;
1063         enum finish_epoch rv = FE_STILL_LIVE;
1064
1065         spin_lock(&mdev->epoch_lock);
1066         do {
1067                 next_epoch = NULL;
1068
1069                 epoch_size = atomic_read(&epoch->epoch_size);
1070
1071                 switch (ev & ~EV_CLEANUP) {
1072                 case EV_PUT:
1073                         atomic_dec(&epoch->active);
1074                         break;
1075                 case EV_GOT_BARRIER_NR:
1076                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1077                         break;
1078                 case EV_BECAME_LAST:
1079                         /* nothing to do*/
1080                         break;
1081                 }
1082
1083                 if (epoch_size != 0 &&
1084                     atomic_read(&epoch->active) == 0 &&
1085                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1086                         if (!(ev & EV_CLEANUP)) {
1087                                 spin_unlock(&mdev->epoch_lock);
1088                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1089                                 spin_lock(&mdev->epoch_lock);
1090                         }
1091                         dec_unacked(mdev);
1092
1093                         if (mdev->current_epoch != epoch) {
1094                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1095                                 list_del(&epoch->list);
1096                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1097                                 mdev->epochs--;
1098                                 kfree(epoch);
1099
1100                                 if (rv == FE_STILL_LIVE)
1101                                         rv = FE_DESTROYED;
1102                         } else {
1103                                 epoch->flags = 0;
1104                                 atomic_set(&epoch->epoch_size, 0);
1105                                 /* atomic_set(&epoch->active, 0); is already zero */
1106                                 if (rv == FE_STILL_LIVE)
1107                                         rv = FE_RECYCLED;
1108                                 wake_up(&mdev->ee_wait);
1109                         }
1110                 }
1111
1112                 if (!next_epoch)
1113                         break;
1114
1115                 epoch = next_epoch;
1116         } while (1);
1117
1118         spin_unlock(&mdev->epoch_lock);
1119
1120         return rv;
1121 }
1122
1123 /**
1124  * drbd_bump_write_ordering() - Fall back to another write ordering method
1125  * @mdev:       DRBD device.
1126  * @wo:         Write ordering method to try.
1127  */
1128 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1129 {
1130         enum write_ordering_e pwo;
1131         static char *write_ordering_str[] = {
1132                 [WO_none] = "none",
1133                 [WO_drain_io] = "drain",
1134                 [WO_bdev_flush] = "flush",
1135         };
1136
1137         pwo = mdev->write_ordering;
1138         wo = min(pwo, wo);
1139         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1140                 wo = WO_drain_io;
1141         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1142                 wo = WO_none;
1143         mdev->write_ordering = wo;
1144         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1145                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1146 }
1147
1148 /**
1149  * drbd_submit_peer_request() - Submit the pages of a peer request in one or more bios
1150  * @mdev:       DRBD device.
1151  * @peer_req:   peer request
1152  * @rw:         flag field, see bio->bi_rw
1153  *
1154  * May spread the pages to multiple bios,
1155  * depending on bio_add_page restrictions.
1156  *
1157  * Returns 0 if all bios have been submitted,
1158  * -ENOMEM if we could not allocate enough bios,
1159  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1160  *  single page to an empty bio (which should never happen and likely indicates
1161  *  that the lower level IO stack is in some way broken). This has been observed
1162  *  on certain Xen deployments.
1163  */
1164 /* TODO allocate from our own bio_set. */
1165 int drbd_submit_peer_request(struct drbd_conf *mdev,
1166                              struct drbd_peer_request *peer_req,
1167                              const unsigned rw, const int fault_type)
1168 {
1169         struct bio *bios = NULL;
1170         struct bio *bio;
1171         struct page *page = peer_req->pages;
1172         sector_t sector = peer_req->i.sector;
1173         unsigned ds = peer_req->i.size;
1174         unsigned n_bios = 0;
1175         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1176         int err = -ENOMEM;
1177
1178         /* In most cases, we will only need one bio.  But in case the lower
1179          * level restrictions happen to be different at this offset on this
1180          * side than those of the sending peer, we may need to submit the
1181          * request in more than one bio.
1182          *
1183          * Plain bio_alloc is good enough here, this is no DRBD internally
1184          * generated bio, but a bio allocated on behalf of the peer.
1185          */
1186 next_bio:
1187         bio = bio_alloc(GFP_NOIO, nr_pages);
1188         if (!bio) {
1189                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1190                 goto fail;
1191         }
1192         /* > peer_req->i.sector, unless this is the first bio */
1193         bio->bi_sector = sector;
1194         bio->bi_bdev = mdev->ldev->backing_bdev;
1195         bio->bi_rw = rw;
1196         bio->bi_private = peer_req;
1197         bio->bi_end_io = drbd_peer_request_endio;
1198
1199         bio->bi_next = bios;
1200         bios = bio;
1201         ++n_bios;
1202
1203         page_chain_for_each(page) {
1204                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1205                 if (!bio_add_page(bio, page, len, 0)) {
1206                         /* A single page must always be possible!
1207                          * But in case it fails anyways,
1208                          * we deal with it, and complain (below). */
1209                         if (bio->bi_vcnt == 0) {
1210                                 dev_err(DEV,
1211                                         "bio_add_page failed for len=%u, "
1212                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1213                                         len, (unsigned long long)bio->bi_sector);
1214                                 err = -ENOSPC;
1215                                 goto fail;
1216                         }
1217                         goto next_bio;
1218                 }
1219                 ds -= len;
1220                 sector += len >> 9;
1221                 --nr_pages;
1222         }
1223         D_ASSERT(page == NULL);
1224         D_ASSERT(ds == 0);
1225
1226         atomic_set(&peer_req->pending_bios, n_bios);
1227         do {
1228                 bio = bios;
1229                 bios = bios->bi_next;
1230                 bio->bi_next = NULL;
1231
1232                 drbd_generic_make_request(mdev, fault_type, bio);
1233         } while (bios);
1234         return 0;
1235
1236 fail:
1237         while (bios) {
1238                 bio = bios;
1239                 bios = bios->bi_next;
1240                 bio_put(bio);
1241         }
1242         return err;
1243 }
1244
1245 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1246                                              struct drbd_peer_request *peer_req)
1247 {
1248         struct drbd_interval *i = &peer_req->i;
1249
1250         drbd_remove_interval(&mdev->write_requests, i);
1251         drbd_clear_interval(i);
1252
1253         /* Wake up any processes waiting for this peer request to complete.  */
1254         if (i->waiting)
1255                 wake_up(&mdev->misc_wait);
1256 }
1257
1258 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1259 {
1260         struct drbd_conf *mdev;
1261         int rv;
1262         struct p_barrier *p = pi->data;
1263         struct drbd_epoch *epoch;
1264
1265         mdev = vnr_to_mdev(tconn, pi->vnr);
1266         if (!mdev)
1267                 return -EIO;
1268
1269         inc_unacked(mdev);
1270
1271         mdev->current_epoch->barrier_nr = p->barrier;
1272         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1273
1274         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1275          * the activity log, which means it would not be resynced in case the
1276          * R_PRIMARY crashes now.
1277          * Therefore we must send the barrier_ack after the barrier request was
1278          * completed. */
1279         switch (mdev->write_ordering) {
1280         case WO_none:
1281                 if (rv == FE_RECYCLED)
1282                         return 0;
1283
1284                 /* receiver context, in the writeout path of the other node.
1285                  * avoid potential distributed deadlock */
1286                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1287                 if (epoch)
1288                         break;
1289                 else
1290                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1291                         /* Fall through */
1292
1293         case WO_bdev_flush:
1294         case WO_drain_io:
1295                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1296                 drbd_flush(mdev);
1297
1298                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1299                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1300                         if (epoch)
1301                                 break;
1302                 }
1303
1304                 epoch = mdev->current_epoch;
1305                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1306
1307                 D_ASSERT(atomic_read(&epoch->active) == 0);
1308                 D_ASSERT(epoch->flags == 0);
1309
1310                 return 0;
1311         default:
1312                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1313                 return -EIO;
1314         }
1315
1316         epoch->flags = 0;
1317         atomic_set(&epoch->epoch_size, 0);
1318         atomic_set(&epoch->active, 0);
1319
1320         spin_lock(&mdev->epoch_lock);
1321         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1322                 list_add(&epoch->list, &mdev->current_epoch->list);
1323                 mdev->current_epoch = epoch;
1324                 mdev->epochs++;
1325         } else {
1326                 /* The current_epoch got recycled while we allocated this one... */
1327                 kfree(epoch);
1328         }
1329         spin_unlock(&mdev->epoch_lock);
1330
1331         return 0;
1332 }
1333
1334 /* used from receive_RSDataReply (recv_resync_read)
1335  * and from receive_Data */
1336 static struct drbd_peer_request *
1337 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1338               int data_size) __must_hold(local)
1339 {
1340         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1341         struct drbd_peer_request *peer_req;
1342         struct page *page;
1343         int dgs, ds, err;
1344         void *dig_in = mdev->tconn->int_dig_in;
1345         void *dig_vv = mdev->tconn->int_dig_vv;
1346         unsigned long *data;
1347
1348         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1349                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1350
1351         if (dgs) {
1352                 /*
1353                  * FIXME: Receive the incoming digest into the receive buffer
1354                  *        here, together with its struct p_data?
1355                  */
1356                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1357                 if (err)
1358                         return NULL;
1359         }
1360
1361         data_size -= dgs;
1362
1363         if (!expect(data_size != 0))
1364                 return NULL;
1365         if (!expect(IS_ALIGNED(data_size, 512)))
1366                 return NULL;
1367         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1368                 return NULL;
1369
1370         /* even though we trust our peer,
1371          * we sometimes have to double-check. */
1372         if (sector + (data_size>>9) > capacity) {
1373                 dev_err(DEV, "request from peer beyond end of local disk: "
1374                         "capacity: %llus < sector: %llus + size: %u\n",
1375                         (unsigned long long)capacity,
1376                         (unsigned long long)sector, data_size);
1377                 return NULL;
1378         }
1379
1380         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1381          * "criss-cross" setup, that might cause write-out on some other DRBD,
1382          * which in turn might block on the other node at this very place.  */
1383         peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1384         if (!peer_req)
1385                 return NULL;
1386
1387         ds = data_size;
1388         page = peer_req->pages;
1389         page_chain_for_each(page) {
1390                 unsigned len = min_t(int, ds, PAGE_SIZE);
1391                 data = kmap(page);
1392                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1393                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1394                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1395                         data[0] = data[0] ^ (unsigned long)-1;
1396                 }
1397                 kunmap(page);
1398                 if (err) {
1399                         drbd_free_ee(mdev, peer_req);
1400                         return NULL;
1401                 }
1402                 ds -= len;
1403         }
1404
1405         if (dgs) {
1406                 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1407                 if (memcmp(dig_in, dig_vv, dgs)) {
1408                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1409                                 (unsigned long long)sector, data_size);
1410                         drbd_free_ee(mdev, peer_req);
1411                         return NULL;
1412                 }
1413         }
1414         mdev->recv_cnt += data_size>>9;
1415         return peer_req;
1416 }
1417
1418 /* drbd_drain_block() just takes a data block
1419  * out of the socket input buffer, and discards it.
1420  */
1421 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1422 {
1423         struct page *page;
1424         int err = 0;
1425         void *data;
1426
1427         if (!data_size)
1428                 return 0;
1429
1430         page = drbd_pp_alloc(mdev, 1, 1);
1431
1432         data = kmap(page);
1433         while (data_size) {
1434                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1435
1436                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1437                 if (err)
1438                         break;
1439                 data_size -= len;
1440         }
1441         kunmap(page);
1442         drbd_pp_free(mdev, page, 0);
1443         return err;
1444 }
1445
1446 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1447                            sector_t sector, int data_size)
1448 {
1449         struct bio_vec *bvec;
1450         struct bio *bio;
1451         int dgs, err, i, expect;
1452         void *dig_in = mdev->tconn->int_dig_in;
1453         void *dig_vv = mdev->tconn->int_dig_vv;
1454
1455         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1456                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1457
1458         if (dgs) {
1459                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1460                 if (err)
1461                         return err;
1462         }
1463
1464         data_size -= dgs;
1465
1466         /* optimistically update recv_cnt.  if receiving fails below,
1467          * we disconnect anyways, and counters will be reset. */
1468         mdev->recv_cnt += data_size>>9;
1469
1470         bio = req->master_bio;
1471         D_ASSERT(sector == bio->bi_sector);
1472
1473         bio_for_each_segment(bvec, bio, i) {
1474                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1475                 expect = min_t(int, data_size, bvec->bv_len);
1476                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1477                 kunmap(bvec->bv_page);
1478                 if (err)
1479                         return err;
1480                 data_size -= expect;
1481         }
1482
1483         if (dgs) {
1484                 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1485                 if (memcmp(dig_in, dig_vv, dgs)) {
1486                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1487                         return -EINVAL;
1488                 }
1489         }
1490
1491         D_ASSERT(data_size == 0);
1492         return 0;
1493 }
1494
1495 /* e_end_resync_block() is called via
1496  * drbd_process_done_ee() by asender only */
1497 static int e_end_resync_block(struct drbd_work *w, int unused)
1498 {
1499         struct drbd_peer_request *peer_req =
1500                 container_of(w, struct drbd_peer_request, w);
1501         struct drbd_conf *mdev = w->mdev;
1502         sector_t sector = peer_req->i.sector;
1503         int err;
1504
1505         D_ASSERT(drbd_interval_empty(&peer_req->i));
1506
1507         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1508                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1509                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1510         } else {
1511                 /* Record failure to sync */
1512                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1513
1514                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1515         }
1516         dec_unacked(mdev);
1517
1518         return err;
1519 }
1520
1521 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1522 {
1523         struct drbd_peer_request *peer_req;
1524
1525         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1526         if (!peer_req)
1527                 goto fail;
1528
1529         dec_rs_pending(mdev);
1530
1531         inc_unacked(mdev);
1532         /* corresponding dec_unacked() in e_end_resync_block(),
1533          * or in _drbd_clear_done_ee(), respectively */
1534
1535         peer_req->w.cb = e_end_resync_block;
1536
1537         spin_lock_irq(&mdev->tconn->req_lock);
1538         list_add(&peer_req->w.list, &mdev->sync_ee);
1539         spin_unlock_irq(&mdev->tconn->req_lock);
1540
1541         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1542         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1543                 return 0;
1544
1545         /* don't care for the reason here */
1546         dev_err(DEV, "submit failed, triggering re-connect\n");
1547         spin_lock_irq(&mdev->tconn->req_lock);
1548         list_del(&peer_req->w.list);
1549         spin_unlock_irq(&mdev->tconn->req_lock);
1550
1551         drbd_free_ee(mdev, peer_req);
1552 fail:
1553         put_ldev(mdev);
1554         return -EIO;
1555 }
1556
1557 static struct drbd_request *
1558 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1559              sector_t sector, bool missing_ok, const char *func)
1560 {
1561         struct drbd_request *req;
1562
1563         /* Request object according to our peer */
1564         req = (struct drbd_request *)(unsigned long)id;
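             /* The peer echoes back the request pointer we sent as block_id;
              * make sure it still refers to a request in the given interval
              * tree before using it any further. */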
1565         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1566                 return req;
1567         if (!missing_ok) {
1568                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1569                         (unsigned long)id, (unsigned long long)sector);
1570         }
1571         return NULL;
1572 }
1573
1574 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1575 {
1576         struct drbd_conf *mdev;
1577         struct drbd_request *req;
1578         sector_t sector;
1579         int err;
1580         struct p_data *p = pi->data;
1581
1582         mdev = vnr_to_mdev(tconn, pi->vnr);
1583         if (!mdev)
1584                 return -EIO;
1585
1586         sector = be64_to_cpu(p->sector);
1587
1588         spin_lock_irq(&mdev->tconn->req_lock);
1589         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1590         spin_unlock_irq(&mdev->tconn->req_lock);
1591         if (unlikely(!req))
1592                 return -EIO;
1593
1594         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1595          * special casing it there for the various failure cases.
1596          * still no race with drbd_fail_pending_reads */
1597         err = recv_dless_read(mdev, req, sector, pi->size);
1598         if (!err)
1599                 req_mod(req, DATA_RECEIVED);
1600         /* else: nothing. handled from drbd_disconnect...
1601          * I don't think we may complete this just yet
1602          * in case we are "on-disconnect: freeze" */
1603
1604         return err;
1605 }
1606
1607 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1608 {
1609         struct drbd_conf *mdev;
1610         sector_t sector;
1611         int err;
1612         struct p_data *p = pi->data;
1613
1614         mdev = vnr_to_mdev(tconn, pi->vnr);
1615         if (!mdev)
1616                 return -EIO;
1617
1618         sector = be64_to_cpu(p->sector);
1619         D_ASSERT(p->block_id == ID_SYNCER);
1620
1621         if (get_ldev(mdev)) {
1622                 /* data is submitted to disk within recv_resync_read.
1623                  * corresponding put_ldev done below on error,
1624                  * or in drbd_peer_request_endio. */
1625                 err = recv_resync_read(mdev, sector, pi->size);
1626         } else {
1627                 if (__ratelimit(&drbd_ratelimit_state))
1628                         dev_err(DEV, "Can not write resync data to local disk.\n");
1629
1630                 err = drbd_drain_block(mdev, pi->size);
1631
1632                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1633         }
1634
1635         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1636
1637         return err;
1638 }
1639
1640 static int w_restart_write(struct drbd_work *w, int cancel)
1641 {
1642         struct drbd_request *req = container_of(w, struct drbd_request, w);
1643         struct drbd_conf *mdev = w->mdev;
1644         struct bio *bio;
1645         unsigned long start_time;
1646         unsigned long flags;
1647
1648         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1649         if (!expect(req->rq_state & RQ_POSTPONED)) {
1650                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1651                 return -EIO;
1652         }
1653         bio = req->master_bio;
1654         start_time = req->start_time;
1655         /* Postponed requests will not have their master_bio completed!  */
1656         __req_mod(req, DISCARD_WRITE, NULL);
1657         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1658
1659         while (__drbd_make_request(mdev, bio, start_time))
1660                 /* retry */ ;
1661         return 0;
1662 }
1663
1664 static void restart_conflicting_writes(struct drbd_conf *mdev,
1665                                        sector_t sector, int size)
1666 {
1667         struct drbd_interval *i;
1668         struct drbd_request *req;
1669
1670         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1671                 if (!i->local)
1672                         continue;
1673                 req = container_of(i, struct drbd_request, i);
1674                 if (req->rq_state & RQ_LOCAL_PENDING ||
1675                     !(req->rq_state & RQ_POSTPONED))
1676                         continue;
1677                 if (expect(list_empty(&req->w.list))) {
1678                         req->w.mdev = mdev;
1679                         req->w.cb = w_restart_write;
1680                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1681                 }
1682         }
1683 }
1684
1685 /* e_end_block() is called via drbd_process_done_ee().
1686  * this means this function only runs in the asender thread
1687  */
1688 static int e_end_block(struct drbd_work *w, int cancel)
1689 {
1690         struct drbd_peer_request *peer_req =
1691                 container_of(w, struct drbd_peer_request, w);
1692         struct drbd_conf *mdev = w->mdev;
1693         sector_t sector = peer_req->i.sector;
1694         int err = 0, pcmd;
1695
1696         if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
1697                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1698                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1699                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1700                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1701                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1702                         err = drbd_send_ack(mdev, pcmd, peer_req);
1703                         if (pcmd == P_RS_WRITE_ACK)
1704                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1705                 } else {
1706                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1707                         /* we expect it to be marked out of sync anyways...
1708                          * maybe assert this?  */
1709                 }
1710                 dec_unacked(mdev);
1711         }
1712         /* we delete from the conflict detection hash _after_ we sent out the
1713          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1714         if (mdev->tconn->net_conf->two_primaries) {
1715                 spin_lock_irq(&mdev->tconn->req_lock);
1716                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1717                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1718                 if (peer_req->flags & EE_RESTART_REQUESTS)
1719                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1720                 spin_unlock_irq(&mdev->tconn->req_lock);
1721         } else
1722                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1723
1724         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1725
1726         return err;
1727 }
1728
1729 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1730 {
1731         struct drbd_conf *mdev = w->mdev;
1732         struct drbd_peer_request *peer_req =
1733                 container_of(w, struct drbd_peer_request, w);
1734         int err;
1735
1736         err = drbd_send_ack(mdev, ack, peer_req);
1737         dec_unacked(mdev);
1738
1739         return err;
1740 }
1741
1742 static int e_send_discard_write(struct drbd_work *w, int unused)
1743 {
1744         return e_send_ack(w, P_DISCARD_WRITE);
1745 }
1746
1747 static int e_send_retry_write(struct drbd_work *w, int unused)
1748 {
1749         struct drbd_tconn *tconn = w->mdev->tconn;
1750
1751         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1752                              P_RETRY_WRITE : P_DISCARD_WRITE);
1753 }
1754
1755 static bool seq_greater(u32 a, u32 b)
1756 {
1757         /*
1758          * We assume 32-bit wrap-around here.
1759          * For 24-bit wrap-around, we would have to shift:
1760          *  a <<= 8; b <<= 8;
1761          */
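             /*
              * Example: a == 2 and b == 0xfffffffe (b has just wrapped):
              * (s32)2 - (s32)0xfffffffe == 2 - (-2) == 4 > 0, so a is
              * correctly considered the more recent sequence number.
              */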
1762         return (s32)a - (s32)b > 0;
1763 }
1764
1765 static u32 seq_max(u32 a, u32 b)
1766 {
1767         return seq_greater(a, b) ? a : b;
1768 }
1769
1770 static bool need_peer_seq(struct drbd_conf *mdev)
1771 {
1772         struct drbd_tconn *tconn = mdev->tconn;
1773
1774         /*
1775          * We only need to keep track of the last packet_seq number of our peer
1776          * if we are in dual-primary mode and we have the discard flag set; see
1777          * handle_write_conflicts().
1778          */
1779         return tconn->net_conf->two_primaries &&
1780                test_bit(DISCARD_CONCURRENT, &tconn->flags);
1781 }
1782
1783 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1784 {
1785         unsigned int newest_peer_seq;
1786
1787         if (need_peer_seq(mdev)) {
1788                 spin_lock(&mdev->peer_seq_lock);
1789                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1790                 mdev->peer_seq = newest_peer_seq;
1791                 spin_unlock(&mdev->peer_seq_lock);
1792                 /* wake up only if we actually changed mdev->peer_seq */
1793                 if (peer_seq == newest_peer_seq)
1794                         wake_up(&mdev->seq_wait);
1795         }
1796 }
1797
1798 /* Called from receive_Data.
1799  * Synchronize packets on sock with packets on msock.
1800  *
1801  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1802  * packet traveling on msock, they are still processed in the order they have
1803  * been sent.
1804  *
1805  * Note: we don't care for Ack packets overtaking P_DATA packets.
1806  *
1807  * In case packet_seq is larger than mdev->peer_seq number, there are
1808  * outstanding packets on the msock. We wait for them to arrive.
1809  * In case we are the logically next packet, we update mdev->peer_seq
1810  * ourselves. Correctly handles 32bit wrap around.
1811  *
1812  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1813  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1814  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1815  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1816  *
1817  * returns 0 if we may process the packet,
1818  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1819 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1820 {
1821         DEFINE_WAIT(wait);
1822         long timeout;
1823         int ret;
1824
1825         if (!need_peer_seq(mdev))
1826                 return 0;
1827
1828         spin_lock(&mdev->peer_seq_lock);
1829         for (;;) {
1830                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1831                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1832                         ret = 0;
1833                         break;
1834                 }
1835                 if (signal_pending(current)) {
1836                         ret = -ERESTARTSYS;
1837                         break;
1838                 }
1839                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1840                 spin_unlock(&mdev->peer_seq_lock);
1841                 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1842                 timeout = schedule_timeout(timeout);
1843                 spin_lock(&mdev->peer_seq_lock);
1844                 if (!timeout) {
1845                         ret = -ETIMEDOUT;
1846                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1847                         break;
1848                 }
1849         }
1850         spin_unlock(&mdev->peer_seq_lock);
1851         finish_wait(&mdev->seq_wait, &wait);
1852         return ret;
1853 }
1854
1855 /* see also bio_flags_to_wire():
1856  * we need to semantically map bio (REQ_*) flags to data packet (DP_*) flags
1857  * and back, because we may replicate to peers running other kernel versions. */
1858 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1859 {
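             /* For example, a packet carrying DP_FUA | DP_FLUSH results in a
              * local bio submitted with REQ_FUA | REQ_FLUSH set. */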
1860         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1861                 (dpf & DP_FUA ? REQ_FUA : 0) |
1862                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1863                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1864 }
1865
1866 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1867                                     unsigned int size)
1868 {
1869         struct drbd_interval *i;
1870
1871     repeat:
1872         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1873                 struct drbd_request *req;
1874                 struct bio_and_error m;
1875
1876                 if (!i->local)
1877                         continue;
1878                 req = container_of(i, struct drbd_request, i);
1879                 if (!(req->rq_state & RQ_POSTPONED))
1880                         continue;
1881                 req->rq_state &= ~RQ_POSTPONED;
1882                 __req_mod(req, NEG_ACKED, &m);
1883                 spin_unlock_irq(&mdev->tconn->req_lock);
1884                 if (m.bio)
1885                         complete_master_bio(mdev, &m);
1886                 spin_lock_irq(&mdev->tconn->req_lock);
1887                 goto repeat;
1888         }
1889 }
1890
1891 static int handle_write_conflicts(struct drbd_conf *mdev,
1892                                   struct drbd_peer_request *peer_req)
1893 {
1894         struct drbd_tconn *tconn = mdev->tconn;
1895         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1896         sector_t sector = peer_req->i.sector;
1897         const unsigned int size = peer_req->i.size;
1898         struct drbd_interval *i;
1899         bool equal;
1900         int err;
1901
1902         /*
1903          * Inserting the peer request into the write_requests tree will prevent
1904          * new conflicting local requests from being added.
1905          */
1906         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1907
1908     repeat:
1909         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1910                 if (i == &peer_req->i)
1911                         continue;
1912
1913                 if (!i->local) {
1914                         /*
1915                          * Our peer has sent a conflicting remote request; this
1916                          * should not happen in a two-node setup.  Wait for the
1917                          * earlier peer request to complete.
1918                          */
1919                         err = drbd_wait_misc(mdev, i);
1920                         if (err)
1921                                 goto out;
1922                         goto repeat;
1923                 }
1924
1925                 equal = i->sector == sector && i->size == size;
1926                 if (resolve_conflicts) {
1927                         /*
1928                          * If the peer request is fully contained within the
1929                          * overlapping request, it can be discarded; otherwise,
1930                          * it will be retried once all overlapping requests
1931                          * have completed.
1932                          */
1933                         bool discard = i->sector <= sector && i->sector +
1934                                        (i->size >> 9) >= sector + (size >> 9);
1935
1936                         if (!equal)
1937                                 dev_alert(DEV, "Concurrent writes detected: "
1938                                                "local=%llus +%u, remote=%llus +%u, "
1939                                                "assuming %s came first\n",
1940                                           (unsigned long long)i->sector, i->size,
1941                                           (unsigned long long)sector, size,
1942                                           discard ? "local" : "remote");
1943
1944                         inc_unacked(mdev);
1945                         peer_req->w.cb = discard ? e_send_discard_write :
1946                                                    e_send_retry_write;
1947                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1948                         wake_asender(mdev->tconn);
1949
1950                         err = -ENOENT;
1951                         goto out;
1952                 } else {
1953                         struct drbd_request *req =
1954                                 container_of(i, struct drbd_request, i);
1955
1956                         if (!equal)
1957                                 dev_alert(DEV, "Concurrent writes detected: "
1958                                                "local=%llus +%u, remote=%llus +%u\n",
1959                                           (unsigned long long)i->sector, i->size,
1960                                           (unsigned long long)sector, size);
1961
1962                         if (req->rq_state & RQ_LOCAL_PENDING ||
1963                             !(req->rq_state & RQ_POSTPONED)) {
1964                                 /*
1965                                  * Wait for the node with the discard flag to
1966                                  * decide if this request will be discarded or
1967                                  * retried.  Requests that are discarded will
1968                                  * disappear from the write_requests tree.
1969                                  *
1970                                  * In addition, wait for the conflicting
1971                                  * request to finish locally before submitting
1972                                  * the conflicting peer request.
1973                                  */
1974                                 err = drbd_wait_misc(mdev, &req->i);
1975                                 if (err) {
1976                                         _conn_request_state(mdev->tconn,
1977                                                             NS(conn, C_TIMEOUT),
1978                                                             CS_HARD);
1979                                         fail_postponed_requests(mdev, sector, size);
1980                                         goto out;
1981                                 }
1982                                 goto repeat;
1983                         }
1984                         /*
1985                          * Remember to restart the conflicting requests after
1986                          * the new peer request has completed.
1987                          */
1988                         peer_req->flags |= EE_RESTART_REQUESTS;
1989                 }
1990         }
1991         err = 0;
1992
1993     out:
1994         if (err)
1995                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1996         return err;
1997 }
1998
1999 /* mirrored write */
2000 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2001 {
2002         struct drbd_conf *mdev;
2003         sector_t sector;
2004         struct drbd_peer_request *peer_req;
2005         struct p_data *p = pi->data;
2006         u32 peer_seq = be32_to_cpu(p->seq_num);
2007         int rw = WRITE;
2008         u32 dp_flags;
2009         int err;
2010
2011         mdev = vnr_to_mdev(tconn, pi->vnr);
2012         if (!mdev)
2013                 return -EIO;
2014
2015         if (!get_ldev(mdev)) {
2016                 int err2;
2017
2018                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2019                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2020                 atomic_inc(&mdev->current_epoch->epoch_size);
2021                 err2 = drbd_drain_block(mdev, pi->size);
2022                 if (!err)
2023                         err = err2;
2024                 return err;
2025         }
2026
2027         /*
2028          * Corresponding put_ldev done either below (on various errors), or in
2029          * drbd_peer_request_endio, if we successfully submit the data at the
2030          * end of this function.
2031          */
2032
2033         sector = be64_to_cpu(p->sector);
2034         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2035         if (!peer_req) {
2036                 put_ldev(mdev);
2037                 return -EIO;
2038         }
2039
2040         peer_req->w.cb = e_end_block;
2041
2042         dp_flags = be32_to_cpu(p->dp_flags);
2043         rw |= wire_flags_to_bio(mdev, dp_flags);
2044
2045         if (dp_flags & DP_MAY_SET_IN_SYNC)
2046                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2047
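             /* This write takes a reference on the current epoch; the epoch
              * cannot be finished until that reference is dropped again via
              * EV_PUT in drbd_may_finish_epoch(). */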
2048         spin_lock(&mdev->epoch_lock);
2049         peer_req->epoch = mdev->current_epoch;
2050         atomic_inc(&peer_req->epoch->epoch_size);
2051         atomic_inc(&peer_req->epoch->active);
2052         spin_unlock(&mdev->epoch_lock);
2053
2054         if (mdev->tconn->net_conf->two_primaries) {
2055                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2056                 if (err)
2057                         goto out_interrupted;
2058                 spin_lock_irq(&mdev->tconn->req_lock);
2059                 err = handle_write_conflicts(mdev, peer_req);
2060                 if (err) {
2061                         spin_unlock_irq(&mdev->tconn->req_lock);
2062                         if (err == -ENOENT) {
2063                                 put_ldev(mdev);
2064                                 return 0;
2065                         }
2066                         goto out_interrupted;
2067                 }
2068         } else
2069                 spin_lock_irq(&mdev->tconn->req_lock);
2070         list_add(&peer_req->w.list, &mdev->active_ee);
2071         spin_unlock_irq(&mdev->tconn->req_lock);
2072
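             /* Ack behaviour per protocol: C acks only after the local write
              * has completed (from e_end_block()), B acks receipt of the data
              * right away, A sends no per-request ack at all. */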
2073         switch (mdev->tconn->net_conf->wire_protocol) {
2074         case DRBD_PROT_C:
2075                 inc_unacked(mdev);
2076                 /* corresponding dec_unacked() in e_end_block(),
2077                  * or in _drbd_clear_done_ee(), respectively */
2078                 break;
2079         case DRBD_PROT_B:
2080                 /* I really don't like it that the receiver thread
2081                  * sends on the msock, but anyways */
2082                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2083                 break;
2084         case DRBD_PROT_A:
2085                 /* nothing to do */
2086                 break;
2087         }
2088
2089         if (mdev->state.pdsk < D_INCONSISTENT) {
2090                 /* In case we have the only disk of the cluster, mark this write out of sync towards the diskless peer. */
2091                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2092                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2093                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2094                 drbd_al_begin_io(mdev, &peer_req->i);
2095         }
2096
2097         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2098         if (!err)
2099                 return 0;
2100
2101         /* don't care for the reason here */
2102         dev_err(DEV, "submit failed, triggering re-connect\n");
2103         spin_lock_irq(&mdev->tconn->req_lock);
2104         list_del(&peer_req->w.list);
2105         drbd_remove_epoch_entry_interval(mdev, peer_req);
2106         spin_unlock_irq(&mdev->tconn->req_lock);
2107         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2108                 drbd_al_complete_io(mdev, &peer_req->i);
2109
2110 out_interrupted:
2111         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2112         put_ldev(mdev);
2113         drbd_free_ee(mdev, peer_req);
2114         return err;
2115 }
2116
2117 /* We may throttle resync, if the lower device seems to be busy,
2118  * and current sync rate is above c_min_rate.
2119  *
2120  * To decide whether or not the lower device is busy, we use a scheme similar
2121  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2122  * amount (more than 64 sectors) of activity that we cannot account for with
2123  * our own resync activity, it obviously is "busy".
2124  *
2125  * The current sync rate used here uses only the most recent two step marks,
2126  * to have a short time average so we can react faster.
2127  */
2128 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2129 {
2130         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2131         unsigned long db, dt, dbdt;
2132         struct lc_element *tmp;
2133         int curr_events;
2134         int throttle = 0;
2135
2136         /* feature disabled? */
2137         if (mdev->ldev->dc.c_min_rate == 0)
2138                 return 0;
2139
2140         spin_lock_irq(&mdev->al_lock);
2141         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2142         if (tmp) {
2143                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2144                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2145                         spin_unlock_irq(&mdev->al_lock);
2146                         return 0;
2147                 }
2148                 /* Do not slow down if app IO is already waiting for this extent */
2149         }
2150         spin_unlock_irq(&mdev->al_lock);
2151
2152         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2153                       (int)part_stat_read(&disk->part0, sectors[1]) -
2154                         atomic_read(&mdev->rs_sect_ev);
2155
2156         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2157                 unsigned long rs_left;
2158                 int i;
2159
2160                 mdev->rs_last_events = curr_events;
2161
2162                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2163                  * approx. */
2164                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2165
2166                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2167                         rs_left = mdev->ov_left;
2168                 else
2169                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2170
2171                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2172                 if (!dt)
2173                         dt++;
2174                 db = mdev->rs_mark_left[i] - rs_left;
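                     /* db is in bitmap bits (one bit covers BM_BLOCK_SIZE, 4 KiB),
                      * so Bit2KB(db/dt) is the recent resync rate in KiB/s,
                      * directly comparable to the configured c_min_rate. */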
2175                 dbdt = Bit2KB(db/dt);
2176
2177                 if (dbdt > mdev->ldev->dc.c_min_rate)
2178                         throttle = 1;
2179         }
2180         return throttle;
2181 }
2182
2183
2184 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2185 {
2186         struct drbd_conf *mdev;
2187         sector_t sector;
2188         sector_t capacity;
2189         struct drbd_peer_request *peer_req;
2190         struct digest_info *di = NULL;
2191         int size, verb;
2192         unsigned int fault_type;
2193         struct p_block_req *p = pi->data;
2194
2195         mdev = vnr_to_mdev(tconn, pi->vnr);
2196         if (!mdev)
2197                 return -EIO;
2198         capacity = drbd_get_capacity(mdev->this_bdev);
2199
2200         sector = be64_to_cpu(p->sector);
2201         size   = be32_to_cpu(p->blksize);
2202
2203         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2204                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2205                                 (unsigned long long)sector, size);
2206                 return -EINVAL;
2207         }
2208         if (sector + (size>>9) > capacity) {
2209                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2210                                 (unsigned long long)sector, size);
2211                 return -EINVAL;
2212         }
2213
2214         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2215                 verb = 1;
2216                 switch (pi->cmd) {
2217                 case P_DATA_REQUEST:
2218                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2219                         break;
2220                 case P_RS_DATA_REQUEST:
2221                 case P_CSUM_RS_REQUEST:
2222                 case P_OV_REQUEST:
2223                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2224                         break;
2225                 case P_OV_REPLY:
2226                         verb = 0;
2227                         dec_rs_pending(mdev);
2228                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2229                         break;
2230                 default:
2231                         BUG();
2232                 }
2233                 if (verb && __ratelimit(&drbd_ratelimit_state))
2234                         dev_err(DEV, "Can not satisfy peer's read request, "
2235                             "no local data.\n");
2236
2237                 /* drain possible payload */
2238                 return drbd_drain_block(mdev, pi->size);
2239         }
2240
2241         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2242          * "criss-cross" setup, that might cause write-out on some other DRBD,
2243          * which in turn might block on the other node at this very place.  */
2244         peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2245         if (!peer_req) {
2246                 put_ldev(mdev);
2247                 return -ENOMEM;
2248         }
2249
2250         switch (pi->cmd) {
2251         case P_DATA_REQUEST:
2252                 peer_req->w.cb = w_e_end_data_req;
2253                 fault_type = DRBD_FAULT_DT_RD;
2254                 /* application IO, don't drbd_rs_begin_io */
2255                 goto submit;
2256
2257         case P_RS_DATA_REQUEST:
2258                 peer_req->w.cb = w_e_end_rsdata_req;
2259                 fault_type = DRBD_FAULT_RS_RD;
2260                 /* used in the sector offset progress display */
2261                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2262                 break;
2263
2264         case P_OV_REPLY:
2265         case P_CSUM_RS_REQUEST:
2266                 fault_type = DRBD_FAULT_RS_RD;
2267                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2268                 if (!di)
2269                         goto out_free_e;
2270
2271                 di->digest_size = pi->size;
2272                 di->digest = (((char *)di)+sizeof(struct digest_info));
2273
2274                 peer_req->digest = di;
2275                 peer_req->flags |= EE_HAS_DIGEST;
2276
2277                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2278                         goto out_free_e;
2279
2280                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2281                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2282                         peer_req->w.cb = w_e_end_csum_rs_req;
2283                         /* used in the sector offset progress display */
2284                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2285                 } else if (pi->cmd == P_OV_REPLY) {
2286                         /* track progress, we may need to throttle */
2287                         atomic_add(size >> 9, &mdev->rs_sect_in);
2288                         peer_req->w.cb = w_e_end_ov_reply;
2289                         dec_rs_pending(mdev);
2290                         /* drbd_rs_begin_io done when we sent this request,
2291                          * but accounting still needs to be done. */
2292                         goto submit_for_resync;
2293                 }
2294                 break;
2295
2296         case P_OV_REQUEST:
2297                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2298                     mdev->tconn->agreed_pro_version >= 90) {
2299                         unsigned long now = jiffies;
2300                         int i;
2301                         mdev->ov_start_sector = sector;
2302                         mdev->ov_position = sector;
2303                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2304                         mdev->rs_total = mdev->ov_left;
2305                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2306                                 mdev->rs_mark_left[i] = mdev->ov_left;
2307                                 mdev->rs_mark_time[i] = now;
2308                         }
2309                         dev_info(DEV, "Online Verify start sector: %llu\n",
2310                                         (unsigned long long)sector);
2311                 }
2312                 peer_req->w.cb = w_e_end_ov_req;
2313                 fault_type = DRBD_FAULT_RS_RD;
2314                 break;
2315
2316         default:
2317                 BUG();
2318         }
2319
2320         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2321          * wrt the receiver, but it is not as straightforward as it may seem.
2322  * Various places in the resync start and stop logic assume resync
2323  * requests are processed in order; requeuing this on the worker thread
2324  * would introduce a bunch of new code for synchronization between threads.
2325          *
2326          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2327          * "forever", throttling after drbd_rs_begin_io will lock that extent
2328          * for application writes for the same time.  For now, just throttle
2329          * here, where the rest of the code expects the receiver to sleep for
2330          * a while, anyways.
2331          */
2332
2333         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2334          * this defers syncer requests for some time, before letting at least
2335          * one request through.  The resync controller on the receiving side
2336          * will adapt to the incoming rate accordingly.
2337          *
2338          * We cannot throttle here if remote is Primary/SyncTarget:
2339          * we would also throttle its application reads.
2340          * In that case, throttling is done on the SyncTarget only.
2341          */
2342         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2343                 schedule_timeout_uninterruptible(HZ/10);
2344         if (drbd_rs_begin_io(mdev, sector))
2345                 goto out_free_e;
2346
2347 submit_for_resync:
2348         atomic_add(size >> 9, &mdev->rs_sect_ev);
2349
2350 submit:
2351         inc_unacked(mdev);
2352         spin_lock_irq(&mdev->tconn->req_lock);
2353         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2354         spin_unlock_irq(&mdev->tconn->req_lock);
2355
2356         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2357                 return 0;
2358
2359         /* don't care for the reason here */
2360         dev_err(DEV, "submit failed, triggering re-connect\n");
2361         spin_lock_irq(&mdev->tconn->req_lock);
2362         list_del(&peer_req->w.list);
2363         spin_unlock_irq(&mdev->tconn->req_lock);
2364         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2365
2366 out_free_e:
2367         put_ldev(mdev);
2368         drbd_free_ee(mdev, peer_req);
2369         return -EIO;
2370 }
2371
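     /* After-split-brain recovery helpers: a positive return value means this
      * node becomes sync source (the peer's modifications are discarded), a
      * negative value means this node becomes sync target (local modifications
      * are discarded), and -100 means no automatic decision could be made. */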
2372 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2373 {
2374         int self, peer, rv = -100;
2375         unsigned long ch_self, ch_peer;
2376
2377         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2378         peer = mdev->p_uuid[UI_BITMAP] & 1;
2379
2380         ch_peer = mdev->p_uuid[UI_SIZE];
2381         ch_self = mdev->comm_bm_set;
2382
2383         switch (mdev->tconn->net_conf->after_sb_0p) {
2384         case ASB_CONSENSUS:
2385         case ASB_DISCARD_SECONDARY:
2386         case ASB_CALL_HELPER:
2387                 dev_err(DEV, "Configuration error.\n");
2388                 break;
2389         case ASB_DISCONNECT:
2390                 break;
2391         case ASB_DISCARD_YOUNGER_PRI:
2392                 if (self == 0 && peer == 1) {
2393                         rv = -1;
2394                         break;
2395                 }
2396                 if (self == 1 && peer == 0) {
2397                         rv =  1;
2398                         break;
2399                 }
2400                 /* Else fall through to one of the other strategies... */
2401         case ASB_DISCARD_OLDER_PRI:
2402                 if (self == 0 && peer == 1) {
2403                         rv = 1;
2404                         break;
2405                 }
2406                 if (self == 1 && peer == 0) {
2407                         rv = -1;
2408                         break;
2409                 }
2410                 /* Else fall through to one of the other strategies... */
2411                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2412                      "Using discard-least-changes instead\n");
2413         case ASB_DISCARD_ZERO_CHG:
2414                 if (ch_peer == 0 && ch_self == 0) {
2415                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2416                                 ? -1 : 1;
2417                         break;
2418                 } else {
2419                         if (ch_peer == 0) { rv =  1; break; }
2420                         if (ch_self == 0) { rv = -1; break; }
2421                 }
2422                 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2423                         break;
2424         case ASB_DISCARD_LEAST_CHG:
2425                 if      (ch_self < ch_peer)
2426                         rv = -1;
2427                 else if (ch_self > ch_peer)
2428                         rv =  1;
2429                 else /* ( ch_self == ch_peer ) */
2430                      /* Well, then use something else. */
2431                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2432                                 ? -1 : 1;
2433                 break;
2434         case ASB_DISCARD_LOCAL:
2435                 rv = -1;
2436                 break;
2437         case ASB_DISCARD_REMOTE:
2438                 rv =  1;
2439         }
2440
2441         return rv;
2442 }
2443
2444 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2445 {
2446         int hg, rv = -100;
2447
2448         switch (mdev->tconn->net_conf->after_sb_1p) {
2449         case ASB_DISCARD_YOUNGER_PRI:
2450         case ASB_DISCARD_OLDER_PRI:
2451         case ASB_DISCARD_LEAST_CHG:
2452         case ASB_DISCARD_LOCAL:
2453         case ASB_DISCARD_REMOTE:
2454                 dev_err(DEV, "Configuration error.\n");
2455                 break;
2456         case ASB_DISCONNECT:
2457                 break;
2458         case ASB_CONSENSUS:
2459                 hg = drbd_asb_recover_0p(mdev);
2460                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2461                         rv = hg;
2462                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2463                         rv = hg;
2464                 break;
2465         case ASB_VIOLENTLY:
2466                 rv = drbd_asb_recover_0p(mdev);
2467                 break;
2468         case ASB_DISCARD_SECONDARY:
2469                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2470         case ASB_CALL_HELPER:
2471                 hg = drbd_asb_recover_0p(mdev);
2472                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2473                         enum drbd_state_rv rv2;
2474
2475                         drbd_set_role(mdev, R_SECONDARY, 0);
2476                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2477                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2478                           * we do not need to wait for the after state change work either. */
2479                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2480                         if (rv2 != SS_SUCCESS) {
2481                                 drbd_khelper(mdev, "pri-lost-after-sb");
2482                         } else {
2483                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2484                                 rv = hg;
2485                         }
2486                 } else
2487                         rv = hg;
2488         }
2489
2490         return rv;
2491 }
2492
2493 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2494 {
2495         int hg, rv = -100;
2496
2497         switch (mdev->tconn->net_conf->after_sb_2p) {
2498         case ASB_DISCARD_YOUNGER_PRI:
2499         case ASB_DISCARD_OLDER_PRI:
2500         case ASB_DISCARD_LEAST_CHG:
2501         case ASB_DISCARD_LOCAL:
2502         case ASB_DISCARD_REMOTE:
2503         case ASB_CONSENSUS:
2504         case ASB_DISCARD_SECONDARY:
2505                 dev_err(DEV, "Configuration error.\n");
2506                 break;
2507         case ASB_VIOLENTLY:
2508                 rv = drbd_asb_recover_0p(mdev);
2509                 break;
2510         case ASB_DISCONNECT:
2511                 break;
2512         case ASB_CALL_HELPER:
2513                 hg = drbd_asb_recover_0p(mdev);
2514                 if (hg == -1) {
2515                         enum drbd_state_rv rv2;
2516
2517                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2518                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2519                           * we do not need to wait for the after state change work either. */
2520                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2521                         if (rv2 != SS_SUCCESS) {
2522                                 drbd_khelper(mdev, "pri-lost-after-sb");
2523                         } else {
2524                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2525                                 rv = hg;
2526                         }
2527                 } else
2528                         rv = hg;
2529         }
2530
2531         return rv;
2532 }
2533
2534 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2535                            u64 bits, u64 flags)
2536 {
2537         if (!uuid) {
2538                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2539                 return;
2540         }
2541         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2542              text,
2543              (unsigned long long)uuid[UI_CURRENT],
2544              (unsigned long long)uuid[UI_BITMAP],
2545              (unsigned long long)uuid[UI_HISTORY_START],
2546              (unsigned long long)uuid[UI_HISTORY_END],
2547              (unsigned long long)bits,
2548              (unsigned long long)flags);
2549 }
2550
2551 /*
2552   100   after split brain try auto recover
2553     2   C_SYNC_SOURCE set BitMap
2554     1   C_SYNC_SOURCE use BitMap
2555     0   no Sync
2556    -1   C_SYNC_TARGET use BitMap
2557    -2   C_SYNC_TARGET set BitMap
2558  -100   after split brain, disconnect
2559 -1000   unrelated data
2560 -1091   requires proto 91
2561 -1096   requires proto 96
2562  */
2563 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2564 {
2565         u64 self, peer;
2566         int i, j;
2567
2568         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2569         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2570
2571         *rule_nr = 10;
2572         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2573                 return 0;
2574
2575         *rule_nr = 20;
2576         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2577              peer != UUID_JUST_CREATED)
2578                 return -2;
2579
2580         *rule_nr = 30;
2581         if (self != UUID_JUST_CREATED &&
2582             (peer == UUID_JUST_CREATED || peer == (u64)0))
2583                 return 2;
2584
2585         if (self == peer) {
2586                 int rct, dc; /* roles at crash time */
2587
2588                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2589
2590                         if (mdev->tconn->agreed_pro_version < 91)
2591                                 return -1091;
2592
2593                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2594                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2595                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2596                                 drbd_uuid_set_bm(mdev, 0UL);
2597
2598                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2599                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2600                                 *rule_nr = 34;
2601                         } else {
2602                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2603                                 *rule_nr = 36;
2604                         }
2605
2606                         return 1;
2607                 }
2608
2609                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2610
2611                         if (mdev->tconn->agreed_pro_version < 91)
2612                                 return -1091;
2613
2614                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2615                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2616                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2617
2618                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2619                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2620                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2621
2622                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2623                                 *rule_nr = 35;
2624                         } else {
2625                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2626                                 *rule_nr = 37;
2627                         }
2628
2629                         return -1;
2630                 }
2631
2632                 /* Common power [off|failure] */
2633                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2634                         (mdev->p_uuid[UI_FLAGS] & 2);
2635                 /* lowest bit is set when we were primary,
2636                  * next bit (weight 2) is set when peer was primary */
2637                 *rule_nr = 40;
2638
2639                 switch (rct) {
2640                 case 0: /* !self_pri && !peer_pri */ return 0;
2641                 case 1: /*  self_pri && !peer_pri */ return 1;
2642                 case 2: /* !self_pri &&  peer_pri */ return -1;
2643                 case 3: /*  self_pri &&  peer_pri */
2644                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2645                         return dc ? -1 : 1;
2646                 }
2647         }
2648
2649         *rule_nr = 50;
2650         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2651         if (self == peer)
2652                 return -1;
2653
2654         *rule_nr = 51;
2655         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2656         if (self == peer) {
2657                 if (mdev->tconn->agreed_pro_version < 96 ?
2658                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2659                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2660                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2661                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2662                            modifications from its last start of resync as sync source. */
2663
2664                         if (mdev->tconn->agreed_pro_version < 91)
2665                                 return -1091;
2666
2667                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2668                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2669
2670                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2671                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2672
2673                         return -1;
2674                 }
2675         }
2676
2677         *rule_nr = 60;
2678         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2679         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2680                 peer = mdev->p_uuid[i] & ~((u64)1);
2681                 if (self == peer)
2682                         return -2;
2683         }
2684
2685         *rule_nr = 70;
2686         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2687         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2688         if (self == peer)
2689                 return 1;
2690
2691         *rule_nr = 71;
2692         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2693         if (self == peer) {
2694                 if (mdev->tconn->agreed_pro_version < 96 ?
2695                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2696                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2697                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2698                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2699                            modifications from our last start of resync as sync source. */
2700
2701                         if (mdev->tconn->agreed_pro_version < 91)
2702                                 return -1091;
2703
2704                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2705                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2706
2707                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2708                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2709                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2710
2711                         return 1;
2712                 }
2713         }
2714
2715
2716         *rule_nr = 80;
2717         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2718         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2719                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2720                 if (self == peer)
2721                         return 2;
2722         }
2723
2724         *rule_nr = 90;
2725         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2726         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2727         if (self == peer && self != ((u64)0))
2728                 return 100;
2729
2730         *rule_nr = 100;
2731         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2732                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2733                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2734                         peer = mdev->p_uuid[j] & ~((u64)1);
2735                         if (self == peer)
2736                                 return -100;
2737                 }
2738         }
2739
2740         return -1000;
2741 }
2742
2743 /* drbd_sync_handshake() returns the new conn state on success, or
2744    C_MASK (-1) on failure.
2745  */
2746 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2747                                            enum drbd_disk_state peer_disk) __must_hold(local)
2748 {
2749         int hg, rule_nr;
2750         enum drbd_conns rv = C_MASK;
2751         enum drbd_disk_state mydisk;
2752
2753         mydisk = mdev->state.disk;
2754         if (mydisk == D_NEGOTIATING)
2755                 mydisk = mdev->new_state_tmp.disk;
2756
2757         dev_info(DEV, "drbd_sync_handshake:\n");
2758         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2759         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2760                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2761
2762         hg = drbd_uuid_compare(mdev, &rule_nr);
2763
2764         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2765
2766         if (hg == -1000) {
2767                 dev_alert(DEV, "Unrelated data, aborting!\n");
2768                 return C_MASK;
2769         }
2770         if (hg < -1000) {
2771                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2772                 return C_MASK;
2773         }
2774
2775         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2776             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2777                 int f = (hg == -100) || abs(hg) == 2;
2778                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2779                 if (f)
2780                         hg = hg*2;
2781                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2782                      hg > 0 ? "source" : "target");
2783         }
2784
2785         if (abs(hg) == 100)
2786                 drbd_khelper(mdev, "initial-split-brain");
2787
2788         if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2789                 int pcount = (mdev->state.role == R_PRIMARY)
2790                            + (peer_role == R_PRIMARY);
2791                 int forced = (hg == -100);
2792
2793                 switch (pcount) {
2794                 case 0:
2795                         hg = drbd_asb_recover_0p(mdev);
2796                         break;
2797                 case 1:
2798                         hg = drbd_asb_recover_1p(mdev);
2799                         break;
2800                 case 2:
2801                         hg = drbd_asb_recover_2p(mdev);
2802                         break;
2803                 }
2804                 if (abs(hg) < 100) {
2805                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2806                              "automatically solved. Sync from %s node\n",
2807                              pcount, (hg < 0) ? "peer" : "this");
2808                         if (forced) {
2809                                 dev_warn(DEV, "Doing a full sync, since"
2810                                      " UUIDs were ambiguous.\n");
2811                                 hg = hg*2;
2812                         }
2813                 }
2814         }
2815
2816         if (hg == -100) {
2817                 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2818                         hg = -1;
2819                 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2820                         hg = 1;
2821
2822                 if (abs(hg) < 100)
2823                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2824                              "Sync from %s node\n",
2825                              (hg < 0) ? "peer" : "this");
2826         }
2827
2828         if (hg == -100) {
2829                 /* FIXME this log message is not correct if we end up here
2830                  * after an attempted attach on a diskless node.
2831                  * We just refuse to attach -- well, we drop the "connection"
2832                  * to that disk, in a way... */
2833                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2834                 drbd_khelper(mdev, "split-brain");
2835                 return C_MASK;
2836         }
2837
2838         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2839                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2840                 return C_MASK;
2841         }
2842
2843         if (hg < 0 && /* by intention we do not use mydisk here. */
2844             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2845                 switch (mdev->tconn->net_conf->rr_conflict) {
2846                 case ASB_CALL_HELPER:
2847                         drbd_khelper(mdev, "pri-lost");
2848                         /* fall through */
2849                 case ASB_DISCONNECT:
2850                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2851                         return C_MASK;
2852                 case ASB_VIOLENTLY:
2853                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2854                              " assumption\n");
2855                 }
2856         }
2857
2858         if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2859                 if (hg == 0)
2860                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2861                 else
2862                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2863                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2864                                  abs(hg) >= 2 ? "full" : "bit-map based");
2865                 return C_MASK;
2866         }
2867
2868         if (abs(hg) >= 2) {
2869                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2870                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2871                                         BM_LOCKED_SET_ALLOWED))
2872                         return C_MASK;
2873         }
2874
2875         if (hg > 0) { /* become sync source. */
2876                 rv = C_WF_BITMAP_S;
2877         } else if (hg < 0) { /* become sync target */
2878                 rv = C_WF_BITMAP_T;
2879         } else {
2880                 rv = C_CONNECTED;
2881                 if (drbd_bm_total_weight(mdev)) {
2882                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2883                              drbd_bm_total_weight(mdev));
2884                 }
2885         }
2886
2887         return rv;
2888 }
2889
2890 /* returns 1 if invalid */
2891 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2892 {
2893         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2894         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2895             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2896                 return 0;
2897
2898         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2899         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2900             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2901                 return 1;
2902
2903         /* everything else is valid if they are equal on both sides. */
2904         if (peer == self)
2905                 return 0;
2906
2907         /* everything else is invalid. */
2908         return 1;
2909 }
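
/*
 * Illustrative sketch, not part of drbd: the only asymmetric combination
 * cmp_after_sb() accepts is discard-remote paired with discard-local; all
 * other policies must be configured identically on both sides.
 * Hypothetical self-test exercising that rule.
 */
static void cmp_after_sb_example(void)
{
	/* complementary discard settings: valid */
	BUG_ON(cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_LOCAL) != 0);
	/* both sides want to discard their local data: invalid */
	BUG_ON(cmp_after_sb(ASB_DISCARD_LOCAL, ASB_DISCARD_LOCAL) != 1);
	/* identical symmetric policies: valid */
	BUG_ON(cmp_after_sb(ASB_DISCONNECT, ASB_DISCONNECT) != 0);
	/* different symmetric policies: invalid */
	BUG_ON(cmp_after_sb(ASB_DISCONNECT, ASB_VIOLENTLY) != 1);
}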
2910
2911 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2912 {
2913         struct p_protocol *p = pi->data;
2914         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2915         int p_want_lose, p_two_primaries, cf;
2916         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2917
2918         p_proto         = be32_to_cpu(p->protocol);
2919         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2920         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2921         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2922         p_two_primaries = be32_to_cpu(p->two_primaries);
2923         cf              = be32_to_cpu(p->conn_flags);
2924         p_want_lose = cf & CF_WANT_LOSE;
2925
2926         clear_bit(CONN_DRY_RUN, &tconn->flags);
2927
2928         if (cf & CF_DRY_RUN)
2929                 set_bit(CONN_DRY_RUN, &tconn->flags);
2930
2931         if (p_proto != tconn->net_conf->wire_protocol) {
2932                 conn_err(tconn, "incompatible communication protocols\n");
2933                 goto disconnect;
2934         }
2935
2936         if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2937                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2938                 goto disconnect;
2939         }
2940
2941         if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2942                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2943                 goto disconnect;
2944         }
2945
2946         if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2947                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2948                 goto disconnect;
2949         }
2950
2951         if (p_want_lose && tconn->net_conf->want_lose) {
2952                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2953                 goto disconnect;
2954         }
2955
2956         if (p_two_primaries != tconn->net_conf->two_primaries) {
2957                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2958                 goto disconnect;
2959         }
2960
2961         if (tconn->agreed_pro_version >= 87) {
2962                 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2963                 int err;
2964
2965                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2966                 if (err)
2967                         return err;
2968
2969                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2970                 if (strcmp(p_integrity_alg, my_alg)) {
2971                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2972                         goto disconnect;
2973                 }
2974                 conn_info(tconn, "data-integrity-alg: %s\n",
2975                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2976         }
2977
2978         return 0;
2979
2980 disconnect:
2981         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2982         return -EIO;
2983 }
2984
2985 /* helper function
2986  * input: alg name, feature name
2987  * return: NULL (alg name was "")
2988  *         ERR_PTR(error) if something goes wrong
2989  *         or the crypto hash ptr, if it worked out ok. */
2990 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2991                 const char *alg, const char *name)
2992 {
2993         struct crypto_hash *tfm;
2994
2995         if (!alg[0])
2996                 return NULL;
2997
2998         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2999         if (IS_ERR(tfm)) {
3000                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3001                         alg, name, PTR_ERR(tfm));
3002                 return tfm;
3003         }
3004         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3005                 crypto_free_hash(tfm);
3006                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3007                 return ERR_PTR(-EINVAL);
3008         }
3009         return tfm;
3010 }
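
/*
 * Illustrative sketch, not part of drbd: the three possible return values
 * of drbd_crypto_alloc_digest_safe() need three different reactions.
 * "new_alg" and the function name are placeholders.
 */
static int install_digest_example(struct drbd_conf *mdev, const char *new_alg)
{
	struct crypto_hash *tfm;

	tfm = drbd_crypto_alloc_digest_safe(mdev, new_alg, "example-alg");
	if (IS_ERR(tfm))
		return PTR_ERR(tfm);	/* allocation/validation failed, already logged */
	if (!tfm)
		return 0;		/* empty algorithm name: feature stays disabled */
	crypto_free_hash(tfm);		/* a real caller would swap it in instead */
	return 0;
}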
3011
3012 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3013 {
3014         void *buffer = tconn->data.rbuf;
3015         int size = pi->size;
3016
3017         while (size) {
3018                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3019                 s = drbd_recv(tconn, buffer, s);
3020                 if (s <= 0) {
3021                         if (s < 0)
3022                                 return s;
3023                         break;
3024                 }
3025                 size -= s;
3026         }
3027         if (size)
3028                 return -EIO;
3029         return 0;
3030 }
3031
3032 /*
3033  * config_unknown_volume  -  device configuration command for unknown volume
3034  *
3035  * When a device is added to an existing connection, the node on which the
3036  * device is added first will send configuration commands to its peer but the
3037  * peer will not know about the device yet.  It will warn and ignore these
3038  * commands.  Once the device is added on the second node, the second node will
3039  * send the same device configuration commands, but in the other direction.
3040  *
3041  * (We can also end up here if drbd is misconfigured.)
3042  */
3043 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3044 {
3045         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3046                   pi->vnr, cmdname(pi->cmd));
3047         return ignore_remaining_packet(tconn, pi);
3048 }
3049
3050 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3051 {
3052         struct drbd_conf *mdev;
3053         struct p_rs_param_95 *p;
3054         unsigned int header_size, data_size, exp_max_sz;
3055         struct crypto_hash *verify_tfm = NULL;
3056         struct crypto_hash *csums_tfm = NULL;
3057         const int apv = tconn->agreed_pro_version;
3058         int *rs_plan_s = NULL;
3059         int fifo_size = 0;
3060         int err;
3061
3062         mdev = vnr_to_mdev(tconn, pi->vnr);
3063         if (!mdev)
3064                 return config_unknown_volume(tconn, pi);
3065
3066         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3067                     : apv == 88 ? sizeof(struct p_rs_param)
3068                                         + SHARED_SECRET_MAX
3069                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3070                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3071
3072         if (pi->size > exp_max_sz) {
3073                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3074                     pi->size, exp_max_sz);
3075                 return -EIO;
3076         }
3077
3078         if (apv <= 88) {
3079                 header_size = sizeof(struct p_rs_param);
3080                 data_size = pi->size - header_size;
3081         } else if (apv <= 94) {
3082                 header_size = sizeof(struct p_rs_param_89);
3083                 data_size = pi->size - header_size;
3084                 D_ASSERT(data_size == 0);
3085         } else {
3086                 header_size = sizeof(struct p_rs_param_95);
3087                 data_size = pi->size - header_size;
3088                 D_ASSERT(data_size == 0);
3089         }
3090
3091         /* initialize verify_alg and csums_alg */
3092         p = pi->data;
3093         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3094
3095         err = drbd_recv_all(mdev->tconn, p, header_size);
3096         if (err)
3097                 return err;
3098
3099         if (get_ldev(mdev)) {
3100                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3101                 put_ldev(mdev);
3102         }
3103
3104         if (apv >= 88) {
3105                 if (apv == 88) {
3106                         if (data_size > SHARED_SECRET_MAX) {
3107                                 dev_err(DEV, "verify-alg too long, "
3108                                     "peer wants %u, accepting only %u bytes\n",
3109                                                 data_size, SHARED_SECRET_MAX);
3110                                 return -EIO;
3111                         }
3112
3113                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3114                         if (err)
3115                                 return err;
3116
3117                         /* we expect NUL terminated string */
3118                         /* but just in case someone tries to be evil */
3119                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3120                         p->verify_alg[data_size-1] = 0;
3121
3122                 } else /* apv >= 89 */ {
3123                         /* we still expect NUL terminated strings */
3124                         /* but just in case someone tries to be evil */
3125                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3126                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3127                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3128                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3129                 }
3130
3131                 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3132                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3133                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3134                                     mdev->tconn->net_conf->verify_alg, p->verify_alg);
3135                                 goto disconnect;
3136                         }
3137                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3138                                         p->verify_alg, "verify-alg");
3139                         if (IS_ERR(verify_tfm)) {
3140                                 verify_tfm = NULL;
3141                                 goto disconnect;
3142                         }
3143                 }
3144
3145                 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3146                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3147                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3148                                     mdev->tconn->net_conf->csums_alg, p->csums_alg);
3149                                 goto disconnect;
3150                         }
3151                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3152                                         p->csums_alg, "csums-alg");
3153                         if (IS_ERR(csums_tfm)) {
3154                                 csums_tfm = NULL;
3155                                 goto disconnect;
3156                         }
3157                 }
3158
3159                 if (apv > 94 && get_ldev(mdev)) {
3160                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3161                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3162                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3163                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3164                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3165
3166                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3167                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3168                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3169                                 if (!rs_plan_s) {
3170                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3171                                         put_ldev(mdev);
3172                                         goto disconnect;
3173                                 }
3174                         }
3175                         put_ldev(mdev);
3176                 }
3177
3178                 spin_lock(&mdev->peer_seq_lock);
3179                 /* lock against drbd_nl_syncer_conf() */
3180                 if (verify_tfm) {
3181                         strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3182                         mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3183                         crypto_free_hash(mdev->tconn->verify_tfm);
3184                         mdev->tconn->verify_tfm = verify_tfm;
3185                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3186                 }
3187                 if (csums_tfm) {
3188                         strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3189                         mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3190                         crypto_free_hash(mdev->tconn->csums_tfm);
3191                         mdev->tconn->csums_tfm = csums_tfm;
3192                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3193                 }
3194                 if (fifo_size != mdev->rs_plan_s.size) {
3195                         kfree(mdev->rs_plan_s.values);
3196                         mdev->rs_plan_s.values = rs_plan_s;
3197                         mdev->rs_plan_s.size   = fifo_size;
3198                         mdev->rs_planed = 0;
3199                 }
3200                 spin_unlock(&mdev->peer_seq_lock);
3201         }
3202         return 0;
3203
3204 disconnect:
3205         /* just for completeness: actually not needed,
3206          * as this is not reached if csums_tfm was ok. */
3207         crypto_free_hash(csums_tfm);
3208         /* but free the verify_tfm again, if csums_tfm did not work out */
3209         crypto_free_hash(verify_tfm);
3210         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3211         return -EIO;
3212 }
3213
3214 /* warn if the arguments differ by more than 12.5% */
3215 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3216         const char *s, sector_t a, sector_t b)
3217 {
3218         sector_t d;
3219         if (a == 0 || b == 0)
3220                 return;
3221         d = (a > b) ? (a - b) : (b - a);
3222         if (d > (a>>3) || d > (b>>3))
3223                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3224                      (unsigned long long)a, (unsigned long long)b);
3225 }
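
/*
 * Illustrative example, not part of drbd: d > (a>>3) is the same as
 * d > a/8, i.e. a difference of more than 12.5%.  With a = 1000 and
 * b = 800 sectors, d = 200 exceeds both 1000>>3 = 125 and 800>>3 = 100,
 * so the warning fires; with b = 950 it stays silent.
 */
static void differ_considerably_example(void)
{
	sector_t a = 1000, b = 800;
	sector_t d = (a > b) ? (a - b) : (b - a);

	BUG_ON(!(d > (a >> 3) || d > (b >> 3)));
}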
3226
3227 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3228 {
3229         struct drbd_conf *mdev;
3230         struct p_sizes *p = pi->data;
3231         enum determine_dev_size dd = unchanged;
3232         sector_t p_size, p_usize, my_usize;
3233         int ldsc = 0; /* local disk size changed */
3234         enum dds_flags ddsf;
3235
3236         mdev = vnr_to_mdev(tconn, pi->vnr);
3237         if (!mdev)
3238                 return config_unknown_volume(tconn, pi);
3239
3240         p_size = be64_to_cpu(p->d_size);
3241         p_usize = be64_to_cpu(p->u_size);
3242
3243         /* just store the peer's disk size for now.
3244          * we still need to figure out whether we accept that. */
3245         mdev->p_size = p_size;
3246
3247         if (get_ldev(mdev)) {
3248                 warn_if_differ_considerably(mdev, "lower level device sizes",
3249                            p_size, drbd_get_max_capacity(mdev->ldev));
3250                 warn_if_differ_considerably(mdev, "user requested size",
3251                                             p_usize, mdev->ldev->dc.disk_size);
3252
3253                 /* if this is the first connect, or an otherwise expected
3254                  * param exchange, choose the minimum */
3255                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3256                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3257                                              p_usize);
3258
3259                 my_usize = mdev->ldev->dc.disk_size;
3260
3261                 if (mdev->ldev->dc.disk_size != p_usize) {
3262                         mdev->ldev->dc.disk_size = p_usize;
3263                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3264                              (unsigned long)mdev->ldev->dc.disk_size);
3265                 }
3266
3267                 /* Never shrink a device with usable data during connect.
3268                    But allow online shrinking if we are connected. */
3269                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3270                    drbd_get_capacity(mdev->this_bdev) &&
3271                    mdev->state.disk >= D_OUTDATED &&
3272                    mdev->state.conn < C_CONNECTED) {
3273                         dev_err(DEV, "The peer's disk size is too small!\n");
3274                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3275                         mdev->ldev->dc.disk_size = my_usize;
3276                         put_ldev(mdev);
3277                         return -EIO;
3278                 }
3279                 put_ldev(mdev);
3280         }
3281
3282         ddsf = be16_to_cpu(p->dds_flags);
3283         if (get_ldev(mdev)) {
3284                 dd = drbd_determine_dev_size(mdev, ddsf);
3285                 put_ldev(mdev);
3286                 if (dd == dev_size_error)
3287                         return -EIO;
3288                 drbd_md_sync(mdev);
3289         } else {
3290                 /* I am diskless, need to accept the peer's size. */
3291                 drbd_set_my_capacity(mdev, p_size);
3292         }
3293
3294         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3295         drbd_reconsider_max_bio_size(mdev);
3296
3297         if (get_ldev(mdev)) {
3298                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3299                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3300                         ldsc = 1;
3301                 }
3302
3303                 put_ldev(mdev);
3304         }
3305
3306         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3307                 if (be64_to_cpu(p->c_size) !=
3308                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3309                         /* we have different sizes, the peer probably
3310                          * needs to know my new size... */
3311                         drbd_send_sizes(mdev, 0, ddsf);
3312                 }
3313                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3314                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3315                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3316                             mdev->state.disk >= D_INCONSISTENT) {
3317                                 if (ddsf & DDSF_NO_RESYNC)
3318                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3319                                 else
3320                                         resync_after_online_grow(mdev);
3321                         } else
3322                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3323                 }
3324         }
3325
3326         return 0;
3327 }
3328
3329 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3330 {
3331         struct drbd_conf *mdev;
3332         struct p_uuids *p = pi->data;
3333         u64 *p_uuid;
3334         int i, updated_uuids = 0;
3335
3336         mdev = vnr_to_mdev(tconn, pi->vnr);
3337         if (!mdev)
3338                 return config_unknown_volume(tconn, pi);
3339
3340         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3341
3342         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3343                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3344
3345         kfree(mdev->p_uuid);
3346         mdev->p_uuid = p_uuid;
3347
3348         if (mdev->state.conn < C_CONNECTED &&
3349             mdev->state.disk < D_INCONSISTENT &&
3350             mdev->state.role == R_PRIMARY &&
3351             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3352                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3353                     (unsigned long long)mdev->ed_uuid);
3354                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3355                 return -EIO;
3356         }
3357
3358         if (get_ldev(mdev)) {
3359                 int skip_initial_sync =
3360                         mdev->state.conn == C_CONNECTED &&
3361                         mdev->tconn->agreed_pro_version >= 90 &&
3362                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3363                         (p_uuid[UI_FLAGS] & 8);
3364                 if (skip_initial_sync) {
3365                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3366                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3367                                         "clear_n_write from receive_uuids",
3368                                         BM_LOCKED_TEST_ALLOWED);
3369                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3370                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3371                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3372                                         CS_VERBOSE, NULL);
3373                         drbd_md_sync(mdev);
3374                         updated_uuids = 1;
3375                 }
3376                 put_ldev(mdev);
3377         } else if (mdev->state.disk < D_INCONSISTENT &&
3378                    mdev->state.role == R_PRIMARY) {
3379                 /* I am a diskless primary, the peer just created a new current UUID
3380                    for me. */
3381                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3382         }
3383
3384         /* Before we test the disk state, wait until a possibly ongoing
3385            cluster-wide state change has finished. That is important if
3386            we are primary and are detaching from our disk: we need to see
3387            the new disk state... */
3388         mutex_lock(mdev->state_mutex);
3389         mutex_unlock(mdev->state_mutex);
3390         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3391                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3392
3393         if (updated_uuids)
3394                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3395
3396         return 0;
3397 }
3398
3399 /**
3400  * convert_state() - Converts the peer's view of the cluster state to our point of view
3401  * @ps:         The state as seen by the peer.
3402  */
3403 static union drbd_state convert_state(union drbd_state ps)
3404 {
3405         union drbd_state ms;
3406
3407         static enum drbd_conns c_tab[] = {
3408                 [C_CONNECTED] = C_CONNECTED,
3409
3410                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3411                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3412                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3413                 [C_VERIFY_S]       = C_VERIFY_T,
3414                 [C_MASK]   = C_MASK,
3415         };
3416
3417         ms.i = ps.i;
3418
3419         ms.conn = c_tab[ps.conn];
3420         ms.peer = ps.role;
3421         ms.role = ps.peer;
3422         ms.pdsk = ps.disk;
3423         ms.disk = ps.pdsk;
3424         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3425
3426         return ms;
3427 }
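
/*
 * Illustrative sketch, not part of drbd: convert_state() mirrors the
 * peer's view into ours -- role and peer swap, disk and pdsk swap, and
 * the asymmetric connection states map to their counterparts.
 */
static void convert_state_example(void)
{
	union drbd_state ps = { .i = 0 }, ms;

	ps.conn = C_STARTING_SYNC_S;	/* peer is about to become sync source */
	ps.role = R_PRIMARY;		/* peer is primary */
	ps.disk = D_UP_TO_DATE;		/* peer's own disk */
	ps.pdsk = D_INCONSISTENT;	/* peer's view of our disk */

	ms = convert_state(ps);
	BUG_ON(ms.conn != C_STARTING_SYNC_T);	/* so we become sync target */
	BUG_ON(ms.peer != R_PRIMARY);
	BUG_ON(ms.pdsk != D_UP_TO_DATE);
	BUG_ON(ms.disk != D_INCONSISTENT);
}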
3428
3429 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3430 {
3431         struct drbd_conf *mdev;
3432         struct p_req_state *p = pi->data;
3433         union drbd_state mask, val;
3434         enum drbd_state_rv rv;
3435
3436         mdev = vnr_to_mdev(tconn, pi->vnr);
3437         if (!mdev)
3438                 return -EIO;
3439
3440         mask.i = be32_to_cpu(p->mask);
3441         val.i = be32_to_cpu(p->val);
3442
3443         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3444             mutex_is_locked(mdev->state_mutex)) {
3445                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3446                 return 0;
3447         }
3448
3449         mask = convert_state(mask);
3450         val = convert_state(val);
3451
3452         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3453         drbd_send_sr_reply(mdev, rv);
3454
3455         drbd_md_sync(mdev);
3456
3457         return 0;
3458 }
3459
3460 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3461 {
3462         struct p_req_state *p = pi->data;
3463         union drbd_state mask, val;
3464         enum drbd_state_rv rv;
3465
3466         mask.i = be32_to_cpu(p->mask);
3467         val.i = be32_to_cpu(p->val);
3468
3469         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3470             mutex_is_locked(&tconn->cstate_mutex)) {
3471                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3472                 return 0;
3473         }
3474
3475         mask = convert_state(mask);
3476         val = convert_state(val);
3477
3478         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3479         conn_send_sr_reply(tconn, rv);
3480
3481         return 0;
3482 }
3483
3484 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3485 {
3486         struct drbd_conf *mdev;
3487         struct p_state *p = pi->data;
3488         union drbd_state os, ns, peer_state;
3489         enum drbd_disk_state real_peer_disk;
3490         enum chg_state_flags cs_flags;
3491         int rv;
3492
3493         mdev = vnr_to_mdev(tconn, pi->vnr);
3494         if (!mdev)
3495                 return config_unknown_volume(tconn, pi);
3496
3497         peer_state.i = be32_to_cpu(p->state);
3498
3499         real_peer_disk = peer_state.disk;
3500         if (peer_state.disk == D_NEGOTIATING) {
3501                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3502                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3503         }
3504
3505         spin_lock_irq(&mdev->tconn->req_lock);
3506  retry:
3507         os = ns = drbd_read_state(mdev);
3508         spin_unlock_irq(&mdev->tconn->req_lock);
3509
3510         /* peer says his disk is uptodate, while we think it is inconsistent,
3511          * and this happens while we think we have a sync going on. */
3512         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3513             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3514                 /* If we are (becoming) SyncSource, but peer is still in sync
3515                  * preparation, ignore its uptodate-ness to avoid flapping, it
3516                  * will change to inconsistent once the peer reaches active
3517                  * syncing states.
3518                  * It may have changed syncer-paused flags, however, so we
3519                  * cannot ignore this completely. */
3520                 if (peer_state.conn > C_CONNECTED &&
3521                     peer_state.conn < C_SYNC_SOURCE)
3522                         real_peer_disk = D_INCONSISTENT;
3523
3524                 /* if peer_state changes to connected at the same time,
3525                  * it explicitly notifies us that it finished resync.
3526                  * Maybe we should finish it up, too? */
3527                 else if (os.conn >= C_SYNC_SOURCE &&
3528                          peer_state.conn == C_CONNECTED) {
3529                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3530                                 drbd_resync_finished(mdev);
3531                         return 0;
3532                 }
3533         }
3534
3535         /* peer says his disk is inconsistent, while we think it is uptodate,
3536          * and this happens while the peer still thinks we have a sync going on,
3537          * but we think we are already done with the sync.
3538          * We ignore this to avoid flapping pdsk.
3539          * This should not happen, if the peer is a recent version of drbd. */
3540         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3541             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3542                 real_peer_disk = D_UP_TO_DATE;
3543
3544         if (ns.conn == C_WF_REPORT_PARAMS)
3545                 ns.conn = C_CONNECTED;
3546
3547         if (peer_state.conn == C_AHEAD)
3548                 ns.conn = C_BEHIND;
3549
3550         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3551             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3552                 int cr; /* consider resync */
3553
3554                 /* if we established a new connection */
3555                 cr  = (os.conn < C_CONNECTED);
3556                 /* if we had an established connection
3557                  * and one of the nodes newly attaches a disk */
3558                 cr |= (os.conn == C_CONNECTED &&
3559                        (peer_state.disk == D_NEGOTIATING ||
3560                         os.disk == D_NEGOTIATING));
3561                 /* if we have both been inconsistent, and the peer has been
3562                  * forced to be UpToDate with --overwrite-data */
3563                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3564                 /* if we had been plain connected, and the admin requested to
3565                  * start a sync by "invalidate" or "invalidate-remote" */
3566                 cr |= (os.conn == C_CONNECTED &&
3567                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3568                                  peer_state.conn <= C_WF_BITMAP_T));
3569
3570                 if (cr)
3571                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3572
3573                 put_ldev(mdev);
3574                 if (ns.conn == C_MASK) {
3575                         ns.conn = C_CONNECTED;
3576                         if (mdev->state.disk == D_NEGOTIATING) {
3577                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3578                         } else if (peer_state.disk == D_NEGOTIATING) {
3579                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3580                                 peer_state.disk = D_DISKLESS;
3581                                 real_peer_disk = D_DISKLESS;
3582                         } else {
3583                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3584                                         return -EIO;
3585                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3586                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3587                                 return -EIO;
3588                         }
3589                 }
3590         }
3591
3592         spin_lock_irq(&mdev->tconn->req_lock);
3593         if (os.i != drbd_read_state(mdev).i)
3594                 goto retry;
3595         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3596         ns.peer = peer_state.role;
3597         ns.pdsk = real_peer_disk;
3598         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3599         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3600                 ns.disk = mdev->new_state_tmp.disk;
3601         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3602         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3603             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3604                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3605                    for temporary network outages! */
3606                 spin_unlock_irq(&mdev->tconn->req_lock);
3607                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3608                 tl_clear(mdev->tconn);
3609                 drbd_uuid_new_current(mdev);
3610                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3611                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3612                 return -EIO;
3613         }
3614         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3615         ns = drbd_read_state(mdev);
3616         spin_unlock_irq(&mdev->tconn->req_lock);
3617
3618         if (rv < SS_SUCCESS) {
3619                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3620                 return -EIO;
3621         }
3622
3623         if (os.conn > C_WF_REPORT_PARAMS) {
3624                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3625                     peer_state.disk != D_NEGOTIATING ) {
3626                         /* we want resync, peer has not yet decided to sync... */
3627                         /* Nowadays only used when forcing a node into primary role and
3628                            setting its disk to UpToDate with that */
3629                         drbd_send_uuids(mdev);
3630                         drbd_send_state(mdev);
3631                 }
3632         }
3633
3634         mdev->tconn->net_conf->want_lose = 0;
3635
3636         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3637
3638         return 0;
3639 }
3640
3641 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3642 {
3643         struct drbd_conf *mdev;
3644         struct p_rs_uuid *p = pi->data;
3645
3646         mdev = vnr_to_mdev(tconn, pi->vnr);
3647         if (!mdev)
3648                 return -EIO;
3649
3650         wait_event(mdev->misc_wait,
3651                    mdev->state.conn == C_WF_SYNC_UUID ||
3652                    mdev->state.conn == C_BEHIND ||
3653                    mdev->state.conn < C_CONNECTED ||
3654                    mdev->state.disk < D_NEGOTIATING);
3655
3656         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3657
3658         /* Here the _drbd_uuid_ functions are right, current should
3659            _not_ be rotated into the history */
3660         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3661                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3662                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3663
3664                 drbd_print_uuids(mdev, "updated sync uuid");
3665                 drbd_start_resync(mdev, C_SYNC_TARGET);
3666
3667                 put_ldev(mdev);
3668         } else
3669                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3670
3671         return 0;
3672 }
3673
3674 /**
3675  * receive_bitmap_plain
3676  *
3677  * Return 0 when done, 1 when another iteration is needed, and a negative error
3678  * code upon failure.
3679  */
3680 static int
3681 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3682                      unsigned long *p, struct bm_xfer_ctx *c)
3683 {
3684         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3685                                  drbd_header_size(mdev->tconn);
3686         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3687                                        c->bm_words - c->word_offset);
3688         unsigned int want = num_words * sizeof(*p);
3689         int err;
3690
3691         if (want != size) {
3692                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3693                 return -EIO;
3694         }
3695         if (want == 0)
3696                 return 0;
3697         err = drbd_recv_all(mdev->tconn, p, want);
3698         if (err)
3699                 return err;
3700
3701         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3702
3703         c->word_offset += num_words;
3704         c->bit_offset = c->word_offset * BITS_PER_LONG;
3705         if (c->bit_offset > c->bm_bits)
3706                 c->bit_offset = c->bm_bits;
3707
3708         return 1;
3709 }
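
/*
 * Illustrative example, not part of drbd: assuming a 4096 byte socket
 * buffer and an 8 byte packet header, one plain bitmap packet carries
 * (4096 - 8) / sizeof(long) = 511 longs on a 64 bit host, i.e. 32704
 * bits of bitmap, which is why a large bitmap needs many iterations.
 */
static unsigned int plain_words_per_packet_example(unsigned int buffer_size,
						   unsigned int header_size)
{
	return (buffer_size - header_size) / sizeof(unsigned long);
}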
3710
3711 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3712 {
3713         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3714 }
3715
3716 static int dcbp_get_start(struct p_compressed_bm *p)
3717 {
3718         return (p->encoding & 0x80) != 0;
3719 }
3720
3721 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3722 {
3723         return (p->encoding >> 4) & 0x7;
3724 }
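
/*
 * Illustrative example, not part of drbd: the single "encoding" byte of
 * struct p_compressed_bm packs three fields -- bits 0-3 the bitmap code,
 * bits 4-6 the number of pad bits in the last byte, bit 7 whether the
 * first run length describes set bits.  0xb5 = 1011 0101b decodes as
 * code 5, 3 pad bits, start = 1.
 */
static void dcbp_layout_example(void)
{
	struct p_compressed_bm p = { .encoding = 0xb5 };

	BUG_ON(dcbp_get_code(&p) != 5);
	BUG_ON(dcbp_get_pad_bits(&p) != 3);
	BUG_ON(dcbp_get_start(&p) != 1);
}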
3725
3726 /**
3727  * recv_bm_rle_bits
3728  *
3729  * Return 0 when done, 1 when another iteration is needed, and a negative error
3730  * code upon failure.
3731  */
3732 static int
3733 recv_bm_rle_bits(struct drbd_conf *mdev,
3734                 struct p_compressed_bm *p,
3735                  struct bm_xfer_ctx *c,
3736                  unsigned int len)
3737 {
3738         struct bitstream bs;
3739         u64 look_ahead;
3740         u64 rl;
3741         u64 tmp;
3742         unsigned long s = c->bit_offset;
3743         unsigned long e;
3744         int toggle = dcbp_get_start(p);
3745         int have;
3746         int bits;
3747
3748         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3749
3750         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3751         if (bits < 0)
3752                 return -EIO;
3753
3754         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3755                 bits = vli_decode_bits(&rl, look_ahead);
3756                 if (bits <= 0)
3757                         return -EIO;
3758
3759                 if (toggle) {
3760                         e = s + rl -1;
3761                         if (e >= c->bm_bits) {
3762                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3763                                 return -EIO;
3764                         }
3765                         _drbd_bm_set_bits(mdev, s, e);
3766                 }
3767
3768                 if (have < bits) {
3769                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3770                                 have, bits, look_ahead,
3771                                 (unsigned int)(bs.cur.b - p->code),
3772                                 (unsigned int)bs.buf_len);
3773                         return -EIO;
3774                 }
3775                 look_ahead >>= bits;
3776                 have -= bits;
3777
3778                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3779                 if (bits < 0)
3780                         return -EIO;
3781                 look_ahead |= tmp << have;
3782                 have += bits;
3783         }
3784
3785         c->bit_offset = s;
3786         bm_xfer_ctx_bit_to_word_offset(c);
3787
3788         return (s != c->bm_bits);
3789 }
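
/*
 * Illustrative sketch, not part of drbd: the RLE stream is a sequence of
 * run lengths that alternate between runs of clear and of set bits, and
 * dcbp_get_start() says which kind comes first.  Expanding the runs
 * {5, 3, 7} with start = 0 therefore marks bits 5..7 as out of sync and
 * leaves everything else untouched.
 */
static void rle_expand_example(struct drbd_conf *mdev)
{
	static const unsigned long runs[] = { 5, 3, 7 };
	unsigned long s = 0;
	int toggle = 0;		/* what dcbp_get_start() would have returned */
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(runs); i++, toggle = !toggle) {
		if (toggle)
			_drbd_bm_set_bits(mdev, s, s + runs[i] - 1);
		s += runs[i];
	}
}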
3790
3791 /**
3792  * decode_bitmap_c
3793  *
3794  * Return 0 when done, 1 when another iteration is needed, and a negative error
3795  * code upon failure.
3796  */
3797 static int
3798 decode_bitmap_c(struct drbd_conf *mdev,
3799                 struct p_compressed_bm *p,
3800                 struct bm_xfer_ctx *c,
3801                 unsigned int len)
3802 {
3803         if (dcbp_get_code(p) == RLE_VLI_Bits)
3804                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3805
3806         /* other variants had been implemented for evaluation,
3807          * but have been dropped as this one turned out to be "best"
3808          * during all our tests. */
3809
3810         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3811         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3812         return -EIO;
3813 }
3814
3815 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3816                 const char *direction, struct bm_xfer_ctx *c)
3817 {
3818         /* what would it take to transfer it "plaintext" */
3819         unsigned int header_size = drbd_header_size(mdev->tconn);
3820         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3821         unsigned int plain =
3822                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3823                 c->bm_words * sizeof(unsigned long);
3824         unsigned int total = c->bytes[0] + c->bytes[1];
3825         unsigned int r;
3826
3827         /* total cannot be zero, but just in case: */
3828         if (total == 0)
3829                 return;
3830
3831         /* don't report if not compressed */
3832         if (total >= plain)
3833                 return;
3834
3835         /* total < plain. check for overflow, still */
3836         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3837                                     : (1000 * total / plain);
3838
3839         if (r > 1000)
3840                 r = 1000;
3841
3842         r = 1000 - r;
3843         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3844              "total %u; compression: %u.%u%%\n",
3845                         direction,
3846                         c->bytes[1], c->packets[1],
3847                         c->bytes[0], c->packets[0],
3848                         total, r/10, r % 10);
3849 }
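
/*
 * Illustrative example, not part of drbd: ignoring the per packet headers,
 * a bitmap of 4096 longs would take plain = 4096 * 8 = 32768 bytes; if the
 * RLE encoded transfer used total = 3277 bytes, r = 1000 * 3277 / 32768
 * = 100, and the log line above reports "compression: 90.0%".
 */
static void bm_compression_example(void)
{
	unsigned int plain = 32768;
	unsigned int total = 3277;
	unsigned int r = 1000 * total / plain;	/* per mille actually transferred */

	BUG_ON(1000 - r != 900);		/* i.e. 90.0% saved */
}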
3850
3851 /* Since we are processing the bitfield from lower addresses to higher,
3852    it does not matter whether we process it in 32 bit or 64 bit chunks,
3853    as long as it is little endian. (Understand it as a byte stream,
3854    beginning with the lowest byte...) If we used big endian, we would
3855    need to process it from the highest address to the lowest, in order
3856    to be agnostic to the 32 vs 64 bit issue. (See the sketch after receive_bitmap() below.)
3857
3858    returns 0 on failure, 1 if we successfully received it. */
3859 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3860 {
3861         struct drbd_conf *mdev;
3862         struct bm_xfer_ctx c;
3863         int err;
3864
3865         mdev = vnr_to_mdev(tconn, pi->vnr);
3866         if (!mdev)
3867                 return -EIO;
3868
3869         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3870         /* you are supposed to send additional out-of-sync information
3871          * if you actually set bits during this phase */
3872
3873         c = (struct bm_xfer_ctx) {
3874                 .bm_bits = drbd_bm_bits(mdev),
3875                 .bm_words = drbd_bm_words(mdev),
3876         };
3877
3878         for(;;) {
3879                 if (pi->cmd == P_BITMAP)
3880                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3881                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3882                         /* MAYBE: sanity check that we speak proto >= 90,
3883                          * and the feature is enabled! */
3884                         struct p_compressed_bm *p = pi->data;
3885
3886                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
3887                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3888                                 err = -EIO;
3889                                 goto out;
3890                         }
3891                         if (pi->size <= sizeof(*p)) {
3892                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3893                                 err = -EIO;
3894                                 goto out;
3895                         }
3896                         err = drbd_recv_all(mdev->tconn, p, pi->size);
3897                         if (err)
3898                                goto out;
3899                         err = decode_bitmap_c(mdev, p, &c, pi->size);
3900                 } else {
3901                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3902                         err = -EIO;
3903                         goto out;
3904                 }
3905
3906                 c.packets[pi->cmd == P_BITMAP]++;
3907                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
3908
3909                 if (err <= 0) {
3910                         if (err < 0)
3911                                 goto out;
3912                         break;
3913                 }
3914                 err = drbd_recv_header(mdev->tconn, pi);
3915                 if (err)
3916                         goto out;
3917         }
3918
3919         INFO_bm_xfer_stats(mdev, "receive", &c);
3920
3921         if (mdev->state.conn == C_WF_BITMAP_T) {
3922                 enum drbd_state_rv rv;
3923
3924                 err = drbd_send_bitmap(mdev);
3925                 if (err)
3926                         goto out;
3927                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3928                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3929                 D_ASSERT(rv == SS_SUCCESS);
3930         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3931                 /* admin may have requested C_DISCONNECTING,
3932                  * other threads may have noticed network errors */
3933                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3934                     drbd_conn_str(mdev->state.conn));
3935         }
3936         err = 0;
3937
3938  out:
3939         drbd_bm_unlock(mdev);
3940         if (!err && mdev->state.conn == C_WF_BITMAP_S)
3941                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3942         return err;
3943 }
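
/*
 * Illustrative sketch, not part of drbd, referenced from the comment above
 * receive_bitmap(): on a little endian host the same byte stream gives the
 * same bit numbering no matter whether it is read in 32 bit or 64 bit
 * chunks.  Byte 0 = 0x01 sets bit 0; byte 4 = 0x80 sets bit 39.
 */
static void le_chunk_size_example(void)
{
	u8 stream[8] = { 0x01, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00 };
	u32 w32[2];
	u64 w64;

	memcpy(w32, stream, sizeof(stream));
	memcpy(&w64, stream, sizeof(stream));

	BUG_ON(!(le32_to_cpu(w32[0]) & (1U << 0)));	/* bit 0 */
	BUG_ON(!(le32_to_cpu(w32[1]) & (1U << 7)));	/* bit 39 = word 1, bit 7 */
	BUG_ON(!(le64_to_cpu(w64) & (1ULL << 0)));
	BUG_ON(!(le64_to_cpu(w64) & (1ULL << 39)));
}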
3944
3945 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3946 {
3947         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3948                  pi->cmd, pi->size);
3949
3950         return ignore_remaining_packet(tconn, pi);
3951 }
3952
3953 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3954 {
3955         /* Make sure we've acked all the TCP data associated
3956          * with the data requests being unplugged */
3957         drbd_tcp_quickack(tconn->data.socket);
3958
3959         return 0;
3960 }
3961
3962 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3963 {
3964         struct drbd_conf *mdev;
3965         struct p_block_desc *p = pi->data;
3966
3967         mdev = vnr_to_mdev(tconn, pi->vnr);
3968         if (!mdev)
3969                 return -EIO;
3970
3971         switch (mdev->state.conn) {
3972         case C_WF_SYNC_UUID:
3973         case C_WF_BITMAP_T:
3974         case C_BEHIND:
3975                         break;
3976         default:
3977                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3978                                 drbd_conn_str(mdev->state.conn));
3979         }
3980
3981         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3982
3983         return 0;
3984 }
3985
3986 struct data_cmd {
3987         int expect_payload;
3988         size_t pkt_size;
3989         int (*fn)(struct drbd_tconn *, struct packet_info *);
3990 };
3991
3992 static struct data_cmd drbd_cmd_handler[] = {
3993         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3994         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3995         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3996         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3997         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
3998         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
3999         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4000         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4001         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4002         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4003         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4004         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4005         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4006         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4007         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4008         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4009         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4010         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4011         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4012         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4013         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4014         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4015         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4016 };
4017
4018 static void drbdd(struct drbd_tconn *tconn)
4019 {
4020         struct packet_info pi;
4021         size_t shs; /* sub header size */
4022         int err;
4023
4024         while (get_t_state(&tconn->receiver) == RUNNING) {
4025                 struct data_cmd *cmd;
4026
4027                 drbd_thread_current_set_cpu(&tconn->receiver);
4028                 if (drbd_recv_header(tconn, &pi))
4029                         goto err_out;
4030
4031                 cmd = &drbd_cmd_handler[pi.cmd];
4032                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4033                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4034                         goto err_out;
4035                 }
4036
4037                 shs = cmd->pkt_size;
4038                 if (pi.size > shs && !cmd->expect_payload) {
4039                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4040                         goto err_out;
4041                 }
4042
4043                 if (shs) {
4044                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4045                         if (err)
4046                                 goto err_out;
4047                         pi.size -= shs;
4048                 }
4049
4050                 err = cmd->fn(tconn, &pi);
4051                 if (err) {
4052                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4053                                  cmdname(pi.cmd), err, pi.size);
4054                         goto err_out;
4055                 }
4056         }
4057         return;
4058
4059     err_out:
4060         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4061 }
4062
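     /* Queue a barrier work item and wait for it to complete, thereby making
      * sure that all work queued on this connection before the call has been
      * processed by the worker. */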
4063 void conn_flush_workqueue(struct drbd_tconn *tconn)
4064 {
4065         struct drbd_wq_barrier barr;
4066
4067         barr.w.cb = w_prev_work_done;
4068         barr.w.tconn = tconn;
4069         init_completion(&barr.done);
4070         drbd_queue_work(&tconn->data.work, &barr.w);
4071         wait_for_completion(&barr.done);
4072 }
4073
4074 static void drbd_disconnect(struct drbd_tconn *tconn)
4075 {
4076         enum drbd_conns oc;
4077         int rv = SS_UNKNOWN_ERROR;
4078
4079         if (tconn->cstate == C_STANDALONE)
4080                 return;
4081
4082         /* asender does not clean up anything.  It must not interfere, either. */
4083         drbd_thread_stop(&tconn->asender);
4084         drbd_free_sock(tconn);
4085
4086         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4087         conn_info(tconn, "Connection closed\n");
4088
4089         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4090                 conn_try_outdate_peer_async(tconn);
4091
4092         spin_lock_irq(&tconn->req_lock);
4093         oc = tconn->cstate;
4094         if (oc >= C_UNCONNECTED)
4095                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4096
4097         spin_unlock_irq(&tconn->req_lock);
4098
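              /* Only if an administrative disconnect was requested (old state was
               * C_DISCONNECTING) do we fall all the way back to C_STANDALONE and
               * free the network configuration. */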
4099         if (oc == C_DISCONNECTING) {
4100                 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4101
4102                 crypto_free_hash(tconn->cram_hmac_tfm);
4103                 tconn->cram_hmac_tfm = NULL;
4104
4105                 kfree(tconn->net_conf);
4106                 tconn->net_conf = NULL;
4107                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4108         }
4109 }
4110
4111 static int drbd_disconnected(int vnr, void *p, void *data)
4112 {
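              /* Per-volume cleanup; called for every volume of the connection via
               * idr_for_each() from drbd_disconnect(). */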
4113         struct drbd_conf *mdev = (struct drbd_conf *)p;
4114         enum drbd_fencing_p fp;
4115         unsigned int i;
4116
4117         /* wait for current activity to cease. */
4118         spin_lock_irq(&mdev->tconn->req_lock);
4119         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4120         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4121         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4122         spin_unlock_irq(&mdev->tconn->req_lock);
4123
4124         /* We do not have data structures that would allow us to
4125          * get the rs_pending_cnt down to 0 again.
4126          *  * On C_SYNC_TARGET we do not have any data structures describing
4127          *    the pending RSDataRequests we have sent.
4128          *  * On C_SYNC_SOURCE there is no data structure that tracks
4129          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4130          *  And no, it is not the sum of the reference counts in the
4131          *  resync_LRU. The resync_LRU tracks the whole operation including
4132          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4133          *  on the fly. */
4134         drbd_rs_cancel_all(mdev);
4135         mdev->rs_total = 0;
4136         mdev->rs_failed = 0;
4137         atomic_set(&mdev->rs_pending_cnt, 0);
4138         wake_up(&mdev->misc_wait);
4139
4140         del_timer(&mdev->request_timer);
4141
4142         del_timer_sync(&mdev->resync_timer);
4143         resync_timer_fn((unsigned long)mdev);
4144
4145         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4146          * w_make_resync_request etc. which may still be on the worker queue
4147          * to be "canceled" */
4148         drbd_flush_workqueue(mdev);
4149
4150         /* This also does reclaim_net_ee().  If we do this too early, we might
4151          * miss some resync ee and pages.*/
4152         drbd_process_done_ee(mdev);
4153
4154         kfree(mdev->p_uuid);
4155         mdev->p_uuid = NULL;
4156
4157         if (!drbd_suspended(mdev))
4158                 tl_clear(mdev->tconn);
4159
4160         drbd_md_sync(mdev);
4161
4162         fp = FP_DONT_CARE;
4163         if (get_ldev(mdev)) {
4164                 fp = mdev->ldev->dc.fencing;
4165                 put_ldev(mdev);
4166         }
4167
4168         /* serialize with bitmap writeout triggered by the state change,
4169          * if any. */
4170         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4171
4172         /* tcp_close and release of sendpage pages can be deferred.  I don't
4173          * want to use SO_LINGER, because apparently it can be deferred for
4174          * more than 20 seconds (longest time I checked).
4175          *
4176          * Actually we don't care exactly when the network stack does its
4177          * put_page(); we just release our reference on these pages right here.
4178          */
4179         i = drbd_release_ee(mdev, &mdev->net_ee);
4180         if (i)
4181                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4182         i = atomic_read(&mdev->pp_in_use_by_net);
4183         if (i)
4184                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4185         i = atomic_read(&mdev->pp_in_use);
4186         if (i)
4187                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4188
4189         D_ASSERT(list_empty(&mdev->read_ee));
4190         D_ASSERT(list_empty(&mdev->active_ee));
4191         D_ASSERT(list_empty(&mdev->sync_ee));
4192         D_ASSERT(list_empty(&mdev->done_ee));
4193
4194         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4195         atomic_set(&mdev->current_epoch->epoch_size, 0);
4196         D_ASSERT(list_empty(&mdev->current_epoch->list));
4197
4198         return 0;
4199 }
4200
4201 /*
4202  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4203  * we can agree on is stored in agreed_pro_version.
4204  *
4205  * The feature flags and the reserved array should provide enough room for
4206  * future enhancements of the handshake protocol, and for possible plugins.
4207  *
4208  * For now they are expected to be zero, but are ignored in any case.
4209  */
4210 static int drbd_send_features(struct drbd_tconn *tconn)
4211 {
4212         struct drbd_socket *sock;
4213         struct p_connection_features *p;
4214
4215         sock = &tconn->data;
4216         p = conn_prepare_command(tconn, sock);
4217         if (!p)
4218                 return -EIO;
4219         memset(p, 0, sizeof(*p));
4220         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4221         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4222         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4223 }
4224
4225 /*
4226  * return values:
4227  *   1 yes, we have a valid connection
4228  *   0 oops, did not work out, please try again
4229  *  -1 peer talks different language,
4230  *     no point in trying again, please go standalone.
4231  */
4232 static int drbd_do_features(struct drbd_tconn *tconn)
4233 {
4234         /* ASSERT current == tconn->receiver ... */
4235         struct p_connection_features *p;
4236         const int expect = sizeof(struct p_connection_features);
4237         struct packet_info pi;
4238         int err;
4239
4240         err = drbd_send_features(tconn);
4241         if (err)
4242                 return 0;
4243
4244         err = drbd_recv_header(tconn, &pi);
4245         if (err)
4246                 return 0;
4247
4248         if (pi.cmd != P_CONNECTION_FEATURES) {
4249                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4250                      cmdname(pi.cmd), pi.cmd);
4251                 return -1;
4252         }
4253
4254         if (pi.size != expect) {
4255                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4256                      expect, pi.size);
4257                 return -1;
4258         }
4259
4260         p = pi.data;
4261         err = drbd_recv_all_warn(tconn, p, expect);
4262         if (err)
4263                 return 0;
4264
4265         p->protocol_min = be32_to_cpu(p->protocol_min);
4266         p->protocol_max = be32_to_cpu(p->protocol_max);
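              /* A peer that reports protocol_max == 0 did not fill in that field;
               * treat it as supporting only exactly protocol_min. */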
4267         if (p->protocol_max == 0)
4268                 p->protocol_max = p->protocol_min;
4269
4270         if (PRO_VERSION_MAX < p->protocol_min ||
4271             PRO_VERSION_MIN > p->protocol_max)
4272                 goto incompat;
4273
4274         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4275
4276         conn_info(tconn, "Handshake successful: "
4277              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4278
4279         return 1;
4280
4281  incompat:
4282         conn_err(tconn, "incompatible DRBD dialects: "
4283             "I support %d-%d, peer supports %d-%d\n",
4284             PRO_VERSION_MIN, PRO_VERSION_MAX,
4285             p->protocol_min, p->protocol_max);
4286         return -1;
4287 }
4288
4289 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4290 static int drbd_do_auth(struct drbd_tconn *tconn)
4291 {
4292         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4293         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4294         return -1;
4295 }
4296 #else
4297 #define CHALLENGE_LEN 64
4298
4299 /* Return value:
4300         1 - auth succeeded,
4301         0 - failed, try again (network error),
4302         -1 - auth failed, don't try again.
4303 */
4304
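     /*
      * Challenge/response authentication using the shared secret:
      *   - send our random challenge (P_AUTH_CHALLENGE)
      *   - receive the peer's challenge, HMAC it with the shared secret and
      *     return the digest (P_AUTH_RESPONSE)
      *   - receive the peer's response and compare it against the HMAC of
      *     our own challenge
      */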
4305 static int drbd_do_auth(struct drbd_tconn *tconn)
4306 {
4307         struct drbd_socket *sock;
4308         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4309         struct scatterlist sg;
4310         char *response = NULL;
4311         char *right_response = NULL;
4312         char *peers_ch = NULL;
4313         unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4314         unsigned int resp_size;
4315         struct hash_desc desc;
4316         struct packet_info pi;
4317         int err, rv;
4318
4319         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4320
4321         desc.tfm = tconn->cram_hmac_tfm;
4322         desc.flags = 0;
4323
4324         rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4325                                 (u8 *)tconn->net_conf->shared_secret, key_len);
4326         if (rv) {
4327                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4328                 rv = -1;
4329                 goto fail;
4330         }
4331
4332         get_random_bytes(my_challenge, CHALLENGE_LEN);
4333
4334         sock = &tconn->data;
4335         if (!conn_prepare_command(tconn, sock)) {
4336                 rv = 0;
4337                 goto fail;
4338         }
4339         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4340                                 my_challenge, CHALLENGE_LEN);
4341         if (!rv)
4342                 goto fail;
4343
4344         err = drbd_recv_header(tconn, &pi);
4345         if (err) {
4346                 rv = 0;
4347                 goto fail;
4348         }
4349
4350         if (pi.cmd != P_AUTH_CHALLENGE) {
4351                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4352                     cmdname(pi.cmd), pi.cmd);
4353                 rv = 0;
4354                 goto fail;
4355         }
4356
4357         if (pi.size > CHALLENGE_LEN * 2) {
4358                 conn_err(tconn, "AuthChallenge payload too big.\n");
4359                 rv = -1;
4360                 goto fail;
4361         }
4362
4363         peers_ch = kmalloc(pi.size, GFP_NOIO);
4364         if (peers_ch == NULL) {
4365                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4366                 rv = -1;
4367                 goto fail;
4368         }
4369
4370         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4371         if (err) {
4372                 rv = 0;
4373                 goto fail;
4374         }
4375
4376         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4377         response = kmalloc(resp_size, GFP_NOIO);
4378         if (response == NULL) {
4379                 conn_err(tconn, "kmalloc of response failed\n");
4380                 rv = -1;
4381                 goto fail;
4382         }
4383
4384         sg_init_table(&sg, 1);
4385         sg_set_buf(&sg, peers_ch, pi.size);
4386
4387         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4388         if (rv) {
4389                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4390                 rv = -1;
4391                 goto fail;
4392         }
4393
4394         if (!conn_prepare_command(tconn, sock)) {
4395                 rv = 0;
4396                 goto fail;
4397         }
4398         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4399                                 response, resp_size);
4400         if (!rv)
4401                 goto fail;
4402
4403         err = drbd_recv_header(tconn, &pi);
4404         if (err) {
4405                 rv = 0;
4406                 goto fail;
4407         }
4408
4409         if (pi.cmd != P_AUTH_RESPONSE) {
4410                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4411                         cmdname(pi.cmd), pi.cmd);
4412                 rv = 0;
4413                 goto fail;
4414         }
4415
4416         if (pi.size != resp_size) {
4417                 conn_err(tconn, "AuthResponse payload has the wrong size\n");
4418                 rv = 0;
4419                 goto fail;
4420         }
4421
4422         err = drbd_recv_all_warn(tconn, response, resp_size);
4423         if (err) {
4424                 rv = 0;
4425                 goto fail;
4426         }
4427
4428         right_response = kmalloc(resp_size, GFP_NOIO);
4429         if (right_response == NULL) {
4430                 conn_err(tconn, "kmalloc of right_response failed\n");
4431                 rv = -1;
4432                 goto fail;
4433         }
4434
4435         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4436
4437         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4438         if (rv) {
4439                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4440                 rv = -1;
4441                 goto fail;
4442         }
4443
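              /* rv == 1 iff the peer's response matches our locally computed digest */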
4444         rv = !memcmp(response, right_response, resp_size);
4445
4446         if (rv)
4447                 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4448                      resp_size, tconn->net_conf->cram_hmac_alg);
4449         else
4450                 rv = -1;
4451
4452  fail:
4453         kfree(peers_ch);
4454         kfree(response);
4455         kfree(right_response);
4456
4457         return rv;
4458 }
4459 #endif
4460
4461 int drbdd_init(struct drbd_thread *thi)
4462 {
4463         struct drbd_tconn *tconn = thi->tconn;
4464         int h;
4465
4466         conn_info(tconn, "receiver (re)started\n");
4467
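              /* Retry drbd_connect() until it either succeeds (h > 0) or fails
               * fatally (h == -1); in the latter case the network configuration
               * is discarded. */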
4468         do {
4469                 h = drbd_connect(tconn);
4470                 if (h == 0) {
4471                         drbd_disconnect(tconn);
4472                         schedule_timeout_interruptible(HZ);
4473                 }
4474                 if (h == -1) {
4475                         conn_warn(tconn, "Discarding network configuration.\n");
4476                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4477                 }
4478         } while (h == 0);
4479
4480         if (h > 0) {
4481                 if (get_net_conf(tconn)) {
4482                         drbdd(tconn);
4483                         put_net_conf(tconn);
4484                 }
4485         }
4486
4487         drbd_disconnect(tconn);
4488
4489         conn_info(tconn, "receiver terminated\n");
4490         return 0;
4491 }
4492
4493 /* ********* acknowledge sender ******** */
4494
4495 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4496 {
4497         struct p_req_state_reply *p = pi->data;
4498         int retcode = be32_to_cpu(p->retcode);
4499
4500         if (retcode >= SS_SUCCESS) {
4501                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4502         } else {
4503                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4504                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4505                          drbd_set_st_err_str(retcode), retcode);
4506         }
4507         wake_up(&tconn->ping_wait);
4508
4509         return true;
4510 }
4511
4512 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4513 {
4514         struct drbd_conf *mdev;
4515         struct p_req_state_reply *p = pi->data;
4516         int retcode = be32_to_cpu(p->retcode);
4517
4518         mdev = vnr_to_mdev(tconn, pi->vnr);
4519         if (!mdev)
4520                 return false;
4521
4522         if (retcode >= SS_SUCCESS) {
4523                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4524         } else {
4525                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4526                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4527                         drbd_set_st_err_str(retcode), retcode);
4528         }
4529         wake_up(&mdev->state_wait);
4530
4531         return true;
4532 }
4533
4534 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4535 {
4536         return !drbd_send_ping_ack(tconn);
4537
4538 }
4539
4540 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4541 {
4542         /* restore idle timeout */
4543         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4544         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4545                 wake_up(&tconn->ping_wait);
4546
4547         return true;
4548 }
4549
4550 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4551 {
4552         struct drbd_conf *mdev;
4553         struct p_block_ack *p = pi->data;
4554         sector_t sector = be64_to_cpu(p->sector);
4555         int blksize = be32_to_cpu(p->blksize);
4556
4557         mdev = vnr_to_mdev(tconn, pi->vnr);
4558         if (!mdev)
4559                 return false;
4560
4561         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4562
4563         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4564
4565         if (get_ldev(mdev)) {
4566                 drbd_rs_complete_io(mdev, sector);
4567                 drbd_set_in_sync(mdev, sector, blksize);
4568                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4569                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4570                 put_ldev(mdev);
4571         }
4572         dec_rs_pending(mdev);
4573         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4574
4575         return true;
4576 }
4577
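     /*
      * Look up the request for (id, sector) in the given tree and apply the
      * state change event @what to it.  Returns 0 on success and an error
      * code if the request could not be found.
      */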
4578 static int
4579 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4580                               struct rb_root *root, const char *func,
4581                               enum drbd_req_event what, bool missing_ok)
4582 {
4583         struct drbd_request *req;
4584         struct bio_and_error m;
4585
4586         spin_lock_irq(&mdev->tconn->req_lock);
4587         req = find_request(mdev, root, id, sector, missing_ok, func);
4588         if (unlikely(!req)) {
4589                 spin_unlock_irq(&mdev->tconn->req_lock);
4590                 return -EIO;
4591         }
4592         __req_mod(req, what, &m);
4593         spin_unlock_irq(&mdev->tconn->req_lock);
4594
4595         if (m.bio)
4596                 complete_master_bio(mdev, &m);
4597         return 0;
4598 }
4599
4600 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4601 {
4602         struct drbd_conf *mdev;
4603         struct p_block_ack *p = pi->data;
4604         sector_t sector = be64_to_cpu(p->sector);
4605         int blksize = be32_to_cpu(p->blksize);
4606         enum drbd_req_event what;
4607
4608         mdev = vnr_to_mdev(tconn, pi->vnr);
4609         if (!mdev)
4610                 return false;
4611
4612         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4613
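              /* ID_SYNCER marks an ack for a resync request rather than for an
               * application write; just mark the area as in sync. */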
4614         if (p->block_id == ID_SYNCER) {
4615                 drbd_set_in_sync(mdev, sector, blksize);
4616                 dec_rs_pending(mdev);
4617                 return true;
4618         }
4619         switch (pi->cmd) {
4620         case P_RS_WRITE_ACK:
4621                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4622                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4623                 break;
4624         case P_WRITE_ACK:
4625                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4626                 what = WRITE_ACKED_BY_PEER;
4627                 break;
4628         case P_RECV_ACK:
4629                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4630                 what = RECV_ACKED_BY_PEER;
4631                 break;
4632         case P_DISCARD_WRITE:
4633                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4634                 what = DISCARD_WRITE;
4635                 break;
4636         case P_RETRY_WRITE:
4637                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4638                 what = POSTPONE_WRITE;
4639                 break;
4640         default:
4641                 D_ASSERT(0);
4642                 return false;
4643         }
4644
4645         return !validate_req_change_req_state(mdev, p->block_id, sector,
4646                                               &mdev->write_requests, __func__,
4647                                               what, false);
4648 }
4649
4650 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4651 {
4652         struct drbd_conf *mdev;
4653         struct p_block_ack *p = pi->data;
4654         sector_t sector = be64_to_cpu(p->sector);
4655         int size = be32_to_cpu(p->blksize);
4656         bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4657                           tconn->net_conf->wire_protocol == DRBD_PROT_B;
4658         int err;
4659
4660         mdev = vnr_to_mdev(tconn, pi->vnr);
4661         if (!mdev)
4662                 return false;
4663
4664         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4665
4666         if (p->block_id == ID_SYNCER) {
4667                 dec_rs_pending(mdev);
4668                 drbd_rs_failed_io(mdev, sector, size);
4669                 return true;
4670         }
4671
4672         err = validate_req_change_req_state(mdev, p->block_id, sector,
4673                                             &mdev->write_requests, __func__,
4674                                             NEG_ACKED, missing_ok);
4675         if (err) {
4676                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4677                    The master bio might already be completed, therefore the
4678                    request is no longer in the collision hash. */
4679                 /* In Protocol B we might already have got a P_RECV_ACK
4680                    but then get a P_NEG_ACK afterwards. */
4681                 if (!missing_ok)
4682                         return false;
4683                 drbd_set_out_of_sync(mdev, sector, size);
4684         }
4685         return true;
4686 }
4687
4688 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4689 {
4690         struct drbd_conf *mdev;
4691         struct p_block_ack *p = pi->data;
4692         sector_t sector = be64_to_cpu(p->sector);
4693
4694         mdev = vnr_to_mdev(tconn, pi->vnr);
4695         if (!mdev)
4696                 return false;
4697
4698         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4699
4700         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4701             (unsigned long long)sector, be32_to_cpu(p->blksize));
4702
4703         return !validate_req_change_req_state(mdev, p->block_id, sector,
4704                                               &mdev->read_requests, __func__,
4705                                               NEG_ACKED, false);
4706 }
4707
4708 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4709 {
4710         struct drbd_conf *mdev;
4711         sector_t sector;
4712         int size;
4713         struct p_block_ack *p = pi->data;
4714
4715         mdev = vnr_to_mdev(tconn, pi->vnr);
4716         if (!mdev)
4717                 return false;
4718
4719         sector = be64_to_cpu(p->sector);
4720         size = be32_to_cpu(p->blksize);
4721
4722         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4723
4724         dec_rs_pending(mdev);
4725
4726         if (get_ldev_if_state(mdev, D_FAILED)) {
4727                 drbd_rs_complete_io(mdev, sector);
4728                 switch (pi->cmd) {
4729                 case P_NEG_RS_DREPLY:
4730                         drbd_rs_failed_io(mdev, sector, size);
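                                  /* fall through */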
4731                 case P_RS_CANCEL:
4732                         break;
4733                 default:
4734                         D_ASSERT(0);
4735                         put_ldev(mdev);
4736                         return false;
4737                 }
4738                 put_ldev(mdev);
4739         }
4740
4741         return true;
4742 }
4743
4744 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4745 {
4746         struct drbd_conf *mdev;
4747         struct p_barrier_ack *p = pi->data;
4748
4749         mdev = vnr_to_mdev(tconn, pi->vnr);
4750         if (!mdev)
4751                 return false;
4752
4753         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4754
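              /* If we are Ahead and no application writes are in flight anymore,
               * schedule the switch back to a resync (SyncSource) in one second. */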
4755         if (mdev->state.conn == C_AHEAD &&
4756             atomic_read(&mdev->ap_in_flight) == 0 &&
4757             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4758                 mdev->start_resync_timer.expires = jiffies + HZ;
4759                 add_timer(&mdev->start_resync_timer);
4760         }
4761
4762         return true;
4763 }
4764
4765 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4766 {
4767         struct drbd_conf *mdev;
4768         struct p_block_ack *p = pi->data;
4769         struct drbd_work *w;
4770         sector_t sector;
4771         int size;
4772
4773         mdev = vnr_to_mdev(tconn, pi->vnr);
4774         if (!mdev)
4775                 return false;
4776
4777         sector = be64_to_cpu(p->sector);
4778         size = be32_to_cpu(p->blksize);
4779
4780         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4781
4782         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4783                 drbd_ov_out_of_sync_found(mdev, sector, size);
4784         else
4785                 ov_out_of_sync_print(mdev);
4786
4787         if (!get_ldev(mdev))
4788                 return true;
4789
4790         drbd_rs_complete_io(mdev, sector);
4791         dec_rs_pending(mdev);
4792
4793         --mdev->ov_left;
4794
4795         /* let's advance progress step marks only for every other megabyte */
4796         if ((mdev->ov_left & 0x200) == 0x200)
4797                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4798
4799         if (mdev->ov_left == 0) {
4800                 w = kmalloc(sizeof(*w), GFP_NOIO);
4801                 if (w) {
4802                         w->cb = w_ov_finished;
4803                         w->mdev = mdev;
4804                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4805                 } else {
4806                         dev_err(DEV, "kmalloc(w) failed.");
4807                         ov_out_of_sync_print(mdev);
4808                         drbd_resync_finished(mdev);
4809                 }
4810         }
4811         put_ldev(mdev);
4812         return true;
4813 }
4814
4815 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4816 {
4817         return true;
4818 }
4819
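     /* Process the done_ee lists of all volumes, re-checking under req_lock
      * until every list is empty.  Returns 1 if processing failed for any
      * volume, 0 otherwise. */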
4820 static int tconn_process_done_ee(struct drbd_tconn *tconn)
4821 {
4822         struct drbd_conf *mdev;
4823         int i, not_empty = 0;
4824
4825         do {
4826                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4827                 flush_signals(current);
4828                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4829                         if (drbd_process_done_ee(mdev))
4830                                 return 1; /* error */
4831                 }
4832                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4833
4834                 spin_lock_irq(&tconn->req_lock);
4835                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4836                         not_empty = !list_empty(&mdev->done_ee);
4837                         if (not_empty)
4838                                 break;
4839                 }
4840                 spin_unlock_irq(&tconn->req_lock);
4841         } while (not_empty);
4842
4843         return 0;
4844 }
4845
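     /*
      * Table-driven dispatch for the meta socket:
      * @pkt_size: fixed payload size expected after the header
      * @fn:       handler; a "false" return causes a reconnect
      */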
4846 struct asender_cmd {
4847         size_t pkt_size;
4848         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4849 };
4850
4851 static struct asender_cmd asender_tbl[] = {
4852         [P_PING]            = { 0, got_Ping },
4853         [P_PING_ACK]        = { 0, got_PingAck },
4854         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4855         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4856         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4857         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4858         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4859         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4860         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4861         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4862         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4863         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4864         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4865         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4866         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4867         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
4868         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4869 };
4870
4871 int drbd_asender(struct drbd_thread *thi)
4872 {
4873         struct drbd_tconn *tconn = thi->tconn;
4874         struct asender_cmd *cmd = NULL;
4875         struct packet_info pi;
4876         int rv;
4877         void *buf    = tconn->meta.rbuf;
4878         int received = 0;
4879         unsigned int header_size = drbd_header_size(tconn);
4880         int expect   = header_size;
4881         int ping_timeout_active = 0;
4882
4883         current->policy = SCHED_RR;  /* Make this a realtime task! */
4884         current->rt_priority = 2;    /* more important than all other tasks */
4885
4886         while (get_t_state(thi) == RUNNING) {
4887                 drbd_thread_current_set_cpu(thi);
4888                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4889                         if (drbd_send_ping(tconn)) {
4890                                 conn_err(tconn, "drbd_send_ping has failed\n");
4891                                 goto reconnect;
4892                         }
4893                         tconn->meta.socket->sk->sk_rcvtimeo =
4894                                 tconn->net_conf->ping_timeo*HZ/10;
4895                         ping_timeout_active = 1;
4896                 }
4897
4898                 /* TODO: conditionally cork; it may hurt latency if we cork without
4899                    much to send */
4900                 if (!tconn->net_conf->no_cork)
4901                         drbd_tcp_cork(tconn->meta.socket);
4902                 if (tconn_process_done_ee(tconn)) {
4903                         conn_err(tconn, "tconn_process_done_ee() failed\n");
4904                         goto reconnect;
4905                 }
4906                 /* but unconditionally uncork unless disabled */
4907                 if (!tconn->net_conf->no_cork)
4908                         drbd_tcp_uncork(tconn->meta.socket);
4909
4910                 /* short circuit, recv_msg would return EINTR anyways. */
4911                 if (signal_pending(current))
4912                         continue;
4913
4914                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4915                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4916
4917                 flush_signals(current);
4918
4919                 /* Note:
4920                  * -EINTR        (on meta) we got a signal
4921                  * -EAGAIN       (on meta) rcvtimeo expired
4922                  * -ECONNRESET   other side closed the connection
4923                  * -ERESTARTSYS  (on data) we got a signal
4924                  * rv <  0       other than above: unexpected error!
4925                  * rv == expected: full header or command
4926                  * rv <  expected: "woken" by signal during receive
4927                  * rv == 0       : "connection shut down by peer"
4928                  */
4929                 if (likely(rv > 0)) {
4930                         received += rv;
4931                         buf      += rv;
4932                 } else if (rv == 0) {
4933                         conn_err(tconn, "meta connection shut down by peer.\n");
4934                         goto reconnect;
4935                 } else if (rv == -EAGAIN) {
4936                         /* If the data socket received something meanwhile,
4937                          * that is good enough: peer is still alive. */
4938                         if (time_after(tconn->last_received,
4939                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
4940                                 continue;
4941                         if (ping_timeout_active) {
4942                                 conn_err(tconn, "PingAck did not arrive in time.\n");
4943                                 goto reconnect;
4944                         }
4945                         set_bit(SEND_PING, &tconn->flags);
4946                         continue;
4947                 } else if (rv == -EINTR) {
4948                         continue;
4949                 } else {
4950                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
4951                         goto reconnect;
4952                 }
4953
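                      /* A complete header has been received; decode it and work
                       * out how much payload to expect for this command. */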
4954                 if (received == expect && cmd == NULL) {
4955                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
4956                                 goto reconnect;
4957                         cmd = &asender_tbl[pi.cmd];
4958                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
4959                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
4960                                         pi.cmd, pi.size);
4961                                 goto disconnect;
4962                         }
4963                         expect = header_size + cmd->pkt_size;
4964                         if (pi.size != expect - header_size) {
4965                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
4966                                         pi.cmd, pi.size);
4967                                 goto reconnect;
4968                         }
4969                 }
4970                 if (received == expect) {
4971                         bool rv;
4972
4973                         rv = cmd->fn(tconn, &pi);
4974                         if (!rv) {
4975                                 conn_err(tconn, "%pf failed\n", cmd->fn);
4976                                 goto reconnect;
4977                         }
4978
4979                         tconn->last_received = jiffies;
4980
4981                         /* the idle_timeout (ping-int)
4982                          * has been restored in got_PingAck() */
4983                         if (cmd == &asender_tbl[P_PING_ACK])
4984                                 ping_timeout_active = 0;
4985
4986                         buf      = tconn->meta.rbuf;
4987                         received = 0;
4988                         expect   = header_size;
4989                         cmd      = NULL;
4990                 }
4991         }
4992
4993         if (0) {
4994 reconnect:
4995                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4996         }
4997         if (0) {
4998 disconnect:
4999                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5000         }
5001         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5002
5003         conn_info(tconn, "asender terminated\n");
5004
5005         return 0;
5006 }