1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
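/* Release our reference on every page of the chain; returns how many pages
 * were released. */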
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
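/* Grab @number pages from the global drbd_pp_pool if enough are vacant,
 * otherwise try to allocate fresh pages with GFP_TRY.  On failure, any
 * partially allocated chain is given back to the pool and NULL is returned. */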
153 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
154 {
155         struct page *page = NULL;
156         struct page *tmp = NULL;
157         int i = 0;
158
159         /* Yes, testing drbd_pp_vacant outside the lock is racy.
160          * So what. It saves a spin_lock. */
161         if (drbd_pp_vacant >= number) {
162                 spin_lock(&drbd_pp_lock);
163                 page = page_chain_del(&drbd_pp_pool, number);
164                 if (page)
165                         drbd_pp_vacant -= number;
166                 spin_unlock(&drbd_pp_lock);
167                 if (page)
168                         return page;
169         }
170
171         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
172          * "criss-cross" setup, that might cause write-out on some other DRBD,
173          * which in turn might block on the other node at this very place.  */
174         for (i = 0; i < number; i++) {
175                 tmp = alloc_page(GFP_TRY);
176                 if (!tmp)
177                         break;
178                 set_page_private(tmp, (unsigned long)page);
179                 page = tmp;
180         }
181
182         if (i == number)
183                 return page;
184
185         /* Not enough pages immediately available this time.
186          * No need to jump around here, drbd_pp_alloc will retry this
187          * function "soon". */
188         if (page) {
189                 tmp = page_chain_tail(page, NULL);
190                 spin_lock(&drbd_pp_lock);
191                 page_chain_add(&drbd_pp_pool, page, tmp);
192                 drbd_pp_vacant += i;
193                 spin_unlock(&drbd_pp_lock);
194         }
195         return NULL;
196 }
197
198 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
199                                            struct list_head *to_be_freed)
200 {
201         struct drbd_peer_request *peer_req;
202         struct list_head *le, *tle;
203
204         /* The EEs are always appended to the end of the list. Since
205            they are sent in order over the wire, they have to finish
206            in order. As soon as we see the first unfinished one, we can
207            stop examining the list... */
208
209         list_for_each_safe(le, tle, &mdev->net_ee) {
210                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
211                 if (drbd_peer_req_has_active_page(peer_req))
212                         break;
213                 list_move(le, to_be_freed);
214         }
215 }
216
217 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
218 {
219         LIST_HEAD(reclaimed);
220         struct drbd_peer_request *peer_req, *t;
221
222         spin_lock_irq(&mdev->tconn->req_lock);
223         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
224         spin_unlock_irq(&mdev->tconn->req_lock);
225
226         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227                 drbd_free_net_peer_req(mdev, peer_req);
228 }
229
230 /**
231  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
232  * @mdev:       DRBD device.
233  * @number:     number of pages requested
234  * @retry:      whether to retry, if not enough pages are available right now
235  *
236  * Tries to allocate @number pages, first from our own page pool, then from
237  * the kernel, unless this allocation would exceed the max_buffers setting.
238  * Possibly retry until DRBD frees sufficient pages somewhere else.
239  *
240  * Returns a page chain linked via page->private.
241  */
242 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
243 {
244         struct page *page = NULL;
245         DEFINE_WAIT(wait);
246
247         /* Yes, we may run up to @number over max_buffers. If we
248          * follow it strictly, the admin will get it wrong anyways. */
249         if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
250                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
251
252         while (page == NULL) {
253                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
254
255                 drbd_kick_lo_and_reclaim_net(mdev);
256
257                 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
258                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
259                         if (page)
260                                 break;
261                 }
262
263                 if (!retry)
264                         break;
265
266                 if (signal_pending(current)) {
267                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
268                         break;
269                 }
270
271                 schedule();
272         }
273         finish_wait(&drbd_pp_wait, &wait);
274
275         if (page)
276                 atomic_add(number, &mdev->pp_in_use);
277         return page;
278 }
279
280 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
281  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
282  * Either links the page chain back to the global pool,
283  * or returns all pages to the system. */
284 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
285 {
286         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
287         int i;
288
289         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
290                 i = page_chain_free(page);
291         else {
292                 struct page *tmp;
293                 tmp = page_chain_tail(page, &i);
294                 spin_lock(&drbd_pp_lock);
295                 page_chain_add(&drbd_pp_pool, page, tmp);
296                 drbd_pp_vacant += i;
297                 spin_unlock(&drbd_pp_lock);
298         }
299         i = atomic_sub_return(i, a);
300         if (i < 0)
301                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
302                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
303         wake_up(&drbd_pp_wait);
304 }
305
306 /*
307 You need to hold the req_lock:
308  _drbd_wait_ee_list_empty()
309
310 You must not have the req_lock:
311  drbd_free_peer_req()
312  drbd_alloc_peer_req()
313  drbd_free_peer_reqs()
314  drbd_ee_fix_bhs()
315  drbd_finish_peer_reqs()
316  drbd_clear_done_ee()
317  drbd_wait_ee_list_empty()
318 */
319
320 struct drbd_peer_request *
321 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
322                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
323 {
324         struct drbd_peer_request *peer_req;
325         struct page *page;
326         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
327
328         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
329                 return NULL;
330
331         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
332         if (!peer_req) {
333                 if (!(gfp_mask & __GFP_NOWARN))
334                         dev_err(DEV, "%s: allocation failed\n", __func__);
335                 return NULL;
336         }
337
338         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
339         if (!page)
340                 goto fail;
341
342         drbd_clear_interval(&peer_req->i);
343         peer_req->i.size = data_size;
344         peer_req->i.sector = sector;
345         peer_req->i.local = false;
346         peer_req->i.waiting = false;
347
348         peer_req->epoch = NULL;
349         peer_req->w.mdev = mdev;
350         peer_req->pages = page;
351         atomic_set(&peer_req->pending_bios, 0);
352         peer_req->flags = 0;
353         /*
354          * The block_id is opaque to the receiver.  It is not endianness
355          * converted, and sent back to the sender unchanged.
356          */
357         peer_req->block_id = id;
358
359         return peer_req;
360
361  fail:
362         mempool_free(peer_req, drbd_ee_mempool);
363         return NULL;
364 }
365
366 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
367                        int is_net)
368 {
369         if (peer_req->flags & EE_HAS_DIGEST)
370                 kfree(peer_req->digest);
371         drbd_pp_free(mdev, peer_req->pages, is_net);
372         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
373         D_ASSERT(drbd_interval_empty(&peer_req->i));
374         mempool_free(peer_req, drbd_ee_mempool);
375 }
376
377 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
378 {
379         LIST_HEAD(work_list);
380         struct drbd_peer_request *peer_req, *t;
381         int count = 0;
382         int is_net = list == &mdev->net_ee;
383
384         spin_lock_irq(&mdev->tconn->req_lock);
385         list_splice_init(list, &work_list);
386         spin_unlock_irq(&mdev->tconn->req_lock);
387
388         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
389                 __drbd_free_peer_req(mdev, peer_req, is_net);
390                 count++;
391         }
392         return count;
393 }
394
395 /*
396  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
397  */
398 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
399 {
400         LIST_HEAD(work_list);
401         LIST_HEAD(reclaimed);
402         struct drbd_peer_request *peer_req, *t;
403         int err = 0;
404
405         spin_lock_irq(&mdev->tconn->req_lock);
406         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
407         list_splice_init(&mdev->done_ee, &work_list);
408         spin_unlock_irq(&mdev->tconn->req_lock);
409
410         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
411                 drbd_free_net_peer_req(mdev, peer_req);
412
413         /* possible callbacks here:
414          * e_end_block, and e_end_resync_block, e_send_discard_write.
415          * all ignore the last argument.
416          */
417         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
418                 int err2;
419
420                 /* list_del not necessary, next/prev members not touched */
421                 err2 = peer_req->w.cb(&peer_req->w, !!err);
422                 if (!err)
423                         err = err2;
424                 drbd_free_peer_req(mdev, peer_req);
425         }
426         wake_up(&mdev->ee_wait);
427
428         return err;
429 }
430
431 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
432                                      struct list_head *head)
433 {
434         DEFINE_WAIT(wait);
435
436         /* avoids spin_lock/unlock
437          * and calling prepare_to_wait in the fast path */
438         while (!list_empty(head)) {
439                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
440                 spin_unlock_irq(&mdev->tconn->req_lock);
441                 io_schedule();
442                 finish_wait(&mdev->ee_wait, &wait);
443                 spin_lock_irq(&mdev->tconn->req_lock);
444         }
445 }
446
447 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
448                                     struct list_head *head)
449 {
450         spin_lock_irq(&mdev->tconn->req_lock);
451         _drbd_wait_ee_list_empty(mdev, head);
452         spin_unlock_irq(&mdev->tconn->req_lock);
453 }
454
455 /* See also kernel_accept(), which is only present since 2.6.18.
456  * We also want to log exactly which part of it failed. */
457 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
458 {
459         struct sock *sk = sock->sk;
460         int err = 0;
461
462         *what = "listen";
463         err = sock->ops->listen(sock, 5);
464         if (err < 0)
465                 goto out;
466
467         *what = "sock_create_lite";
468         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
469                                newsock);
470         if (err < 0)
471                 goto out;
472
473         *what = "accept";
474         err = sock->ops->accept(sock, *newsock, 0);
475         if (err < 0) {
476                 sock_release(*newsock);
477                 *newsock = NULL;
478                 goto out;
479         }
480         (*newsock)->ops  = sock->ops;
481
482 out:
483         return err;
484 }
485
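/* Do a single sock_recvmsg() under KERNEL_DS; unlike drbd_recv(), a short
 * read is not logged and does not force the connection to C_BROKEN_PIPE. */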
486 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
487 {
488         mm_segment_t oldfs;
489         struct kvec iov = {
490                 .iov_base = buf,
491                 .iov_len = size,
492         };
493         struct msghdr msg = {
494                 .msg_iovlen = 1,
495                 .msg_iov = (struct iovec *)&iov,
496                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
497         };
498         int rv;
499
500         oldfs = get_fs();
501         set_fs(KERNEL_DS);
502         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
503         set_fs(oldfs);
504
505         return rv;
506 }
507
508 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
509 {
510         mm_segment_t oldfs;
511         struct kvec iov = {
512                 .iov_base = buf,
513                 .iov_len = size,
514         };
515         struct msghdr msg = {
516                 .msg_iovlen = 1,
517                 .msg_iov = (struct iovec *)&iov,
518                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
519         };
520         int rv;
521
522         oldfs = get_fs();
523         set_fs(KERNEL_DS);
524
525         for (;;) {
526                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
527                 if (rv == size)
528                         break;
529
530                 /* Note:
531                  * ECONNRESET   other side closed the connection
532                  * ERESTARTSYS  (on  sock) we got a signal
533                  */
534
535                 if (rv < 0) {
536                         if (rv == -ECONNRESET)
537                                 conn_info(tconn, "sock was reset by peer\n");
538                         else if (rv != -ERESTARTSYS)
539                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
540                         break;
541                 } else if (rv == 0) {
542                         conn_info(tconn, "sock was shut down by peer\n");
543                         break;
544                 } else  {
545                         /* signal came in, or peer/link went down,
546                          * after we read a partial message
547                          */
548                         /* D_ASSERT(signal_pending(current)); */
549                         break;
550                 }
551         }
552
553         set_fs(oldfs);
554
555         if (rv != size)
556                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
557
558         return rv;
559 }
560
561 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
562 {
563         int err;
564
565         err = drbd_recv(tconn, buf, size);
566         if (err != size) {
567                 if (err >= 0)
568                         err = -EIO;
569         } else
570                 err = 0;
571         return err;
572 }
573
574 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
575 {
576         int err;
577
578         err = drbd_recv_all(tconn, buf, size);
579         if (err && !signal_pending(current))
580                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
581         return err;
582 }
583
584 /* quoting tcp(7):
585  *   On individual connections, the socket buffer size must be set prior to the
586  *   listen(2) or connect(2) calls in order to have it take effect.
587  * This is our wrapper to do so.
588  */
589 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
590                 unsigned int rcv)
591 {
592         /* open coded SO_SNDBUF, SO_RCVBUF */
593         if (snd) {
594                 sock->sk->sk_sndbuf = snd;
595                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
596         }
597         if (rcv) {
598                 sock->sk->sk_rcvbuf = rcv;
599                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
600         }
601 }
602
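/* Try to establish one outgoing TCP connection to the peer, bound to the
 * configured local address.  Returns the connected socket or NULL; only
 * unexpected errors move the connection state to C_DISCONNECTING. */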
603 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
604 {
605         const char *what;
606         struct socket *sock;
607         struct sockaddr_in6 src_in6;
608         int err;
609         int disconnect_on_error = 1;
610
611         if (!get_net_conf(tconn))
612                 return NULL;
613
614         what = "sock_create_kern";
615         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
616                 SOCK_STREAM, IPPROTO_TCP, &sock);
617         if (err < 0) {
618                 sock = NULL;
619                 goto out;
620         }
621
622         sock->sk->sk_rcvtimeo =
623         sock->sk->sk_sndtimeo =  tconn->net_conf->try_connect_int*HZ;
624         drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
625                         tconn->net_conf->rcvbuf_size);
626
627        /* explicitly bind to the configured IP as source IP
628         *  for the outgoing connections.
629         *  This is needed for multihomed hosts and to be
630         *  able to use lo: interfaces for drbd.
631         * Make sure to use 0 as port number, so linux selects
632         *  a free one dynamically.
633         */
634         memcpy(&src_in6, tconn->net_conf->my_addr,
635                min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
636         if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
637                 src_in6.sin6_port = 0;
638         else
639                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
640
641         what = "bind before connect";
642         err = sock->ops->bind(sock,
643                               (struct sockaddr *) &src_in6,
644                               tconn->net_conf->my_addr_len);
645         if (err < 0)
646                 goto out;
647
648         /* connect may fail, peer not yet available.
649          * stay C_WF_CONNECTION, don't go Disconnecting! */
650         disconnect_on_error = 0;
651         what = "connect";
652         err = sock->ops->connect(sock,
653                                  (struct sockaddr *)tconn->net_conf->peer_addr,
654                                  tconn->net_conf->peer_addr_len, 0);
655
656 out:
657         if (err < 0) {
658                 if (sock) {
659                         sock_release(sock);
660                         sock = NULL;
661                 }
662                 switch (-err) {
663                         /* timeout, busy, signal pending */
664                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
665                 case EINTR: case ERESTARTSYS:
666                         /* peer not (yet) available, network problem */
667                 case ECONNREFUSED: case ENETUNREACH:
668                 case EHOSTDOWN:    case EHOSTUNREACH:
669                         disconnect_on_error = 0;
670                         break;
671                 default:
672                         conn_err(tconn, "%s failed, err = %d\n", what, err);
673                 }
674                 if (disconnect_on_error)
675                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
676         }
677         put_net_conf(tconn);
678         return sock;
679 }
680
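/* Passive side of connection setup: bind and listen on the configured local
 * address and wait (with a randomly jittered timeout) for the peer to connect.
 * Returns the accepted socket, or NULL on timeout or error. */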
681 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
682 {
683         int timeo, err;
684         struct socket *s_estab = NULL, *s_listen;
685         const char *what;
686
687         if (!get_net_conf(tconn))
688                 return NULL;
689
690         what = "sock_create_kern";
691         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
692                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
693         if (err) {
694                 s_listen = NULL;
695                 goto out;
696         }
697
698         timeo = tconn->net_conf->try_connect_int * HZ;
699         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
700
701         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
702         s_listen->sk->sk_rcvtimeo = timeo;
703         s_listen->sk->sk_sndtimeo = timeo;
704         drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
705                         tconn->net_conf->rcvbuf_size);
706
707         what = "bind before listen";
708         err = s_listen->ops->bind(s_listen,
709                               (struct sockaddr *) tconn->net_conf->my_addr,
710                               tconn->net_conf->my_addr_len);
711         if (err < 0)
712                 goto out;
713
714         err = drbd_accept(&what, s_listen, &s_estab);
715
716 out:
717         if (s_listen)
718                 sock_release(s_listen);
719         if (err < 0) {
720                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
721                         conn_err(tconn, "%s failed, err = %d\n", what, err);
722                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
723                 }
724         }
725         put_net_conf(tconn);
726
727         return s_estab;
728 }
729
730 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
731
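/* The actively connecting side announces what it wants a freshly established
 * socket to become (data or meta) by sending P_INITIAL_DATA or P_INITIAL_META
 * as its very first packet; the passive side reads it in receive_first_packet(). */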
732 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
733                              enum drbd_packet cmd)
734 {
735         if (!conn_prepare_command(tconn, sock))
736                 return -EIO;
737         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
738 }
739
740 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
741 {
742         unsigned int header_size = drbd_header_size(tconn);
743         struct packet_info pi;
744         int err;
745
746         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
747         if (err != header_size) {
748                 if (err >= 0)
749                         err = -EIO;
750                 return err;
751         }
752         err = decode_header(tconn, tconn->data.rbuf, &pi);
753         if (err)
754                 return err;
755         return pi.cmd;
756 }
757
758 /**
759  * drbd_socket_okay() - Free the socket if its connection is not okay
760  * @sock:       pointer to the pointer to the socket.
761  */
762 static int drbd_socket_okay(struct socket **sock)
763 {
764         int rr;
765         char tb[4];
766
767         if (!*sock)
768                 return false;
769
770         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
771
772         if (rr > 0 || rr == -EAGAIN) {
773                 return true;
774         } else {
775                 sock_release(*sock);
776                 *sock = NULL;
777                 return false;
778         }
779 }
780 /* Gets called if a connection is established, or if a new minor gets created
781    in a connection */
782 int drbd_connected(int vnr, void *p, void *data)
783 {
784         struct drbd_conf *mdev = (struct drbd_conf *)p;
785         int err;
786
787         atomic_set(&mdev->packet_seq, 0);
788         mdev->peer_seq = 0;
789
790         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
791                 &mdev->tconn->cstate_mutex :
792                 &mdev->own_state_mutex;
793
794         err = drbd_send_sync_param(mdev);
795         if (!err)
796                 err = drbd_send_sizes(mdev, 0, 0);
797         if (!err)
798                 err = drbd_send_uuids(mdev);
799         if (!err)
800                 err = drbd_send_state(mdev);
801         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
802         clear_bit(RESIZE_PENDING, &mdev->flags);
803         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
804         return err;
805 }
806
807 /*
808  * return values:
809  *   1 yes, we have a valid connection
810  *   0 oops, did not work out, please try again
811  *  -1 peer talks different language,
812  *     no point in trying again, please go standalone.
813  *  -2 We do not have a network config...
814  */
815 static int drbd_connect(struct drbd_tconn *tconn)
816 {
817         struct socket *sock, *msock;
818         int try, h, ok;
819
820         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
821                 return -2;
822
823         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
824
825         /* Assume that the peer only understands protocol 80 until we know better.  */
826         tconn->agreed_pro_version = 80;
827
828         do {
829                 struct socket *s;
830
831                 for (try = 0;;) {
832                         /* 3 tries, this should take less than a second! */
833                         s = drbd_try_connect(tconn);
834                         if (s || ++try >= 3)
835                                 break;
836                         /* give the other side time to call bind() & listen() */
837                         schedule_timeout_interruptible(HZ / 10);
838                 }
839
840                 if (s) {
841                         if (!tconn->data.socket) {
842                                 tconn->data.socket = s;
843                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
844                         } else if (!tconn->meta.socket) {
845                                 tconn->meta.socket = s;
846                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
847                         } else {
848                                 conn_err(tconn, "Logic error in drbd_connect()\n");
849                                 goto out_release_sockets;
850                         }
851                 }
852
853                 if (tconn->data.socket && tconn->meta.socket) {
854                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
855                         ok = drbd_socket_okay(&tconn->data.socket);
856                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
857                         if (ok)
858                                 break;
859                 }
860
861 retry:
862                 s = drbd_wait_for_connect(tconn);
863                 if (s) {
864                         try = receive_first_packet(tconn, s);
865                         drbd_socket_okay(&tconn->data.socket);
866                         drbd_socket_okay(&tconn->meta.socket);
867                         switch (try) {
868                         case P_INITIAL_DATA:
869                                 if (tconn->data.socket) {
870                                         conn_warn(tconn, "initial packet S crossed\n");
871                                         sock_release(tconn->data.socket);
872                                 }
873                                 tconn->data.socket = s;
874                                 break;
875                         case P_INITIAL_META:
876                                 if (tconn->meta.socket) {
877                                         conn_warn(tconn, "initial packet M crossed\n");
878                                         sock_release(tconn->meta.socket);
879                                 }
880                                 tconn->meta.socket = s;
881                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
882                                 break;
883                         default:
884                                 conn_warn(tconn, "Error receiving initial packet\n");
885                                 sock_release(s);
886                                 if (random32() & 1)
887                                         goto retry;
888                         }
889                 }
890
891                 if (tconn->cstate <= C_DISCONNECTING)
892                         goto out_release_sockets;
893                 if (signal_pending(current)) {
894                         flush_signals(current);
895                         smp_rmb();
896                         if (get_t_state(&tconn->receiver) == EXITING)
897                                 goto out_release_sockets;
898                 }
899
900                 if (tconn->data.socket && tconn->meta.socket) {
901                         ok = drbd_socket_okay(&tconn->data.socket);
902                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
903                         if (ok)
904                                 break;
905                 }
906         } while (1);
907
908         sock  = tconn->data.socket;
909         msock = tconn->meta.socket;
910
911         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
912         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
913
914         sock->sk->sk_allocation = GFP_NOIO;
915         msock->sk->sk_allocation = GFP_NOIO;
916
917         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
918         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
919
920         /* NOT YET ...
921          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
922          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
923          * first set it to the P_CONNECTION_FEATURES timeout,
924          * which we set to 4x the configured ping_timeout. */
925         sock->sk->sk_sndtimeo =
926         sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
927
928         msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
929         msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
930
931         /* we don't want delays.
932          * we use TCP_CORK where appropriate, though */
933         drbd_tcp_nodelay(sock);
934         drbd_tcp_nodelay(msock);
935
936         tconn->last_received = jiffies;
937
938         h = drbd_do_features(tconn);
939         if (h <= 0)
940                 return h;
941
942         if (tconn->cram_hmac_tfm) {
943                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
944                 switch (drbd_do_auth(tconn)) {
945                 case -1:
946                         conn_err(tconn, "Authentication of peer failed\n");
947                         return -1;
948                 case 0:
949                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
950                         return 0;
951                 }
952         }
953
954         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
955                 return 0;
956
957         sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
958         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
959
960         drbd_thread_start(&tconn->asender);
961
962         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
963                 return -1;
964
965         return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
966
967 out_release_sockets:
968         if (tconn->data.socket) {
969                 sock_release(tconn->data.socket);
970                 tconn->data.socket = NULL;
971         }
972         if (tconn->meta.socket) {
973                 sock_release(tconn->meta.socket);
974                 tconn->meta.socket = NULL;
975         }
976         return -1;
977 }
978
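/* Parse a packet header in any of the supported on-the-wire formats
 * (p_header80/95/100, selected by the agreed protocol version) into @pi.
 * The payload starts right behind the header in the receive buffer. */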
979 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
980 {
981         unsigned int header_size = drbd_header_size(tconn);
982
983         if (header_size == sizeof(struct p_header100) &&
984             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
985                 struct p_header100 *h = header;
986                 if (h->pad != 0) {
987                         conn_err(tconn, "Header padding is not zero\n");
988                         return -EINVAL;
989                 }
990                 pi->vnr = be16_to_cpu(h->volume);
991                 pi->cmd = be16_to_cpu(h->command);
992                 pi->size = be32_to_cpu(h->length);
993         } else if (header_size == sizeof(struct p_header95) &&
994                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
995                 struct p_header95 *h = header;
996                 pi->cmd = be16_to_cpu(h->command);
997                 pi->size = be32_to_cpu(h->length);
998                 pi->vnr = 0;
999         } else if (header_size == sizeof(struct p_header80) &&
1000                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1001                 struct p_header80 *h = header;
1002                 pi->cmd = be16_to_cpu(h->command);
1003                 pi->size = be16_to_cpu(h->length);
1004                 pi->vnr = 0;
1005         } else {
1006                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1007                          be32_to_cpu(*(__be32 *)header),
1008                          tconn->agreed_pro_version);
1009                 return -EINVAL;
1010         }
1011         pi->data = header + header_size;
1012         return 0;
1013 }
1014
1015 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1016 {
1017         void *buffer = tconn->data.rbuf;
1018         int err;
1019
1020         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1021         if (err)
1022                 return err;
1023
1024         err = decode_header(tconn, buffer, pi);
1025         tconn->last_received = jiffies;
1026
1027         return err;
1028 }
1029
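/* Flush the local backing device if the current write ordering policy
 * requires it; if the flush fails, fall back to WO_drain_io for
 * subsequent epochs. */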
1030 static void drbd_flush(struct drbd_conf *mdev)
1031 {
1032         int rv;
1033
1034         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1035                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1036                                         NULL);
1037                 if (rv) {
1038                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1039                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1040                          * don't try again for ANY return value != 0
1041                          * if (rv == -EOPNOTSUPP) */
1042                         drbd_bump_write_ordering(mdev, WO_drain_io);
1043                 }
1044                 put_ldev(mdev);
1045         }
1046 }
1047
1048 /**
1049  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1050  * @mdev:       DRBD device.
1051  * @epoch:      Epoch object.
1052  * @ev:         Epoch event.
1053  */
1054 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1055                                                struct drbd_epoch *epoch,
1056                                                enum epoch_event ev)
1057 {
1058         int epoch_size;
1059         struct drbd_epoch *next_epoch;
1060         enum finish_epoch rv = FE_STILL_LIVE;
1061
1062         spin_lock(&mdev->epoch_lock);
1063         do {
1064                 next_epoch = NULL;
1065
1066                 epoch_size = atomic_read(&epoch->epoch_size);
1067
1068                 switch (ev & ~EV_CLEANUP) {
1069                 case EV_PUT:
1070                         atomic_dec(&epoch->active);
1071                         break;
1072                 case EV_GOT_BARRIER_NR:
1073                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1074                         break;
1075                 case EV_BECAME_LAST:
1076                         /* nothing to do*/
1077                         break;
1078                 }
1079
1080                 if (epoch_size != 0 &&
1081                     atomic_read(&epoch->active) == 0 &&
1082                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1083                         if (!(ev & EV_CLEANUP)) {
1084                                 spin_unlock(&mdev->epoch_lock);
1085                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1086                                 spin_lock(&mdev->epoch_lock);
1087                         }
1088                         dec_unacked(mdev);
1089
1090                         if (mdev->current_epoch != epoch) {
1091                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1092                                 list_del(&epoch->list);
1093                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1094                                 mdev->epochs--;
1095                                 kfree(epoch);
1096
1097                                 if (rv == FE_STILL_LIVE)
1098                                         rv = FE_DESTROYED;
1099                         } else {
1100                                 epoch->flags = 0;
1101                                 atomic_set(&epoch->epoch_size, 0);
1102                                 /* atomic_set(&epoch->active, 0); is already zero */
1103                                 if (rv == FE_STILL_LIVE)
1104                                         rv = FE_RECYCLED;
1105                                 wake_up(&mdev->ee_wait);
1106                         }
1107                 }
1108
1109                 if (!next_epoch)
1110                         break;
1111
1112                 epoch = next_epoch;
1113         } while (1);
1114
1115         spin_unlock(&mdev->epoch_lock);
1116
1117         return rv;
1118 }
1119
1120 /**
1121  * drbd_bump_write_ordering() - Fall back to another write ordering method
1122  * @mdev:       DRBD device.
1123  * @wo:         Write ordering method to try.
1124  */
1125 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1126 {
1127         enum write_ordering_e pwo;
1128         static char *write_ordering_str[] = {
1129                 [WO_none] = "none",
1130                 [WO_drain_io] = "drain",
1131                 [WO_bdev_flush] = "flush",
1132         };
1133
1134         pwo = mdev->write_ordering;
1135         wo = min(pwo, wo);
1136         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1137                 wo = WO_drain_io;
1138         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1139                 wo = WO_none;
1140         mdev->write_ordering = wo;
1141         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1142                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1143 }
1144
1145 /**
1146  * drbd_submit_peer_request() - Submit a peer request to the local backing device
1147  * @mdev:       DRBD device.
1148  * @peer_req:   peer request
1149  * @rw:         flag field, see bio->bi_rw
1150  *
1151  * May spread the pages to multiple bios,
1152  * depending on bio_add_page restrictions.
1153  *
1154  * Returns 0 if all bios have been submitted,
1155  * -ENOMEM if we could not allocate enough bios,
1156  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1157  *  single page to an empty bio (which should never happen and likely indicates
1158  *  that the lower level IO stack is in some way broken). This has been observed
1159  *  on certain Xen deployments.
1160  */
1161 /* TODO allocate from our own bio_set. */
1162 int drbd_submit_peer_request(struct drbd_conf *mdev,
1163                              struct drbd_peer_request *peer_req,
1164                              const unsigned rw, const int fault_type)
1165 {
1166         struct bio *bios = NULL;
1167         struct bio *bio;
1168         struct page *page = peer_req->pages;
1169         sector_t sector = peer_req->i.sector;
1170         unsigned ds = peer_req->i.size;
1171         unsigned n_bios = 0;
1172         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1173         int err = -ENOMEM;
1174
1175         /* In most cases, we will only need one bio.  But in case the lower
1176          * level restrictions happen to be different at this offset on this
1177          * side than those of the sending peer, we may need to submit the
1178          * request in more than one bio.
1179          *
1180          * Plain bio_alloc is good enough here, this is no DRBD internally
1181          * generated bio, but a bio allocated on behalf of the peer.
1182          */
1183 next_bio:
1184         bio = bio_alloc(GFP_NOIO, nr_pages);
1185         if (!bio) {
1186                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1187                 goto fail;
1188         }
1189         /* > peer_req->i.sector, unless this is the first bio */
1190         bio->bi_sector = sector;
1191         bio->bi_bdev = mdev->ldev->backing_bdev;
1192         bio->bi_rw = rw;
1193         bio->bi_private = peer_req;
1194         bio->bi_end_io = drbd_peer_request_endio;
1195
1196         bio->bi_next = bios;
1197         bios = bio;
1198         ++n_bios;
1199
1200         page_chain_for_each(page) {
1201                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1202                 if (!bio_add_page(bio, page, len, 0)) {
1203                         /* A single page must always be possible!
1204                          * But in case it fails anyways,
1205                          * we deal with it, and complain (below). */
1206                         if (bio->bi_vcnt == 0) {
1207                                 dev_err(DEV,
1208                                         "bio_add_page failed for len=%u, "
1209                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1210                                         len, (unsigned long long)bio->bi_sector);
1211                                 err = -ENOSPC;
1212                                 goto fail;
1213                         }
1214                         goto next_bio;
1215                 }
1216                 ds -= len;
1217                 sector += len >> 9;
1218                 --nr_pages;
1219         }
1220         D_ASSERT(page == NULL);
1221         D_ASSERT(ds == 0);
1222
1223         atomic_set(&peer_req->pending_bios, n_bios);
1224         do {
1225                 bio = bios;
1226                 bios = bios->bi_next;
1227                 bio->bi_next = NULL;
1228
1229                 drbd_generic_make_request(mdev, fault_type, bio);
1230         } while (bios);
1231         return 0;
1232
1233 fail:
1234         while (bios) {
1235                 bio = bios;
1236                 bios = bios->bi_next;
1237                 bio_put(bio);
1238         }
1239         return err;
1240 }
1241
1242 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1243                                              struct drbd_peer_request *peer_req)
1244 {
1245         struct drbd_interval *i = &peer_req->i;
1246
1247         drbd_remove_interval(&mdev->write_requests, i);
1248         drbd_clear_interval(i);
1249
1250         /* Wake up any processes waiting for this peer request to complete.  */
1251         if (i->waiting)
1252                 wake_up(&mdev->misc_wait);
1253 }
1254
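/* A P_BARRIER from the peer closes the current write epoch.  Depending on
 * the configured write ordering we may have to drain or flush local IO
 * before the corresponding P_BARRIER_ACK can be sent; see below. */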
1255 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1256 {
1257         struct drbd_conf *mdev;
1258         int rv;
1259         struct p_barrier *p = pi->data;
1260         struct drbd_epoch *epoch;
1261
1262         mdev = vnr_to_mdev(tconn, pi->vnr);
1263         if (!mdev)
1264                 return -EIO;
1265
1266         inc_unacked(mdev);
1267
1268         mdev->current_epoch->barrier_nr = p->barrier;
1269         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1270
1271         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1272          * the activity log, which means it would not be resynced in case the
1273          * R_PRIMARY crashes now.
1274          * Therefore we must send the barrier_ack after the barrier request was
1275          * completed. */
1276         switch (mdev->write_ordering) {
1277         case WO_none:
1278                 if (rv == FE_RECYCLED)
1279                         return 0;
1280
1281                 /* receiver context, in the writeout path of the other node.
1282                  * avoid potential distributed deadlock */
1283                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1284                 if (epoch)
1285                         break;
1286                 else
1287                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1288                         /* Fall through */
1289
1290         case WO_bdev_flush:
1291         case WO_drain_io:
1292                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1293                 drbd_flush(mdev);
1294
1295                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1296                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1297                         if (epoch)
1298                                 break;
1299                 }
1300
1301                 epoch = mdev->current_epoch;
1302                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1303
1304                 D_ASSERT(atomic_read(&epoch->active) == 0);
1305                 D_ASSERT(epoch->flags == 0);
1306
1307                 return 0;
1308         default:
1309                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1310                 return -EIO;
1311         }
1312
1313         epoch->flags = 0;
1314         atomic_set(&epoch->epoch_size, 0);
1315         atomic_set(&epoch->active, 0);
1316
1317         spin_lock(&mdev->epoch_lock);
1318         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1319                 list_add(&epoch->list, &mdev->current_epoch->list);
1320                 mdev->current_epoch = epoch;
1321                 mdev->epochs++;
1322         } else {
1323                 /* The current_epoch got recycled while we allocated this one... */
1324                 kfree(epoch);
1325         }
1326         spin_unlock(&mdev->epoch_lock);
1327
1328         return 0;
1329 }
1330
1331 /* used from receive_RSDataReply (recv_resync_read)
1332  * and from receive_Data */
1333 static struct drbd_peer_request *
1334 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1335               int data_size) __must_hold(local)
1336 {
1337         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1338         struct drbd_peer_request *peer_req;
1339         struct page *page;
1340         int dgs, ds, err;
1341         void *dig_in = mdev->tconn->int_dig_in;
1342         void *dig_vv = mdev->tconn->int_dig_vv;
1343         unsigned long *data;
1344
1345         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1346                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1347
1348         if (dgs) {
1349                 /*
1350                  * FIXME: Receive the incoming digest into the receive buffer
1351                  *        here, together with its struct p_data?
1352                  */
1353                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1354                 if (err)
1355                         return NULL;
1356         }
1357
1358         data_size -= dgs;
1359
1360         if (!expect(data_size != 0))
1361                 return NULL;
1362         if (!expect(IS_ALIGNED(data_size, 512)))
1363                 return NULL;
1364         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1365                 return NULL;
1366
1367         /* even though we trust our peer,
1368          * we sometimes have to double check. */
1369         if (sector + (data_size>>9) > capacity) {
1370                 dev_err(DEV, "request from peer beyond end of local disk: "
1371                         "capacity: %llus < sector: %llus + size: %u\n",
1372                         (unsigned long long)capacity,
1373                         (unsigned long long)sector, data_size);
1374                 return NULL;
1375         }
1376
1377         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1378          * "criss-cross" setup, that might cause write-out on some other DRBD,
1379          * which in turn might block on the other node at this very place.  */
1380         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1381         if (!peer_req)
1382                 return NULL;
1383
1384         ds = data_size;
1385         page = peer_req->pages;
1386         page_chain_for_each(page) {
1387                 unsigned len = min_t(int, ds, PAGE_SIZE);
1388                 data = kmap(page);
1389                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1390                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1391                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1392                         data[0] = data[0] ^ (unsigned long)-1;
1393                 }
1394                 kunmap(page);
1395                 if (err) {
1396                         drbd_free_peer_req(mdev, peer_req);
1397                         return NULL;
1398                 }
1399                 ds -= len;
1400         }
1401
1402         if (dgs) {
1403                 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1404                 if (memcmp(dig_in, dig_vv, dgs)) {
1405                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1406                                 (unsigned long long)sector, data_size);
1407                         drbd_free_peer_req(mdev, peer_req);
1408                         return NULL;
1409                 }
1410         }
1411         mdev->recv_cnt += data_size>>9;
1412         return peer_req;
1413 }
1414
1415 /* drbd_drain_block() just takes a data block
1416  * out of the socket input buffer, and discards it.
1417  */
1418 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1419 {
1420         struct page *page;
1421         int err = 0;
1422         void *data;
1423
1424         if (!data_size)
1425                 return 0;
1426
1427         page = drbd_pp_alloc(mdev, 1, 1);
1428
1429         data = kmap(page);
1430         while (data_size) {
1431                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1432
1433                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1434                 if (err)
1435                         break;
1436                 data_size -= len;
1437         }
1438         kunmap(page);
1439         drbd_pp_free(mdev, page, 0);
1440         return err;
1441 }
1442
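/* Receive the payload of a read reply from the peer directly into the pages
 * of the original request's master bio, verifying the data digest if one is
 * configured. */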
1443 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1444                            sector_t sector, int data_size)
1445 {
1446         struct bio_vec *bvec;
1447         struct bio *bio;
1448         int dgs, err, i, expect;
1449         void *dig_in = mdev->tconn->int_dig_in;
1450         void *dig_vv = mdev->tconn->int_dig_vv;
1451
1452         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1453                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1454
1455         if (dgs) {
1456                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1457                 if (err)
1458                         return err;
1459         }
1460
1461         data_size -= dgs;
1462
1463         /* optimistically update recv_cnt.  if receiving fails below,
1464          * we disconnect anyways, and counters will be reset. */
1465         mdev->recv_cnt += data_size>>9;
1466
1467         bio = req->master_bio;
1468         D_ASSERT(sector == bio->bi_sector);
1469
1470         bio_for_each_segment(bvec, bio, i) {
1471                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1472                 expect = min_t(int, data_size, bvec->bv_len);
1473                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1474                 kunmap(bvec->bv_page);
1475                 if (err)
1476                         return err;
1477                 data_size -= expect;
1478         }
1479
1480         if (dgs) {
1481                 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1482                 if (memcmp(dig_in, dig_vv, dgs)) {
1483                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1484                         return -EINVAL;
1485                 }
1486         }
1487
1488         D_ASSERT(data_size == 0);
1489         return 0;
1490 }
1491
1492 /*
1493  * e_end_resync_block() is called in asender context via
1494  * drbd_finish_peer_reqs().
1495  */
1496 static int e_end_resync_block(struct drbd_work *w, int unused)
1497 {
1498         struct drbd_peer_request *peer_req =
1499                 container_of(w, struct drbd_peer_request, w);
1500         struct drbd_conf *mdev = w->mdev;
1501         sector_t sector = peer_req->i.sector;
1502         int err;
1503
1504         D_ASSERT(drbd_interval_empty(&peer_req->i));
1505
1506         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1507                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1508                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1509         } else {
1510                 /* Record failure to sync */
1511                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1512
1513                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1514         }
1515         dec_unacked(mdev);
1516
1517         return err;
1518 }
1519
1520 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1521 {
1522         struct drbd_peer_request *peer_req;
1523
1524         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1525         if (!peer_req)
1526                 goto fail;
1527
1528         dec_rs_pending(mdev);
1529
1530         inc_unacked(mdev);
1531         /* corresponding dec_unacked() in e_end_resync_block(),
1532          * or in _drbd_clear_done_ee(), respectively */
1533
1534         peer_req->w.cb = e_end_resync_block;
1535
1536         spin_lock_irq(&mdev->tconn->req_lock);
1537         list_add(&peer_req->w.list, &mdev->sync_ee);
1538         spin_unlock_irq(&mdev->tconn->req_lock);
1539
1540         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1541         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1542                 return 0;
1543
1544         /* don't care for the reason here */
1545         dev_err(DEV, "submit failed, triggering re-connect\n");
1546         spin_lock_irq(&mdev->tconn->req_lock);
1547         list_del(&peer_req->w.list);
1548         spin_unlock_irq(&mdev->tconn->req_lock);
1549
1550         drbd_free_peer_req(mdev, peer_req);
1551 fail:
1552         put_ldev(mdev);
1553         return -EIO;
1554 }
1555
1556 static struct drbd_request *
1557 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1558              sector_t sector, bool missing_ok, const char *func)
1559 {
1560         struct drbd_request *req;
1561
1562         /* Request object according to our peer */
1563         req = (struct drbd_request *)(unsigned long)id;
1564         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1565                 return req;
1566         if (!missing_ok) {
1567                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1568                         (unsigned long)id, (unsigned long long)sector);
1569         }
1570         return NULL;
1571 }
1572
1573 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1574 {
1575         struct drbd_conf *mdev;
1576         struct drbd_request *req;
1577         sector_t sector;
1578         int err;
1579         struct p_data *p = pi->data;
1580
1581         mdev = vnr_to_mdev(tconn, pi->vnr);
1582         if (!mdev)
1583                 return -EIO;
1584
1585         sector = be64_to_cpu(p->sector);
1586
1587         spin_lock_irq(&mdev->tconn->req_lock);
1588         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1589         spin_unlock_irq(&mdev->tconn->req_lock);
1590         if (unlikely(!req))
1591                 return -EIO;
1592
1593         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1594          * special-casing it there for the various failure cases.
1595          * There is still no race with drbd_fail_pending_reads. */
1596         err = recv_dless_read(mdev, req, sector, pi->size);
1597         if (!err)
1598                 req_mod(req, DATA_RECEIVED);
1599         /* else: nothing here; the error case is handled from drbd_disconnect...
1600          * We probably must not complete this request just yet,
1601          * in case we are in "on-disconnect: freeze" mode. */
1602
1603         return err;
1604 }
1605
1606 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1607 {
1608         struct drbd_conf *mdev;
1609         sector_t sector;
1610         int err;
1611         struct p_data *p = pi->data;
1612
1613         mdev = vnr_to_mdev(tconn, pi->vnr);
1614         if (!mdev)
1615                 return -EIO;
1616
1617         sector = be64_to_cpu(p->sector);
1618         D_ASSERT(p->block_id == ID_SYNCER);
1619
1620         if (get_ldev(mdev)) {
1621                 /* data is submitted to disk within recv_resync_read.
1622                  * corresponding put_ldev done below on error,
1623                  * or in drbd_peer_request_endio. */
1624                 err = recv_resync_read(mdev, sector, pi->size);
1625         } else {
1626                 if (__ratelimit(&drbd_ratelimit_state))
1627                         dev_err(DEV, "Can not write resync data to local disk.\n");
1628
1629                 err = drbd_drain_block(mdev, pi->size);
1630
1631                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1632         }
1633
1634         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1635
1636         return err;
1637 }
1638
1639 static int w_restart_write(struct drbd_work *w, int cancel)
1640 {
1641         struct drbd_request *req = container_of(w, struct drbd_request, w);
1642         struct drbd_conf *mdev = w->mdev;
1643         struct bio *bio;
1644         unsigned long start_time;
1645         unsigned long flags;
1646
1647         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1648         if (!expect(req->rq_state & RQ_POSTPONED)) {
1649                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1650                 return -EIO;
1651         }
1652         bio = req->master_bio;
1653         start_time = req->start_time;
1654         /* Postponed requests will not have their master_bio completed!  */
1655         __req_mod(req, DISCARD_WRITE, NULL);
1656         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1657
1658         while (__drbd_make_request(mdev, bio, start_time))
1659                 /* retry */ ;
1660         return 0;
1661 }
1662
1663 static void restart_conflicting_writes(struct drbd_conf *mdev,
1664                                        sector_t sector, int size)
1665 {
1666         struct drbd_interval *i;
1667         struct drbd_request *req;
1668
1669         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1670                 if (!i->local)
1671                         continue;
1672                 req = container_of(i, struct drbd_request, i);
1673                 if (req->rq_state & RQ_LOCAL_PENDING ||
1674                     !(req->rq_state & RQ_POSTPONED))
1675                         continue;
1676                 if (expect(list_empty(&req->w.list))) {
1677                         req->w.mdev = mdev;
1678                         req->w.cb = w_restart_write;
1679                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1680                 }
1681         }
1682 }
1683
1684 /*
1685  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1686  */
1687 static int e_end_block(struct drbd_work *w, int cancel)
1688 {
1689         struct drbd_peer_request *peer_req =
1690                 container_of(w, struct drbd_peer_request, w);
1691         struct drbd_conf *mdev = w->mdev;
1692         sector_t sector = peer_req->i.sector;
1693         int err = 0, pcmd;
1694
1695         if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
1696                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1697                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1698                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1699                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1700                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1701                         err = drbd_send_ack(mdev, pcmd, peer_req);
1702                         if (pcmd == P_RS_WRITE_ACK)
1703                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1704                 } else {
1705                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1706                         /* we expect it to be marked out of sync anyways...
1707                          * maybe assert this?  */
1708                 }
1709                 dec_unacked(mdev);
1710         }
1711         /* we delete from the conflict detection hash _after_ we sent out the
1712          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1713         if (mdev->tconn->net_conf->two_primaries) {
1714                 spin_lock_irq(&mdev->tconn->req_lock);
1715                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1716                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1717                 if (peer_req->flags & EE_RESTART_REQUESTS)
1718                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1719                 spin_unlock_irq(&mdev->tconn->req_lock);
1720         } else
1721                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1722
1723         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1724
1725         return err;
1726 }
1727
1728 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1729 {
1730         struct drbd_conf *mdev = w->mdev;
1731         struct drbd_peer_request *peer_req =
1732                 container_of(w, struct drbd_peer_request, w);
1733         int err;
1734
1735         err = drbd_send_ack(mdev, ack, peer_req);
1736         dec_unacked(mdev);
1737
1738         return err;
1739 }
1740
1741 static int e_send_discard_write(struct drbd_work *w, int unused)
1742 {
1743         return e_send_ack(w, P_DISCARD_WRITE);
1744 }
1745
1746 static int e_send_retry_write(struct drbd_work *w, int unused)
1747 {
1748         struct drbd_tconn *tconn = w->mdev->tconn;
1749
1750         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1751                              P_RETRY_WRITE : P_DISCARD_WRITE);
1752 }
1753
1754 static bool seq_greater(u32 a, u32 b)
1755 {
1756         /*
1757          * We assume 32-bit wrap-around here.
1758          * For 24-bit wrap-around, we would have to shift:
1759          *  a <<= 8; b <<= 8;
1760          */
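        /*
         * Example of the signed-difference trick: with a = 0x00000002 and
         * b = 0xFFFFFFFE, the unsigned comparison a > b is false, but
         * (s32)a - (s32)b == 2 - (-2) == 4 > 0, so "a" is correctly treated
         * as the more recent sequence number across the wrap.
         */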
1761         return (s32)a - (s32)b > 0;
1762 }
1763
1764 static u32 seq_max(u32 a, u32 b)
1765 {
1766         return seq_greater(a, b) ? a : b;
1767 }
1768
1769 static bool need_peer_seq(struct drbd_conf *mdev)
1770 {
1771         struct drbd_tconn *tconn = mdev->tconn;
1772
1773         /*
1774          * We only need to keep track of the last packet_seq number of our peer
1775          * if we are in dual-primary mode and we have the discard flag set; see
1776          * handle_write_conflicts().
1777          */
1778         return tconn->net_conf->two_primaries &&
1779                test_bit(DISCARD_CONCURRENT, &tconn->flags);
1780 }
1781
1782 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1783 {
1784         unsigned int newest_peer_seq;
1785
1786         if (need_peer_seq(mdev)) {
1787                 spin_lock(&mdev->peer_seq_lock);
1788                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1789                 mdev->peer_seq = newest_peer_seq;
1790                 spin_unlock(&mdev->peer_seq_lock);
1791                 /* wake up only if we actually changed mdev->peer_seq */
1792                 if (peer_seq == newest_peer_seq)
1793                         wake_up(&mdev->seq_wait);
1794         }
1795 }
1796
1797 /* Called from receive_Data.
1798  * Synchronize packets on sock with packets on msock.
1799  *
1800  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1801  * packet traveling on msock, they are still processed in the order they have
1802  * been sent.
1803  *
1804  * Note: we don't care for Ack packets overtaking P_DATA packets.
1805  *
1806  * In case packet_seq is larger than mdev->peer_seq number, there are
1807  * outstanding packets on the msock. We wait for them to arrive.
1808  * In case we are the logically next packet, we update mdev->peer_seq
1809  * ourselves. Correctly handles 32bit wrap around.
1810  *
1811  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1812  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1813  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1814  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1815  *
1816  * returns 0 if we may process the packet, -ETIMEDOUT if we gave up waiting,
1817  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
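/*
 * Example of the ordering this enforces: if mdev->peer_seq is currently 7 and
 * a P_DATA with seq_num 9 arrives on the data socket, the packet carrying
 * sequence number 8 is still outstanding on the meta socket, so we sleep on
 * seq_wait until peer_seq has advanced to at least 8 before processing the
 * write.
 */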
1818 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1819 {
1820         DEFINE_WAIT(wait);
1821         long timeout;
1822         int ret;
1823
1824         if (!need_peer_seq(mdev))
1825                 return 0;
1826
1827         spin_lock(&mdev->peer_seq_lock);
1828         for (;;) {
1829                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1830                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1831                         ret = 0;
1832                         break;
1833                 }
1834                 if (signal_pending(current)) {
1835                         ret = -ERESTARTSYS;
1836                         break;
1837                 }
1838                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1839                 spin_unlock(&mdev->peer_seq_lock);
1840                 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1841                 timeout = schedule_timeout(timeout);
1842                 spin_lock(&mdev->peer_seq_lock);
1843                 if (!timeout) {
1844                         ret = -ETIMEDOUT;
1845                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1846                         break;
1847                 }
1848         }
1849         spin_unlock(&mdev->peer_seq_lock);
1850         finish_wait(&mdev->seq_wait, &wait);
1851         return ret;
1852 }
1853
1854 /* see also bio_flags_to_wire() and the DRBD_REQ_* definitions:
1855  * we need to semantically map the bio flags to data packet flags and back,
1856  * because we may replicate to peers running other kernel versions. */
1857 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1858 {
1859         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1860                 (dpf & DP_FUA ? REQ_FUA : 0) |
1861                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1862                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1863 }
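/*
 * For instance, a peer write received with DP_FUA | DP_FLUSH set in its
 * dp_flags is submitted locally with REQ_FUA | REQ_FLUSH; bio_flags_to_wire()
 * performs the reverse mapping on the sending side (see the comment above).
 */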
1864
1865 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1866                                     unsigned int size)
1867 {
1868         struct drbd_interval *i;
1869
1870     repeat:
1871         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1872                 struct drbd_request *req;
1873                 struct bio_and_error m;
1874
1875                 if (!i->local)
1876                         continue;
1877                 req = container_of(i, struct drbd_request, i);
1878                 if (!(req->rq_state & RQ_POSTPONED))
1879                         continue;
1880                 req->rq_state &= ~RQ_POSTPONED;
1881                 __req_mod(req, NEG_ACKED, &m);
1882                 spin_unlock_irq(&mdev->tconn->req_lock);
1883                 if (m.bio)
1884                         complete_master_bio(mdev, &m);
1885                 spin_lock_irq(&mdev->tconn->req_lock);
1886                 goto repeat;
1887         }
1888 }
1889
1890 static int handle_write_conflicts(struct drbd_conf *mdev,
1891                                   struct drbd_peer_request *peer_req)
1892 {
1893         struct drbd_tconn *tconn = mdev->tconn;
1894         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1895         sector_t sector = peer_req->i.sector;
1896         const unsigned int size = peer_req->i.size;
1897         struct drbd_interval *i;
1898         bool equal;
1899         int err;
1900
1901         /*
1902          * Inserting the peer request into the write_requests tree will prevent
1903          * new conflicting local requests from being added.
1904          */
1905         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1906
1907     repeat:
1908         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1909                 if (i == &peer_req->i)
1910                         continue;
1911
1912                 if (!i->local) {
1913                         /*
1914                          * Our peer has sent a conflicting remote request; this
1915                          * should not happen in a two-node setup.  Wait for the
1916                          * earlier peer request to complete.
1917                          */
1918                         err = drbd_wait_misc(mdev, i);
1919                         if (err)
1920                                 goto out;
1921                         goto repeat;
1922                 }
1923
1924                 equal = i->sector == sector && i->size == size;
1925                 if (resolve_conflicts) {
1926                         /*
1927                          * If the peer request is fully contained within the
1928                          * overlapping request, it can be discarded; otherwise,
1929                          * it will be retried once all overlapping requests
1930                          * have completed.
1931                          */
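                        /*
                         * Containment example: a local write covering sectors
                         * 0..15 (i->sector == 0, i->size == 8192) fully contains
                         * a peer write of sectors 4..11 (sector == 4, size == 4096),
                         * so "discard" below is true and the peer write is answered
                         * with P_DISCARD_WRITE instead of being submitted.
                         */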
1932                         bool discard = i->sector <= sector && i->sector +
1933                                        (i->size >> 9) >= sector + (size >> 9);
1934
1935                         if (!equal)
1936                                 dev_alert(DEV, "Concurrent writes detected: "
1937                                                "local=%llus +%u, remote=%llus +%u, "
1938                                                "assuming %s came first\n",
1939                                           (unsigned long long)i->sector, i->size,
1940                                           (unsigned long long)sector, size,
1941                                           discard ? "local" : "remote");
1942
1943                         inc_unacked(mdev);
1944                         peer_req->w.cb = discard ? e_send_discard_write :
1945                                                    e_send_retry_write;
1946                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1947                         wake_asender(mdev->tconn);
1948
1949                         err = -ENOENT;
1950                         goto out;
1951                 } else {
1952                         struct drbd_request *req =
1953                                 container_of(i, struct drbd_request, i);
1954
1955                         if (!equal)
1956                                 dev_alert(DEV, "Concurrent writes detected: "
1957                                                "local=%llus +%u, remote=%llus +%u\n",
1958                                           (unsigned long long)i->sector, i->size,
1959                                           (unsigned long long)sector, size);
1960
1961                         if (req->rq_state & RQ_LOCAL_PENDING ||
1962                             !(req->rq_state & RQ_POSTPONED)) {
1963                                 /*
1964                                  * Wait for the node with the discard flag to
1965                                  * decide if this request will be discarded or
1966                                  * retried.  Requests that are discarded will
1967                                  * disappear from the write_requests tree.
1968                                  *
1969                                  * In addition, wait for the conflicting
1970                                  * request to finish locally before submitting
1971                                  * the conflicting peer request.
1972                                  */
1973                                 err = drbd_wait_misc(mdev, &req->i);
1974                                 if (err) {
1975                                         _conn_request_state(mdev->tconn,
1976                                                             NS(conn, C_TIMEOUT),
1977                                                             CS_HARD);
1978                                         fail_postponed_requests(mdev, sector, size);
1979                                         goto out;
1980                                 }
1981                                 goto repeat;
1982                         }
1983                         /*
1984                          * Remember to restart the conflicting requests after
1985                          * the new peer request has completed.
1986                          */
1987                         peer_req->flags |= EE_RESTART_REQUESTS;
1988                 }
1989         }
1990         err = 0;
1991
1992     out:
1993         if (err)
1994                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1995         return err;
1996 }
1997
1998 /* mirrored write */
1999 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2000 {
2001         struct drbd_conf *mdev;
2002         sector_t sector;
2003         struct drbd_peer_request *peer_req;
2004         struct p_data *p = pi->data;
2005         u32 peer_seq = be32_to_cpu(p->seq_num);
2006         int rw = WRITE;
2007         u32 dp_flags;
2008         int err;
2009
2010         mdev = vnr_to_mdev(tconn, pi->vnr);
2011         if (!mdev)
2012                 return -EIO;
2013
2014         if (!get_ldev(mdev)) {
2015                 int err2;
2016
2017                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2018                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2019                 atomic_inc(&mdev->current_epoch->epoch_size);
2020                 err2 = drbd_drain_block(mdev, pi->size);
2021                 if (!err)
2022                         err = err2;
2023                 return err;
2024         }
2025
2026         /*
2027          * Corresponding put_ldev done either below (on various errors), or in
2028          * drbd_peer_request_endio, if we successfully submit the data at the
2029          * end of this function.
2030          */
2031
2032         sector = be64_to_cpu(p->sector);
2033         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2034         if (!peer_req) {
2035                 put_ldev(mdev);
2036                 return -EIO;
2037         }
2038
2039         peer_req->w.cb = e_end_block;
2040
2041         dp_flags = be32_to_cpu(p->dp_flags);
2042         rw |= wire_flags_to_bio(mdev, dp_flags);
2043
2044         if (dp_flags & DP_MAY_SET_IN_SYNC)
2045                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2046
2047         spin_lock(&mdev->epoch_lock);
2048         peer_req->epoch = mdev->current_epoch;
2049         atomic_inc(&peer_req->epoch->epoch_size);
2050         atomic_inc(&peer_req->epoch->active);
2051         spin_unlock(&mdev->epoch_lock);
2052
2053         if (mdev->tconn->net_conf->two_primaries) {
2054                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2055                 if (err)
2056                         goto out_interrupted;
2057                 spin_lock_irq(&mdev->tconn->req_lock);
2058                 err = handle_write_conflicts(mdev, peer_req);
2059                 if (err) {
2060                         spin_unlock_irq(&mdev->tconn->req_lock);
2061                         if (err == -ENOENT) {
2062                                 put_ldev(mdev);
2063                                 return 0;
2064                         }
2065                         goto out_interrupted;
2066                 }
2067         } else
2068                 spin_lock_irq(&mdev->tconn->req_lock);
2069         list_add(&peer_req->w.list, &mdev->active_ee);
2070         spin_unlock_irq(&mdev->tconn->req_lock);
2071
2072         switch (mdev->tconn->net_conf->wire_protocol) {
2073         case DRBD_PROT_C:
2074                 inc_unacked(mdev);
2075                 /* corresponding dec_unacked() in e_end_block(),
2076                  * or in _drbd_clear_done_ee(), respectively */
2077                 break;
2078         case DRBD_PROT_B:
2079                 /* I really don't like it that the receiver thread
2080                  * sends on the msock, but anyways */
2081                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2082                 break;
2083         case DRBD_PROT_A:
2084                 /* nothing to do */
2085                 break;
2086         }
2087
2088         if (mdev->state.pdsk < D_INCONSISTENT) {
2089                 /* In case we have the only disk of the cluster, mark the block out of sync and cover it in the activity log. */
2090                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2091                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2092                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2093                 drbd_al_begin_io(mdev, &peer_req->i);
2094         }
2095
2096         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2097         if (!err)
2098                 return 0;
2099
2100         /* don't care for the reason here */
2101         dev_err(DEV, "submit failed, triggering re-connect\n");
2102         spin_lock_irq(&mdev->tconn->req_lock);
2103         list_del(&peer_req->w.list);
2104         drbd_remove_epoch_entry_interval(mdev, peer_req);
2105         spin_unlock_irq(&mdev->tconn->req_lock);
2106         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2107                 drbd_al_complete_io(mdev, &peer_req->i);
2108
2109 out_interrupted:
2110         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2111         put_ldev(mdev);
2112         drbd_free_peer_req(mdev, peer_req);
2113         return err;
2114 }
2115
2116 /* We may throttle resync if the lower device seems to be busy
2117  * and the current sync rate is above c_min_rate.
2118  *
2119  * To decide whether or not the lower device is busy, we use a scheme similar
2120  * to the one MD RAID uses in is_mddev_idle(): if the partition stats reveal a
2121  * "significant" amount (more than 64 sectors) of activity that we cannot
2122  * account for with our own resync activity, it obviously is "busy".
2123  *
2124  * The current sync rate used here is computed from only the most recent two
2125  * step marks, to give a short-time average so we can react faster.
2126  */
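/*
 * Worked example (assuming the usual 4 KiB of storage per bitmap bit): if the
 * last sync mark shows that 2048 bitmap bits were cleared over dt = 1 second,
 * then db = 2048 and dbdt = Bit2KB(2048) = 8192 KiB/s.  With c_min_rate
 * configured at, say, 4000 KiB/s, dbdt > c_min_rate and this function asks
 * the caller to throttle.
 */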
2127 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2128 {
2129         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2130         unsigned long db, dt, dbdt;
2131         struct lc_element *tmp;
2132         int curr_events;
2133         int throttle = 0;
2134
2135         /* feature disabled? */
2136         if (mdev->ldev->dc.c_min_rate == 0)
2137                 return 0;
2138
2139         spin_lock_irq(&mdev->al_lock);
2140         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2141         if (tmp) {
2142                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2143                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2144                         spin_unlock_irq(&mdev->al_lock);
2145                         return 0;
2146                 }
2147                 /* Do not slow down if app IO is already waiting for this extent */
2148         }
2149         spin_unlock_irq(&mdev->al_lock);
2150
2151         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2152                       (int)part_stat_read(&disk->part0, sectors[1]) -
2153                         atomic_read(&mdev->rs_sect_ev);
2154
2155         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2156                 unsigned long rs_left;
2157                 int i;
2158
2159                 mdev->rs_last_events = curr_events;
2160
2161                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2162                  * approx. */
2163                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2164
2165                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2166                         rs_left = mdev->ov_left;
2167                 else
2168                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2169
2170                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2171                 if (!dt)
2172                         dt++;
2173                 db = mdev->rs_mark_left[i] - rs_left;
2174                 dbdt = Bit2KB(db/dt);
2175
2176                 if (dbdt > mdev->ldev->dc.c_min_rate)
2177                         throttle = 1;
2178         }
2179         return throttle;
2180 }
2181
2182
2183 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2184 {
2185         struct drbd_conf *mdev;
2186         sector_t sector;
2187         sector_t capacity;
2188         struct drbd_peer_request *peer_req;
2189         struct digest_info *di = NULL;
2190         int size, verb;
2191         unsigned int fault_type;
2192         struct p_block_req *p = pi->data;
2193
2194         mdev = vnr_to_mdev(tconn, pi->vnr);
2195         if (!mdev)
2196                 return -EIO;
2197         capacity = drbd_get_capacity(mdev->this_bdev);
2198
2199         sector = be64_to_cpu(p->sector);
2200         size   = be32_to_cpu(p->blksize);
2201
2202         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2203                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2204                                 (unsigned long long)sector, size);
2205                 return -EINVAL;
2206         }
2207         if (sector + (size>>9) > capacity) {
2208                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2209                                 (unsigned long long)sector, size);
2210                 return -EINVAL;
2211         }
2212
2213         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2214                 verb = 1;
2215                 switch (pi->cmd) {
2216                 case P_DATA_REQUEST:
2217                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2218                         break;
2219                 case P_RS_DATA_REQUEST:
2220                 case P_CSUM_RS_REQUEST:
2221                 case P_OV_REQUEST:
2222                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2223                         break;
2224                 case P_OV_REPLY:
2225                         verb = 0;
2226                         dec_rs_pending(mdev);
2227                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2228                         break;
2229                 default:
2230                         BUG();
2231                 }
2232                 if (verb && __ratelimit(&drbd_ratelimit_state))
2233                         dev_err(DEV, "Can not satisfy peer's read request, "
2234                             "no local data.\n");
2235
2236                 /* drain the payload, if any */
2237                 return drbd_drain_block(mdev, pi->size);
2238         }
2239
2240         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2241          * "criss-cross" setup, that might cause write-out on some other DRBD,
2242          * which in turn might block on the other node at this very place.  */
2243         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2244         if (!peer_req) {
2245                 put_ldev(mdev);
2246                 return -ENOMEM;
2247         }
2248
2249         switch (pi->cmd) {
2250         case P_DATA_REQUEST:
2251                 peer_req->w.cb = w_e_end_data_req;
2252                 fault_type = DRBD_FAULT_DT_RD;
2253                 /* application IO, don't drbd_rs_begin_io */
2254                 goto submit;
2255
2256         case P_RS_DATA_REQUEST:
2257                 peer_req->w.cb = w_e_end_rsdata_req;
2258                 fault_type = DRBD_FAULT_RS_RD;
2259                 /* used in the sector offset progress display */
2260                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2261                 break;
2262
2263         case P_OV_REPLY:
2264         case P_CSUM_RS_REQUEST:
2265                 fault_type = DRBD_FAULT_RS_RD;
2266                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2267                 if (!di)
2268                         goto out_free_e;
2269
2270                 di->digest_size = pi->size;
2271                 di->digest = (((char *)di)+sizeof(struct digest_info));
2272
2273                 peer_req->digest = di;
2274                 peer_req->flags |= EE_HAS_DIGEST;
2275
2276                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2277                         goto out_free_e;
2278
2279                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2280                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2281                         peer_req->w.cb = w_e_end_csum_rs_req;
2282                         /* used in the sector offset progress display */
2283                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2284                 } else if (pi->cmd == P_OV_REPLY) {
2285                         /* track progress, we may need to throttle */
2286                         atomic_add(size >> 9, &mdev->rs_sect_in);
2287                         peer_req->w.cb = w_e_end_ov_reply;
2288                         dec_rs_pending(mdev);
2289                         /* drbd_rs_begin_io done when we sent this request,
2290                          * but accounting still needs to be done. */
2291                         goto submit_for_resync;
2292                 }
2293                 break;
2294
2295         case P_OV_REQUEST:
2296                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2297                     mdev->tconn->agreed_pro_version >= 90) {
2298                         unsigned long now = jiffies;
2299                         int i;
2300                         mdev->ov_start_sector = sector;
2301                         mdev->ov_position = sector;
2302                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2303                         mdev->rs_total = mdev->ov_left;
2304                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2305                                 mdev->rs_mark_left[i] = mdev->ov_left;
2306                                 mdev->rs_mark_time[i] = now;
2307                         }
2308                         dev_info(DEV, "Online Verify start sector: %llu\n",
2309                                         (unsigned long long)sector);
2310                 }
2311                 peer_req->w.cb = w_e_end_ov_req;
2312                 fault_type = DRBD_FAULT_RS_RD;
2313                 break;
2314
2315         default:
2316                 BUG();
2317         }
2318
2319         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2320          * wrt the receiver, but it is not as straightforward as it may seem.
2321          * Various places in the resync start and stop logic assume resync
2322          * requests are processed in order; requeuing this on the worker thread
2323          * would introduce a bunch of new code for synchronization between threads.
2324          *
2325          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2326          * "forever", throttling after drbd_rs_begin_io will lock that extent
2327          * for application writes for the same time.  For now, just throttle
2328          * here, where the rest of the code expects the receiver to sleep for
2329          * a while, anyways.
2330          */
2331
2332         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2333          * this defers syncer requests for some time, before letting at least
2334          * one request through.  The resync controller on the receiving side
2335          * will adapt to the incoming rate accordingly.
2336          *
2337          * We cannot throttle here if remote is Primary/SyncTarget:
2338          * we would also throttle its application reads.
2339          * In that case, throttling is done on the SyncTarget only.
2340          */
2341         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2342                 schedule_timeout_uninterruptible(HZ/10);
2343         if (drbd_rs_begin_io(mdev, sector))
2344                 goto out_free_e;
2345
2346 submit_for_resync:
2347         atomic_add(size >> 9, &mdev->rs_sect_ev);
2348
2349 submit:
2350         inc_unacked(mdev);
2351         spin_lock_irq(&mdev->tconn->req_lock);
2352         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2353         spin_unlock_irq(&mdev->tconn->req_lock);
2354
2355         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2356                 return 0;
2357
2358         /* don't care for the reason here */
2359         dev_err(DEV, "submit failed, triggering re-connect\n");
2360         spin_lock_irq(&mdev->tconn->req_lock);
2361         list_del(&peer_req->w.list);
2362         spin_unlock_irq(&mdev->tconn->req_lock);
2363         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2364
2365 out_free_e:
2366         put_ldev(mdev);
2367         drbd_free_peer_req(mdev, peer_req);
2368         return -EIO;
2369 }
2370
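/*
 * After-split-brain recovery policy for zero primaries (after-sb-0pri).
 * Return convention (shared with the 1p/2p variants below):
 *    1  resolve in favor of this node (the peer becomes sync target),
 *   -1  resolve in favor of the peer (this node becomes sync target),
 * -100  no automatic decision; the split brain stays unresolved.
 */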
2371 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2372 {
2373         int self, peer, rv = -100;
2374         unsigned long ch_self, ch_peer;
2375
2376         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2377         peer = mdev->p_uuid[UI_BITMAP] & 1;
2378
2379         ch_peer = mdev->p_uuid[UI_SIZE];
2380         ch_self = mdev->comm_bm_set;
2381
2382         switch (mdev->tconn->net_conf->after_sb_0p) {
2383         case ASB_CONSENSUS:
2384         case ASB_DISCARD_SECONDARY:
2385         case ASB_CALL_HELPER:
2386                 dev_err(DEV, "Configuration error.\n");
2387                 break;
2388         case ASB_DISCONNECT:
2389                 break;
2390         case ASB_DISCARD_YOUNGER_PRI:
2391                 if (self == 0 && peer == 1) {
2392                         rv = -1;
2393                         break;
2394                 }
2395                 if (self == 1 && peer == 0) {
2396                         rv =  1;
2397                         break;
2398                 }
2399                 /* Else fall through to one of the other strategies... */
2400         case ASB_DISCARD_OLDER_PRI:
2401                 if (self == 0 && peer == 1) {
2402                         rv = 1;
2403                         break;
2404                 }
2405                 if (self == 1 && peer == 0) {
2406                         rv = -1;
2407                         break;
2408                 }
2409                 /* Else fall through to one of the other strategies... */
2410                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2411                      "Using discard-least-changes instead\n");
2412         case ASB_DISCARD_ZERO_CHG:
2413                 if (ch_peer == 0 && ch_self == 0) {
2414                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2415                                 ? -1 : 1;
2416                         break;
2417                 } else {
2418                         if (ch_peer == 0) { rv =  1; break; }
2419                         if (ch_self == 0) { rv = -1; break; }
2420                 }
2421                 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2422                         break;
2423         case ASB_DISCARD_LEAST_CHG:
2424                 if      (ch_self < ch_peer)
2425                         rv = -1;
2426                 else if (ch_self > ch_peer)
2427                         rv =  1;
2428                 else /* ( ch_self == ch_peer ) */
2429                      /* Well, then use something else. */
2430                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2431                                 ? -1 : 1;
2432                 break;
2433         case ASB_DISCARD_LOCAL:
2434                 rv = -1;
2435                 break;
2436         case ASB_DISCARD_REMOTE:
2437                 rv =  1;
2438         }
2439
2440         return rv;
2441 }
2442
2443 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2444 {
2445         int hg, rv = -100;
2446
2447         switch (mdev->tconn->net_conf->after_sb_1p) {
2448         case ASB_DISCARD_YOUNGER_PRI:
2449         case ASB_DISCARD_OLDER_PRI:
2450         case ASB_DISCARD_LEAST_CHG:
2451         case ASB_DISCARD_LOCAL:
2452         case ASB_DISCARD_REMOTE:
2453                 dev_err(DEV, "Configuration error.\n");
2454                 break;
2455         case ASB_DISCONNECT:
2456                 break;
2457         case ASB_CONSENSUS:
2458                 hg = drbd_asb_recover_0p(mdev);
2459                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2460                         rv = hg;
2461                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2462                         rv = hg;
2463                 break;
2464         case ASB_VIOLENTLY:
2465                 rv = drbd_asb_recover_0p(mdev);
2466                 break;
2467         case ASB_DISCARD_SECONDARY:
2468                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2469         case ASB_CALL_HELPER:
2470                 hg = drbd_asb_recover_0p(mdev);
2471                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2472                         enum drbd_state_rv rv2;
2473
2474                         drbd_set_role(mdev, R_SECONDARY, 0);
2475                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2476                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2477                           * we do not need to wait for the after state change work either. */
2478                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2479                         if (rv2 != SS_SUCCESS) {
2480                                 drbd_khelper(mdev, "pri-lost-after-sb");
2481                         } else {
2482                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2483                                 rv = hg;
2484                         }
2485                 } else
2486                         rv = hg;
2487         }
2488
2489         return rv;
2490 }
2491
2492 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2493 {
2494         int hg, rv = -100;
2495
2496         switch (mdev->tconn->net_conf->after_sb_2p) {
2497         case ASB_DISCARD_YOUNGER_PRI:
2498         case ASB_DISCARD_OLDER_PRI:
2499         case ASB_DISCARD_LEAST_CHG:
2500         case ASB_DISCARD_LOCAL:
2501         case ASB_DISCARD_REMOTE:
2502         case ASB_CONSENSUS:
2503         case ASB_DISCARD_SECONDARY:
2504                 dev_err(DEV, "Configuration error.\n");
2505                 break;
2506         case ASB_VIOLENTLY:
2507                 rv = drbd_asb_recover_0p(mdev);
2508                 break;
2509         case ASB_DISCONNECT:
2510                 break;
2511         case ASB_CALL_HELPER:
2512                 hg = drbd_asb_recover_0p(mdev);
2513                 if (hg == -1) {
2514                         enum drbd_state_rv rv2;
2515
2516                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2517                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2518                           * we do not need to wait for the after state change work either. */
2519                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2520                         if (rv2 != SS_SUCCESS) {
2521                                 drbd_khelper(mdev, "pri-lost-after-sb");
2522                         } else {
2523                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2524                                 rv = hg;
2525                         }
2526                 } else
2527                         rv = hg;
2528         }
2529
2530         return rv;
2531 }
2532
2533 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2534                            u64 bits, u64 flags)
2535 {
2536         if (!uuid) {
2537                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2538                 return;
2539         }
2540         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2541              text,
2542              (unsigned long long)uuid[UI_CURRENT],
2543              (unsigned long long)uuid[UI_BITMAP],
2544              (unsigned long long)uuid[UI_HISTORY_START],
2545              (unsigned long long)uuid[UI_HISTORY_END],
2546              (unsigned long long)bits,
2547              (unsigned long long)flags);
2548 }
2549
2550 /*
2551   100   after split brain try auto recover
2552     2   C_SYNC_SOURCE set BitMap
2553     1   C_SYNC_SOURCE use BitMap
2554     0   no Sync
2555    -1   C_SYNC_TARGET use BitMap
2556    -2   C_SYNC_TARGET set BitMap
2557  -100   after split brain, disconnect
2558 -1000   unrelated data
2559 -1091   requires proto 91
2560 -1096   requires proto 96
2561  */
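/*
 * The *rule_nr values set below are used for logging: drbd_sync_handshake()
 * reports them as "uuid_compare()=%d by rule %d" so the decision can be traced.
 */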
2562 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2563 {
2564         u64 self, peer;
2565         int i, j;
2566
2567         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2568         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2569
2570         *rule_nr = 10;
2571         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2572                 return 0;
2573
2574         *rule_nr = 20;
2575         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2576              peer != UUID_JUST_CREATED)
2577                 return -2;
2578
2579         *rule_nr = 30;
2580         if (self != UUID_JUST_CREATED &&
2581             (peer == UUID_JUST_CREATED || peer == (u64)0))
2582                 return 2;
2583
2584         if (self == peer) {
2585                 int rct, dc; /* roles at crash time */
2586
2587                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2588
2589                         if (mdev->tconn->agreed_pro_version < 91)
2590                                 return -1091;
2591
2592                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2593                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2594                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2595                                 drbd_uuid_set_bm(mdev, 0UL);
2596
2597                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2598                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2599                                 *rule_nr = 34;
2600                         } else {
2601                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2602                                 *rule_nr = 36;
2603                         }
2604
2605                         return 1;
2606                 }
2607
2608                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2609
2610                         if (mdev->tconn->agreed_pro_version < 91)
2611                                 return -1091;
2612
2613                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2614                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2615                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2616
2617                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2618                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2619                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2620
2621                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2622                                 *rule_nr = 35;
2623                         } else {
2624                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2625                                 *rule_nr = 37;
2626                         }
2627
2628                         return -1;
2629                 }
2630
2631                 /* Common power [off|failure] */
2632                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2633                         (mdev->p_uuid[UI_FLAGS] & 2);
2634                 /* lowest bit is set when we were primary,
2635                  * next bit (weight 2) is set when peer was primary */
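                /*
                 * E.g. rct == 3 means both nodes were Primary when the common
                 * power failure hit; the DISCARD_CONCURRENT bit below then
                 * decides which side becomes the sync target.
                 */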
2636                 *rule_nr = 40;
2637
2638                 switch (rct) {
2639                 case 0: /* !self_pri && !peer_pri */ return 0;
2640                 case 1: /*  self_pri && !peer_pri */ return 1;
2641                 case 2: /* !self_pri &&  peer_pri */ return -1;
2642                 case 3: /*  self_pri &&  peer_pri */
2643                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2644                         return dc ? -1 : 1;
2645                 }
2646         }
2647
2648         *rule_nr = 50;
2649         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2650         if (self == peer)
2651                 return -1;
2652
2653         *rule_nr = 51;
2654         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2655         if (self == peer) {
2656                 if (mdev->tconn->agreed_pro_version < 96 ?
2657                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2658                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2659                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2660                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2661                            modifications from its last start of resync as sync source. */
2662
2663                         if (mdev->tconn->agreed_pro_version < 91)
2664                                 return -1091;
2665
2666                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2667                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2668
2669                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2670                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2671
2672                         return -1;
2673                 }
2674         }
2675
2676         *rule_nr = 60;
2677         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2678         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2679                 peer = mdev->p_uuid[i] & ~((u64)1);
2680                 if (self == peer)
2681                         return -2;
2682         }
2683
2684         *rule_nr = 70;
2685         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2686         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2687         if (self == peer)
2688                 return 1;
2689
2690         *rule_nr = 71;
2691         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2692         if (self == peer) {
2693                 if (mdev->tconn->agreed_pro_version < 96 ?
2694                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2695                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2696                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2697                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2698                            modifications from the last start of resync as sync source. */
2699
2700                         if (mdev->tconn->agreed_pro_version < 91)
2701                                 return -1091;
2702
2703                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2704                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2705
2706                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2707                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2708                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2709
2710                         return 1;
2711                 }
2712         }
2713
2714
2715         *rule_nr = 80;
2716         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2717         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2718                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2719                 if (self == peer)
2720                         return 2;
2721         }
2722
2723         *rule_nr = 90;
2724         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2725         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2726         if (self == peer && self != ((u64)0))
2727                 return 100;
2728
2729         *rule_nr = 100;
2730         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2731                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2732                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2733                         peer = mdev->p_uuid[j] & ~((u64)1);
2734                         if (self == peer)
2735                                 return -100;
2736                 }
2737         }
2738
2739         return -1000;
2740 }
2741
2742 /* drbd_sync_handshake() returns the new conn state on success, or
2743    C_MASK (-1) on failure.
2744  */
2745 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2746                                            enum drbd_disk_state peer_disk) __must_hold(local)
2747 {
2748         int hg, rule_nr;
2749         enum drbd_conns rv = C_MASK;
2750         enum drbd_disk_state mydisk;
2751
2752         mydisk = mdev->state.disk;
2753         if (mydisk == D_NEGOTIATING)
2754                 mydisk = mdev->new_state_tmp.disk;
2755
2756         dev_info(DEV, "drbd_sync_handshake:\n");
2757         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2758         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2759                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2760
2761         hg = drbd_uuid_compare(mdev, &rule_nr);
2762
2763         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2764
2765         if (hg == -1000) {
2766                 dev_alert(DEV, "Unrelated data, aborting!\n");
2767                 return C_MASK;
2768         }
2769         if (hg < -1000) {
2770                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2771                 return C_MASK;
2772         }
2773
2774         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2775             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2776                 int f = (hg == -100) || abs(hg) == 2;
2777                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2778                 if (f)
2779                         hg = hg*2;
2780                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2781                      hg > 0 ? "source" : "target");
2782         }
2783
2784         if (abs(hg) == 100)
2785                 drbd_khelper(mdev, "initial-split-brain");
2786
2787         if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2788                 int pcount = (mdev->state.role == R_PRIMARY)
2789                            + (peer_role == R_PRIMARY);
2790                 int forced = (hg == -100);
2791
2792                 switch (pcount) {
2793                 case 0:
2794                         hg = drbd_asb_recover_0p(mdev);
2795                         break;
2796                 case 1:
2797                         hg = drbd_asb_recover_1p(mdev);
2798                         break;
2799                 case 2:
2800                         hg = drbd_asb_recover_2p(mdev);
2801                         break;
2802                 }
2803                 if (abs(hg) < 100) {
2804                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2805                              "automatically solved. Sync from %s node\n",
2806                              pcount, (hg < 0) ? "peer" : "this");
2807                         if (forced) {
2808                                 dev_warn(DEV, "Doing a full sync, since"
2809                                      " UUIDs were ambiguous.\n");
2810                                 hg = hg*2;
2811                         }
2812                 }
2813         }
2814
2815         if (hg == -100) {
2816                 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2817                         hg = -1;
2818                 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2819                         hg = 1;
2820
2821                 if (abs(hg) < 100)
2822                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2823                              "Sync from %s node\n",
2824                              (hg < 0) ? "peer" : "this");
2825         }
2826
2827         if (hg == -100) {
2828                 /* FIXME this log message is not correct if we end up here
2829                  * after an attempted attach on a diskless node.
2830                  * We just refuse to attach -- well, we drop the "connection"
2831                  * to that disk, in a way... */
2832                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2833                 drbd_khelper(mdev, "split-brain");
2834                 return C_MASK;
2835         }
2836
2837         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2838                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2839                 return C_MASK;
2840         }
2841
2842         if (hg < 0 && /* by intention we do not use mydisk here. */
2843             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2844                 switch (mdev->tconn->net_conf->rr_conflict) {
2845                 case ASB_CALL_HELPER:
2846                         drbd_khelper(mdev, "pri-lost");
2847                         /* fall through */
2848                 case ASB_DISCONNECT:
2849                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2850                         return C_MASK;
2851                 case ASB_VIOLENTLY:
2852                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2853                              " assumption\n");
2854                 }
2855         }
2856
2857         if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2858                 if (hg == 0)
2859                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2860                 else
2861                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2862                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2863                                  abs(hg) >= 2 ? "full" : "bit-map based");
2864                 return C_MASK;
2865         }
2866
2867         if (abs(hg) >= 2) {
2868                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2869                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2870                                         BM_LOCKED_SET_ALLOWED))
2871                         return C_MASK;
2872         }
2873
2874         if (hg > 0) { /* become sync source. */
2875                 rv = C_WF_BITMAP_S;
2876         } else if (hg < 0) { /* become sync target */
2877                 rv = C_WF_BITMAP_T;
2878         } else {
2879                 rv = C_CONNECTED;
2880                 if (drbd_bm_total_weight(mdev)) {
2881                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2882                              drbd_bm_total_weight(mdev));
2883                 }
2884         }
2885
2886         return rv;
2887 }
2888
2889 /* returns 1 if invalid */
2890 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2891 {
2892         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2893         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2894             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2895                 return 0;
2896
2897         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2898         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2899             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2900                 return 1;
2901
2902         /* everything else is valid if they are equal on both sides. */
2903         if (peer == self)
2904                 return 0;
2905
2906         /* everything else is invalid. */
2907         return 1;
2908 }
2909
2910 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2911 {
2912         struct p_protocol *p = pi->data;
2913         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2914         int p_want_lose, p_two_primaries, cf;
2915         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2916
2917         p_proto         = be32_to_cpu(p->protocol);
2918         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2919         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2920         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2921         p_two_primaries = be32_to_cpu(p->two_primaries);
2922         cf              = be32_to_cpu(p->conn_flags);
2923         p_want_lose = cf & CF_WANT_LOSE;
2924
2925         clear_bit(CONN_DRY_RUN, &tconn->flags);
2926
2927         if (cf & CF_DRY_RUN)
2928                 set_bit(CONN_DRY_RUN, &tconn->flags);
2929
2930         if (p_proto != tconn->net_conf->wire_protocol) {
2931                 conn_err(tconn, "incompatible communication protocols\n");
2932                 goto disconnect;
2933         }
2934
2935         if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2936                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2937                 goto disconnect;
2938         }
2939
2940         if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2941                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2942                 goto disconnect;
2943         }
2944
2945         if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2946                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2947                 goto disconnect;
2948         }
2949
2950         if (p_want_lose && tconn->net_conf->want_lose) {
2951                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2952                 goto disconnect;
2953         }
2954
2955         if (p_two_primaries != tconn->net_conf->two_primaries) {
2956                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2957                 goto disconnect;
2958         }
2959
2960         if (tconn->agreed_pro_version >= 87) {
2961                 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2962                 int err;
2963
2964                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2965                 if (err)
2966                         return err;
2967
2968                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2969                 if (strcmp(p_integrity_alg, my_alg)) {
2970                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2971                         goto disconnect;
2972                 }
2973                 conn_info(tconn, "data-integrity-alg: %s\n",
2974                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2975         }
2976
2977         return 0;
2978
2979 disconnect:
2980         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2981         return -EIO;
2982 }
2983
2984 /* helper function
2985  * input: alg name, feature name
2986  * return: NULL (alg name was "")
2987  *         ERR_PTR(error) if something goes wrong
2988  *         or the crypto hash ptr, if it worked out ok. */
2989 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2990                 const char *alg, const char *name)
2991 {
2992         struct crypto_hash *tfm;
2993
2994         if (!alg[0])
2995                 return NULL;
2996
2997         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2998         if (IS_ERR(tfm)) {
2999                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3000                         alg, name, PTR_ERR(tfm));
3001                 return tfm;
3002         }
3003         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3004                 crypto_free_hash(tfm);
3005                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3006                 return ERR_PTR(-EINVAL);
3007         }
3008         return tfm;
3009 }
3010
3011 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3012 {
3013         void *buffer = tconn->data.rbuf;
3014         int size = pi->size;
3015
3016         while (size) {
3017                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3018                 s = drbd_recv(tconn, buffer, s);
3019                 if (s <= 0) {
3020                         if (s < 0)
3021                                 return s;
3022                         break;
3023                 }
3024                 size -= s;
3025         }
3026         if (size)
3027                 return -EIO;
3028         return 0;
3029 }
3030
3031 /*
3032  * config_unknown_volume  -  device configuration command for unknown volume
3033  *
3034  * When a device is added to an existing connection, the node on which the
3035  * device is added first will send configuration commands to its peer but the
3036  * peer will not know about the device yet.  It will warn and ignore these
3037  * commands.  Once the device is added on the second node, the second node will
3038  * send the same device configuration commands, but in the other direction.
3039  *
3040  * (We can also end up here if drbd is misconfigured.)
3041  */
3042 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3043 {
3044         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3045                   pi->vnr, cmdname(pi->cmd));
3046         return ignore_remaining_packet(tconn, pi);
3047 }
3048
3049 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3050 {
3051         struct drbd_conf *mdev;
3052         struct p_rs_param_95 *p;
3053         unsigned int header_size, data_size, exp_max_sz;
3054         struct crypto_hash *verify_tfm = NULL;
3055         struct crypto_hash *csums_tfm = NULL;
3056         const int apv = tconn->agreed_pro_version;
3057         int *rs_plan_s = NULL;
3058         int fifo_size = 0;
3059         int err;
3060
3061         mdev = vnr_to_mdev(tconn, pi->vnr);
3062         if (!mdev)
3063                 return config_unknown_volume(tconn, pi);
3064
3065         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3066                     : apv == 88 ? sizeof(struct p_rs_param)
3067                                         + SHARED_SECRET_MAX
3068                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3069                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3070
3071         if (pi->size > exp_max_sz) {
3072                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3073                     pi->size, exp_max_sz);
3074                 return -EIO;
3075         }
3076
3077         if (apv <= 88) {
3078                 header_size = sizeof(struct p_rs_param);
3079                 data_size = pi->size - header_size;
3080         } else if (apv <= 94) {
3081                 header_size = sizeof(struct p_rs_param_89);
3082                 data_size = pi->size - header_size;
3083                 D_ASSERT(data_size == 0);
3084         } else {
3085                 header_size = sizeof(struct p_rs_param_95);
3086                 data_size = pi->size - header_size;
3087                 D_ASSERT(data_size == 0);
3088         }
3089
3090         /* initialize verify_alg and csums_alg */
3091         p = pi->data;
3092         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3093
3094         err = drbd_recv_all(mdev->tconn, p, header_size);
3095         if (err)
3096                 return err;
3097
3098         if (get_ldev(mdev)) {
3099                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3100                 put_ldev(mdev);
3101         }
3102
3103         if (apv >= 88) {
3104                 if (apv == 88) {
3105                         if (data_size > SHARED_SECRET_MAX) {
3106                                 dev_err(DEV, "verify-alg too long, "
3107                                     "peer wants %u, accepting only %u byte\n",
3108                                                 data_size, SHARED_SECRET_MAX);
3109                                 return -EIO;
3110                         }
3111
3112                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3113                         if (err)
3114                                 return err;
3115
3116                         /* we expect a NUL-terminated string */
3117                         /* but just in case someone tries to be evil */
3118                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3119                         p->verify_alg[data_size-1] = 0;
3120
3121                 } else /* apv >= 89 */ {
3122                         /* we still expect NUL terminated strings */
3123                         /* but just in case someone tries to be evil */
3124                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3125                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3126                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3127                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3128                 }
3129
3130                 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3131                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3132                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3133                                     mdev->tconn->net_conf->verify_alg, p->verify_alg);
3134                                 goto disconnect;
3135                         }
3136                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3137                                         p->verify_alg, "verify-alg");
3138                         if (IS_ERR(verify_tfm)) {
3139                                 verify_tfm = NULL;
3140                                 goto disconnect;
3141                         }
3142                 }
3143
3144                 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3145                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3146                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3147                                     mdev->tconn->net_conf->csums_alg, p->csums_alg);
3148                                 goto disconnect;
3149                         }
3150                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3151                                         p->csums_alg, "csums-alg");
3152                         if (IS_ERR(csums_tfm)) {
3153                                 csums_tfm = NULL;
3154                                 goto disconnect;
3155                         }
3156                 }
3157
3158                 if (apv > 94 && get_ldev(mdev)) {
3159                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3160                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3161                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3162                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3163                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3164
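                        /* Size the resync-plan FIFO to cover the configured
                         * plan-ahead window; reallocate only if that size is
                         * non-zero and differs from the current plan. The new
                         * buffer is installed further down, under peer_seq_lock. */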
3165                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3166                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3167                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3168                                 if (!rs_plan_s) {
3169                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3170                                         put_ldev(mdev);
3171                                         goto disconnect;
3172                                 }
3173                         }
3174                         put_ldev(mdev);
3175                 }
3176
3177                 spin_lock(&mdev->peer_seq_lock);
3178                 /* lock against drbd_nl_syncer_conf() */
3179                 if (verify_tfm) {
3180                         strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3181                         mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3182                         crypto_free_hash(mdev->tconn->verify_tfm);
3183                         mdev->tconn->verify_tfm = verify_tfm;
3184                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3185                 }
3186                 if (csums_tfm) {
3187                         strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3188                         mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3189                         crypto_free_hash(mdev->tconn->csums_tfm);
3190                         mdev->tconn->csums_tfm = csums_tfm;
3191                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3192                 }
3193                 if (fifo_size != mdev->rs_plan_s.size) {
3194                         kfree(mdev->rs_plan_s.values);
3195                         mdev->rs_plan_s.values = rs_plan_s;
3196                         mdev->rs_plan_s.size   = fifo_size;
3197                         mdev->rs_planed = 0;
3198                 }
3199                 spin_unlock(&mdev->peer_seq_lock);
3200         }
3201         return 0;
3202
3203 disconnect:
3204         /* just for completeness: actually not needed,
3205          * as this is not reached if csums_tfm was ok. */
3206         crypto_free_hash(csums_tfm);
3207         /* but free the verify_tfm again, if csums_tfm did not work out */
3208         crypto_free_hash(verify_tfm);
3209         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3210         return -EIO;
3211 }
3212
3213 /* warn if the arguments differ by more than 12.5% */
3214 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3215         const char *s, sector_t a, sector_t b)
3216 {
3217         sector_t d;
3218         if (a == 0 || b == 0)
3219                 return;
3220         d = (a > b) ? (a - b) : (b - a);
3221         if (d > (a>>3) || d > (b>>3))
3222                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3223                      (unsigned long long)a, (unsigned long long)b);
3224 }
3225
3226 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3227 {
3228         struct drbd_conf *mdev;
3229         struct p_sizes *p = pi->data;
3230         enum determine_dev_size dd = unchanged;
3231         sector_t p_size, p_usize, my_usize;
3232         int ldsc = 0; /* local disk size changed */
3233         enum dds_flags ddsf;
3234
3235         mdev = vnr_to_mdev(tconn, pi->vnr);
3236         if (!mdev)
3237                 return config_unknown_volume(tconn, pi);
3238
3239         p_size = be64_to_cpu(p->d_size);
3240         p_usize = be64_to_cpu(p->u_size);
3241
3242         /* just store the peer's disk size for now.
3243          * we still need to figure out whether we accept that. */
3244         mdev->p_size = p_size;
3245
3246         if (get_ldev(mdev)) {
3247                 warn_if_differ_considerably(mdev, "lower level device sizes",
3248                            p_size, drbd_get_max_capacity(mdev->ldev));
3249                 warn_if_differ_considerably(mdev, "user requested size",
3250                                             p_usize, mdev->ldev->dc.disk_size);
3251
3252                 /* if this is the first connect, or an otherwise expected
3253                  * param exchange, choose the minimum */
3254                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3255                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3256                                              p_usize);
3257
3258                 my_usize = mdev->ldev->dc.disk_size;
3259
3260                 if (mdev->ldev->dc.disk_size != p_usize) {
3261                         mdev->ldev->dc.disk_size = p_usize;
3262                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3263                              (unsigned long)mdev->ldev->dc.disk_size);
3264                 }
3265
3266                 /* Never shrink a device with usable data during connect.
3267                    But allow online shrinking if we are connected. */
3268                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3269                    drbd_get_capacity(mdev->this_bdev) &&
3270                    mdev->state.disk >= D_OUTDATED &&
3271                    mdev->state.conn < C_CONNECTED) {
3272                         dev_err(DEV, "The peer's disk size is too small!\n");
3273                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3274                         mdev->ldev->dc.disk_size = my_usize;
3275                         put_ldev(mdev);
3276                         return -EIO;
3277                 }
3278                 put_ldev(mdev);
3279         }
3280
3281         ddsf = be16_to_cpu(p->dds_flags);
3282         if (get_ldev(mdev)) {
3283                 dd = drbd_determine_dev_size(mdev, ddsf);
3284                 put_ldev(mdev);
3285                 if (dd == dev_size_error)
3286                         return -EIO;
3287                 drbd_md_sync(mdev);
3288         } else {
3289                 /* I am diskless, need to accept the peer's size. */
3290                 drbd_set_my_capacity(mdev, p_size);
3291         }
3292
3293         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3294         drbd_reconsider_max_bio_size(mdev);
3295
3296         if (get_ldev(mdev)) {
3297                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3298                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3299                         ldsc = 1;
3300                 }
3301
3302                 put_ldev(mdev);
3303         }
3304
3305         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3306                 if (be64_to_cpu(p->c_size) !=
3307                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3308                         /* we have different sizes, probably peer
3309                          * needs to know my new size... */
3310                         drbd_send_sizes(mdev, 0, ddsf);
3311                 }
3312                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3313                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3314                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3315                             mdev->state.disk >= D_INCONSISTENT) {
3316                                 if (ddsf & DDSF_NO_RESYNC)
3317                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3318                                 else
3319                                         resync_after_online_grow(mdev);
3320                         } else
3321                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3322                 }
3323         }
3324
3325         return 0;
3326 }
3327
3328 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3329 {
3330         struct drbd_conf *mdev;
3331         struct p_uuids *p = pi->data;
3332         u64 *p_uuid;
3333         int i, updated_uuids = 0;
3334
3335         mdev = vnr_to_mdev(tconn, pi->vnr);
3336         if (!mdev)
3337                 return config_unknown_volume(tconn, pi);
3338
3339         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return -ENOMEM;
             }

3341         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3342                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3343
3344         kfree(mdev->p_uuid);
3345         mdev->p_uuid = p_uuid;
3346
3347         if (mdev->state.conn < C_CONNECTED &&
3348             mdev->state.disk < D_INCONSISTENT &&
3349             mdev->state.role == R_PRIMARY &&
3350             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3351                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3352                     (unsigned long long)mdev->ed_uuid);
3353                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3354                 return -EIO;
3355         }
3356
3357         if (get_ldev(mdev)) {
3358                 int skip_initial_sync =
3359                         mdev->state.conn == C_CONNECTED &&
3360                         mdev->tconn->agreed_pro_version >= 90 &&
3361                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3362                         (p_uuid[UI_FLAGS] & 8);
3363                 if (skip_initial_sync) {
3364                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3365                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3366                                         "clear_n_write from receive_uuids",
3367                                         BM_LOCKED_TEST_ALLOWED);
3368                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3369                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3370                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3371                                         CS_VERBOSE, NULL);
3372                         drbd_md_sync(mdev);
3373                         updated_uuids = 1;
3374                 }
3375                 put_ldev(mdev);
3376         } else if (mdev->state.disk < D_INCONSISTENT &&
3377                    mdev->state.role == R_PRIMARY) {
3378                 /* I am a diskless primary, the peer just created a new current UUID
3379                    for me. */
3380                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3381         }
3382
3383         /* Before we test for the disk state, we should wait until any
3384            ongoing cluster wide state change is finished. That is important if
3385            we are primary and are detaching from our disk. We need to see the
3386            new disk state... */
3387         mutex_lock(mdev->state_mutex);
3388         mutex_unlock(mdev->state_mutex);
3389         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3390                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3391
3392         if (updated_uuids)
3393                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3394
3395         return 0;
3396 }
3397
3398 /**
3399  * convert_state() - Converts the peer's view of the cluster state to our point of view
3400  * @ps:         The state as seen by the peer.
3401  */
3402 static union drbd_state convert_state(union drbd_state ps)
3403 {
3404         union drbd_state ms;
3405
3406         static enum drbd_conns c_tab[] = {
3407                 [C_CONNECTED] = C_CONNECTED,
3408
3409                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3410                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3411                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3412                 [C_VERIFY_S]       = C_VERIFY_T,
3413                 [C_MASK]   = C_MASK,
3414         };
3415
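        /* Start from the peer's raw state word, then swap the fields that are
         * relative to the local node (role <-> peer, disk <-> pdsk) and map
         * direction-dependent connection states to their counterparts via c_tab. */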
3416         ms.i = ps.i;
3417
3418         ms.conn = c_tab[ps.conn];
3419         ms.peer = ps.role;
3420         ms.role = ps.peer;
3421         ms.pdsk = ps.disk;
3422         ms.disk = ps.pdsk;
3423         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3424
3425         return ms;
3426 }
3427
3428 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3429 {
3430         struct drbd_conf *mdev;
3431         struct p_req_state *p = pi->data;
3432         union drbd_state mask, val;
3433         enum drbd_state_rv rv;
3434
3435         mdev = vnr_to_mdev(tconn, pi->vnr);
3436         if (!mdev)
3437                 return -EIO;
3438
3439         mask.i = be32_to_cpu(p->mask);
3440         val.i = be32_to_cpu(p->val);
3441
3442         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3443             mutex_is_locked(mdev->state_mutex)) {
3444                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3445                 return 0;
3446         }
3447
3448         mask = convert_state(mask);
3449         val = convert_state(val);
3450
3451         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3452         drbd_send_sr_reply(mdev, rv);
3453
3454         drbd_md_sync(mdev);
3455
3456         return 0;
3457 }
3458
3459 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3460 {
3461         struct p_req_state *p = pi->data;
3462         union drbd_state mask, val;
3463         enum drbd_state_rv rv;
3464
3465         mask.i = be32_to_cpu(p->mask);
3466         val.i = be32_to_cpu(p->val);
3467
3468         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3469             mutex_is_locked(&tconn->cstate_mutex)) {
3470                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3471                 return 0;
3472         }
3473
3474         mask = convert_state(mask);
3475         val = convert_state(val);
3476
3477         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3478         conn_send_sr_reply(tconn, rv);
3479
3480         return 0;
3481 }
3482
3483 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3484 {
3485         struct drbd_conf *mdev;
3486         struct p_state *p = pi->data;
3487         union drbd_state os, ns, peer_state;
3488         enum drbd_disk_state real_peer_disk;
3489         enum chg_state_flags cs_flags;
3490         int rv;
3491
3492         mdev = vnr_to_mdev(tconn, pi->vnr);
3493         if (!mdev)
3494                 return config_unknown_volume(tconn, pi);
3495
3496         peer_state.i = be32_to_cpu(p->state);
3497
3498         real_peer_disk = peer_state.disk;
3499         if (peer_state.disk == D_NEGOTIATING) {
3500                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3501                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3502         }
3503
3504         spin_lock_irq(&mdev->tconn->req_lock);
3505  retry:
3506         os = ns = drbd_read_state(mdev);
3507         spin_unlock_irq(&mdev->tconn->req_lock);
3508
3509         /* peer says his disk is uptodate, while we think it is inconsistent,
3510          * and this happens while we think we have a sync going on. */
3511         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3512             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3513                 /* If we are (becoming) SyncSource, but peer is still in sync
3514                  * preparation, ignore its uptodate-ness to avoid flapping, it
3515                  * will change to inconsistent once the peer reaches active
3516                  * syncing states.
3517                  * It may have changed syncer-paused flags, however, so we
3518                  * cannot ignore this completely. */
3519                 if (peer_state.conn > C_CONNECTED &&
3520                     peer_state.conn < C_SYNC_SOURCE)
3521                         real_peer_disk = D_INCONSISTENT;
3522
3523                 /* if peer_state changes to connected at the same time,
3524                  * it explicitly notifies us that it finished resync.
3525                  * Maybe we should finish it up, too? */
3526                 else if (os.conn >= C_SYNC_SOURCE &&
3527                          peer_state.conn == C_CONNECTED) {
3528                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3529                                 drbd_resync_finished(mdev);
3530                         return 0;
3531                 }
3532         }
3533
3534         /* peer says his disk is inconsistent, while we think it is uptodate,
3535          * and this happens while the peer still thinks we have a sync going on,
3536          * but we think we are already done with the sync.
3537          * We ignore this to avoid flapping pdsk.
3538          * This should not happen, if the peer is a recent version of drbd. */
3539         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3540             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3541                 real_peer_disk = D_UP_TO_DATE;
3542
3543         if (ns.conn == C_WF_REPORT_PARAMS)
3544                 ns.conn = C_CONNECTED;
3545
3546         if (peer_state.conn == C_AHEAD)
3547                 ns.conn = C_BEHIND;
3548
3549         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3550             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3551                 int cr; /* consider resync */
3552
3553                 /* if we established a new connection */
3554                 cr  = (os.conn < C_CONNECTED);
3555                 /* if we had an established connection
3556                  * and one of the nodes newly attaches a disk */
3557                 cr |= (os.conn == C_CONNECTED &&
3558                        (peer_state.disk == D_NEGOTIATING ||
3559                         os.disk == D_NEGOTIATING));
3560                 /* if we have both been inconsistent, and the peer has been
3561                  * forced to be UpToDate with --overwrite-data */
3562                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3563                 /* if we had been plain connected, and the admin requested to
3564                  * start a sync by "invalidate" or "invalidate-remote" */
3565                 cr |= (os.conn == C_CONNECTED &&
3566                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3567                                  peer_state.conn <= C_WF_BITMAP_T));
3568
3569                 if (cr)
3570                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3571
3572                 put_ldev(mdev);
3573                 if (ns.conn == C_MASK) {
3574                         ns.conn = C_CONNECTED;
3575                         if (mdev->state.disk == D_NEGOTIATING) {
3576                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3577                         } else if (peer_state.disk == D_NEGOTIATING) {
3578                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3579                                 peer_state.disk = D_DISKLESS;
3580                                 real_peer_disk = D_DISKLESS;
3581                         } else {
3582                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3583                                         return -EIO;
3584                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3585                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3586                                 return -EIO;
3587                         }
3588                 }
3589         }
3590
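        /* The handshake above ran with req_lock dropped; retake it and, if the
         * device state changed in the meantime, restart from the retry label
         * with a fresh snapshot. */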
3591         spin_lock_irq(&mdev->tconn->req_lock);
3592         if (os.i != drbd_read_state(mdev).i)
3593                 goto retry;
3594         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3595         ns.peer = peer_state.role;
3596         ns.pdsk = real_peer_disk;
3597         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3598         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3599                 ns.disk = mdev->new_state_tmp.disk;
3600         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3601         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3602             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3603                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3604                    for temporary network outages! */
3605                 spin_unlock_irq(&mdev->tconn->req_lock);
3606                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3607                 tl_clear(mdev->tconn);
3608                 drbd_uuid_new_current(mdev);
3609                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3610                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3611                 return -EIO;
3612         }
3613         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3614         ns = drbd_read_state(mdev);
3615         spin_unlock_irq(&mdev->tconn->req_lock);
3616
3617         if (rv < SS_SUCCESS) {
3618                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3619                 return -EIO;
3620         }
3621
3622         if (os.conn > C_WF_REPORT_PARAMS) {
3623                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3624                     peer_state.disk != D_NEGOTIATING ) {
3625                         /* we want resync, peer has not yet decided to sync... */
3626                         /* Nowadays only used when forcing a node into primary role and
3627                            setting its disk to UpToDate with that */
3628                         drbd_send_uuids(mdev);
3629                         drbd_send_state(mdev);
3630                 }
3631         }
3632
3633         mdev->tconn->net_conf->want_lose = 0;
3634
3635         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3636
3637         return 0;
3638 }
3639
3640 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3641 {
3642         struct drbd_conf *mdev;
3643         struct p_rs_uuid *p = pi->data;
3644
3645         mdev = vnr_to_mdev(tconn, pi->vnr);
3646         if (!mdev)
3647                 return -EIO;
3648
3649         wait_event(mdev->misc_wait,
3650                    mdev->state.conn == C_WF_SYNC_UUID ||
3651                    mdev->state.conn == C_BEHIND ||
3652                    mdev->state.conn < C_CONNECTED ||
3653                    mdev->state.disk < D_NEGOTIATING);
3654
3655         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3656
3657         /* Here the _drbd_uuid_ functions are right, current should
3658            _not_ be rotated into the history */
3659         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3660                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3661                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3662
3663                 drbd_print_uuids(mdev, "updated sync uuid");
3664                 drbd_start_resync(mdev, C_SYNC_TARGET);
3665
3666                 put_ldev(mdev);
3667         } else
3668                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3669
3670         return 0;
3671 }
3672
3673 /**
3674  * receive_bitmap_plain
3675  *
3676  * Return 0 when done, 1 when another iteration is needed, and a negative error
3677  * code upon failure.
3678  */
3679 static int
3680 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3681                      unsigned long *p, struct bm_xfer_ctx *c)
3682 {
3683         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3684                                  drbd_header_size(mdev->tconn);
3685         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3686                                        c->bm_words - c->word_offset);
3687         unsigned int want = num_words * sizeof(*p);
3688         int err;
3689
3690         if (want != size) {
3691                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3692                 return -EIO;
3693         }
3694         if (want == 0)
3695                 return 0;
3696         err = drbd_recv_all(mdev->tconn, p, want);
3697         if (err)
3698                 return err;
3699
3700         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3701
3702         c->word_offset += num_words;
3703         c->bit_offset = c->word_offset * BITS_PER_LONG;
3704         if (c->bit_offset > c->bm_bits)
3705                 c->bit_offset = c->bm_bits;
3706
3707         return 1;
3708 }
3709
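/* Accessors for the compressed-bitmap 'encoding' byte, as decoded below:
 * bits 0-3 hold the bitmap encoding (enum drbd_bitmap_code), bits 4-6 the
 * number of pad bits in the encoded bit stream, and bit 7 whether the first
 * run describes set (out-of-sync) bits. */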
3710 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3711 {
3712         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3713 }
3714
3715 static int dcbp_get_start(struct p_compressed_bm *p)
3716 {
3717         return (p->encoding & 0x80) != 0;
3718 }
3719
3720 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3721 {
3722         return (p->encoding >> 4) & 0x7;
3723 }
3724
3725 /**
3726  * recv_bm_rle_bits
3727  *
3728  * Return 0 when done, 1 when another iteration is needed, and a negative error
3729  * code upon failure.
3730  */
3731 static int
3732 recv_bm_rle_bits(struct drbd_conf *mdev,
3733                 struct p_compressed_bm *p,
3734                  struct bm_xfer_ctx *c,
3735                  unsigned int len)
3736 {
3737         struct bitstream bs;
3738         u64 look_ahead;
3739         u64 rl;
3740         u64 tmp;
3741         unsigned long s = c->bit_offset;
3742         unsigned long e;
3743         int toggle = dcbp_get_start(p);
3744         int have;
3745         int bits;
3746
3747         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3748
3749         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3750         if (bits < 0)
3751                 return -EIO;
3752
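        /* Each decoded run length rl covers a run of equal bits; runs alternate
         * between cleared and set via 'toggle', and only set runs are applied to
         * the bitmap. After consuming 'bits' bits, look_ahead is refilled from
         * the stream back up to 64 bits. */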
3753         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3754                 bits = vli_decode_bits(&rl, look_ahead);
3755                 if (bits <= 0)
3756                         return -EIO;
3757
3758                 if (toggle) {
3759                         e = s + rl -1;
3760                         if (e >= c->bm_bits) {
3761                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3762                                 return -EIO;
3763                         }
3764                         _drbd_bm_set_bits(mdev, s, e);
3765                 }
3766
3767                 if (have < bits) {
3768                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3769                                 have, bits, look_ahead,
3770                                 (unsigned int)(bs.cur.b - p->code),
3771                                 (unsigned int)bs.buf_len);
3772                         return -EIO;
3773                 }
3774                 look_ahead >>= bits;
3775                 have -= bits;
3776
3777                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3778                 if (bits < 0)
3779                         return -EIO;
3780                 look_ahead |= tmp << have;
3781                 have += bits;
3782         }
3783
3784         c->bit_offset = s;
3785         bm_xfer_ctx_bit_to_word_offset(c);
3786
3787         return (s != c->bm_bits);
3788 }
3789
3790 /**
3791  * decode_bitmap_c
3792  *
3793  * Return 0 when done, 1 when another iteration is needed, and a negative error
3794  * code upon failure.
3795  */
3796 static int
3797 decode_bitmap_c(struct drbd_conf *mdev,
3798                 struct p_compressed_bm *p,
3799                 struct bm_xfer_ctx *c,
3800                 unsigned int len)
3801 {
3802         if (dcbp_get_code(p) == RLE_VLI_Bits)
3803                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3804
3805         /* other variants had been implemented for evaluation,
3806          * but have been dropped as this one turned out to be "best"
3807          * during all our tests. */
3808
3809         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3810         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3811         return -EIO;
3812 }
3813
3814 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3815                 const char *direction, struct bm_xfer_ctx *c)
3816 {
3817         /* what would it take to transfer it "plaintext" */
3818         unsigned int header_size = drbd_header_size(mdev->tconn);
3819         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3820         unsigned int plain =
3821                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3822                 c->bm_words * sizeof(unsigned long);
3823         unsigned int total = c->bytes[0] + c->bytes[1];
3824         unsigned int r;
3825
3826         /* total can not be zero. but just in case: */
3827         if (total == 0)
3828                 return;
3829
3830         /* don't report if not compressed */
3831         if (total >= plain)
3832                 return;
3833
3834         /* total < plain. check for overflow, still */
3835         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3836                                     : (1000 * total / plain);
3837
3838         if (r > 1000)
3839                 r = 1000;
3840
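        /* r is the transferred size in per-mille of the plaintext size; subtract
         * it from 1000 so the log line below reports the space saved, in tenths
         * of a percent. */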
3841         r = 1000 - r;
3842         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3843              "total %u; compression: %u.%u%%\n",
3844                         direction,
3845                         c->bytes[1], c->packets[1],
3846                         c->bytes[0], c->packets[0],
3847                         total, r/10, r % 10);
3848 }
3849
3850 /* Since we are processing the bitfield from lower addresses to higher,
3851    it does not matter whether we process it in 32 bit or 64 bit
3852    chunks, as long as it is little endian. (Understand it as a byte stream,
3853    beginning with the lowest byte...) If we used big endian, we would
3854    need to process it from the highest address to the lowest in order
3855    to be agnostic to the 32 vs 64 bit issue.
3856
3857    Returns 0 on success, a negative error code otherwise. */
3858 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3859 {
3860         struct drbd_conf *mdev;
3861         struct bm_xfer_ctx c;
3862         int err;
3863
3864         mdev = vnr_to_mdev(tconn, pi->vnr);
3865         if (!mdev)
3866                 return -EIO;
3867
3868         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3869         /* you are supposed to send additional out-of-sync information
3870          * if you actually set bits during this phase */
3871
3872         c = (struct bm_xfer_ctx) {
3873                 .bm_bits = drbd_bm_bits(mdev),
3874                 .bm_words = drbd_bm_words(mdev),
3875         };
3876
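        /* Keep receiving P_BITMAP / P_COMPRESSED_BITMAP packets, reading the next
         * header ourselves, until the plain or RLE decoder reports done (0) or an
         * error (< 0); a positive return means more packets are expected. */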
3877         for(;;) {
3878                 if (pi->cmd == P_BITMAP)
3879                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3880                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3881                         /* MAYBE: sanity check that we speak proto >= 90,
3882                          * and the feature is enabled! */
3883                         struct p_compressed_bm *p = pi->data;
3884
3885                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
3886                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3887                                 err = -EIO;
3888                                 goto out;
3889                         }
3890                         if (pi->size <= sizeof(*p)) {
3891                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3892                                 err = -EIO;
3893                                 goto out;
3894                         }
3895                         err = drbd_recv_all(mdev->tconn, p, pi->size);
3896                         if (err)
3897                                goto out;
3898                         err = decode_bitmap_c(mdev, p, &c, pi->size);
3899                 } else {
3900                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3901                         err = -EIO;
3902                         goto out;
3903                 }
3904
3905                 c.packets[pi->cmd == P_BITMAP]++;
3906                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
3907
3908                 if (err <= 0) {
3909                         if (err < 0)
3910                                 goto out;
3911                         break;
3912                 }
3913                 err = drbd_recv_header(mdev->tconn, pi);
3914                 if (err)
3915                         goto out;
3916         }
3917
3918         INFO_bm_xfer_stats(mdev, "receive", &c);
3919
3920         if (mdev->state.conn == C_WF_BITMAP_T) {
3921                 enum drbd_state_rv rv;
3922
3923                 err = drbd_send_bitmap(mdev);
3924                 if (err)
3925                         goto out;
3926                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3927                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3928                 D_ASSERT(rv == SS_SUCCESS);
3929         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3930                 /* admin may have requested C_DISCONNECTING,
3931                  * other threads may have noticed network errors */
3932                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3933                     drbd_conn_str(mdev->state.conn));
3934         }
3935         err = 0;
3936
3937  out:
3938         drbd_bm_unlock(mdev);
3939         if (!err && mdev->state.conn == C_WF_BITMAP_S)
3940                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3941         return err;
3942 }
3943
3944 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3945 {
3946         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3947                  pi->cmd, pi->size);
3948
3949         return ignore_remaining_packet(tconn, pi);
3950 }
3951
3952 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3953 {
3954         /* Make sure we've acked all the TCP data associated
3955          * with the data requests being unplugged */
3956         drbd_tcp_quickack(tconn->data.socket);
3957
3958         return 0;
3959 }
3960
3961 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3962 {
3963         struct drbd_conf *mdev;
3964         struct p_block_desc *p = pi->data;
3965
3966         mdev = vnr_to_mdev(tconn, pi->vnr);
3967         if (!mdev)
3968                 return -EIO;
3969
3970         switch (mdev->state.conn) {
3971         case C_WF_SYNC_UUID:
3972         case C_WF_BITMAP_T:
3973         case C_BEHIND:
3974                 break;
3975         default:
3976                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3977                                 drbd_conn_str(mdev->state.conn));
3978         }
3979
3980         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3981
3982         return 0;
3983 }
3984
3985 struct data_cmd {
3986         int expect_payload;
3987         size_t pkt_size;
3988         int (*fn)(struct drbd_tconn *, struct packet_info *);
3989 };
3990
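/* Dispatch table for the receiver: pkt_size is the fixed sub-header read into
 * pi.data before the handler runs, expect_payload says whether additional
 * payload beyond that sub-header is allowed, and fn is the handler invoked
 * by drbdd() below. */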
3991 static struct data_cmd drbd_cmd_handler[] = {
3992         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3993         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3994         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3995         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3996         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
3997         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
3998         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
3999         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4000         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4001         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4002         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4003         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4004         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4005         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4006         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4007         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4008         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4009         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4010         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4011         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4012         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4013         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4014         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4015 };
4016
4017 static void drbdd(struct drbd_tconn *tconn)
4018 {
4019         struct packet_info pi;
4020         size_t shs; /* sub header size */
4021         int err;
4022
4023         while (get_t_state(&tconn->receiver) == RUNNING) {
4024                 struct data_cmd *cmd;
4025
4026                 drbd_thread_current_set_cpu(&tconn->receiver);
4027                 if (drbd_recv_header(tconn, &pi))
4028                         goto err_out;
4029
4030                 cmd = &drbd_cmd_handler[pi.cmd];
4031                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4032                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4033                         goto err_out;
4034                 }
4035
4036                 shs = cmd->pkt_size;
4037                 if (pi.size > shs && !cmd->expect_payload) {
4038                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4039                         goto err_out;
4040                 }
4041
4042                 if (shs) {
4043                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4044                         if (err)
4045                                 goto err_out;
4046                         pi.size -= shs;
4047                 }
4048
4049                 err = cmd->fn(tconn, &pi);
4050                 if (err) {
4051                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4052                                  cmdname(pi.cmd), err, pi.size);
4053                         goto err_out;
4054                 }
4055         }
4056         return;
4057
4058     err_out:
4059         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4060 }
4061
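/*
 * Wait until all work queued on this connection's data work queue before
 * this call has been processed: queue a barrier work item and block until
 * its completion is signalled from the worker (w_prev_work_done).
 */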
4062 void conn_flush_workqueue(struct drbd_tconn *tconn)
4063 {
4064         struct drbd_wq_barrier barr;
4065
4066         barr.w.cb = w_prev_work_done;
4067         barr.w.tconn = tconn;
4068         init_completion(&barr.done);
4069         drbd_queue_work(&tconn->data.work, &barr.w);
4070         wait_for_completion(&barr.done);
4071 }
4072
4073 static void drbd_disconnect(struct drbd_tconn *tconn)
4074 {
4075         enum drbd_conns oc;
4076         int rv = SS_UNKNOWN_ERROR;
4077
4078         if (tconn->cstate == C_STANDALONE)
4079                 return;
4080
4081         /* asender does not clean up anything. it must not interfere, either */
4082         drbd_thread_stop(&tconn->asender);
4083         drbd_free_sock(tconn);
4084
4085         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4086         conn_info(tconn, "Connection closed\n");
4087
4088         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4089                 conn_try_outdate_peer_async(tconn);
4090
4091         spin_lock_irq(&tconn->req_lock);
4092         oc = tconn->cstate;
4093         if (oc >= C_UNCONNECTED)
4094                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4095
4096         spin_unlock_irq(&tconn->req_lock);
4097
4098         if (oc == C_DISCONNECTING) {
4099                 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4100
4101                 crypto_free_hash(tconn->cram_hmac_tfm);
4102                 tconn->cram_hmac_tfm = NULL;
4103
4104                 kfree(tconn->net_conf);
4105                 tconn->net_conf = NULL;
4106                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4107         }
4108 }
4109
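/*
 * Per-volume cleanup after the connection went down (called via
 * idr_for_each() from drbd_disconnect()): wait for in-flight peer
 * requests, cancel any resync bookkeeping, flush the worker queue,
 * clear the transfer log unless I/O is suspended, and verify that all
 * epoch entry lists have drained.
 */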
4110 static int drbd_disconnected(int vnr, void *p, void *data)
4111 {
4112         struct drbd_conf *mdev = (struct drbd_conf *)p;
4113         enum drbd_fencing_p fp;
4114         unsigned int i;
4115
4116         /* wait for current activity to cease. */
4117         spin_lock_irq(&mdev->tconn->req_lock);
4118         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4119         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4120         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4121         spin_unlock_irq(&mdev->tconn->req_lock);
4122
4123         /* We do not have data structures that would allow us to
4124          * get the rs_pending_cnt down to 0 again.
4125          *  * On C_SYNC_TARGET we do not have any data structures describing
4126          *    the pending RSDataRequests we have sent.
4127          *  * On C_SYNC_SOURCE there is no data structure that tracks
4128          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4129          *  And no, it is not the sum of the reference counts in the
4130          *  resync_LRU. The resync_LRU tracks the whole operation including
4131          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4132          *  on the fly. */
4133         drbd_rs_cancel_all(mdev);
4134         mdev->rs_total = 0;
4135         mdev->rs_failed = 0;
4136         atomic_set(&mdev->rs_pending_cnt, 0);
4137         wake_up(&mdev->misc_wait);
4138
4139         del_timer(&mdev->request_timer);
4140
4141         del_timer_sync(&mdev->resync_timer);
4142         resync_timer_fn((unsigned long)mdev);
4143
4144         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4145          * w_make_resync_request etc. which may still be on the worker queue
4146          * to be "canceled" */
4147         drbd_flush_workqueue(mdev);
4148
4149         drbd_finish_peer_reqs(mdev);
4150
4151         kfree(mdev->p_uuid);
4152         mdev->p_uuid = NULL;
4153
4154         if (!drbd_suspended(mdev))
4155                 tl_clear(mdev->tconn);
4156
4157         drbd_md_sync(mdev);
4158
4159         fp = FP_DONT_CARE;
4160         if (get_ldev(mdev)) {
4161                 fp = mdev->ldev->dc.fencing;
4162                 put_ldev(mdev);
4163         }
4164
4165         /* serialize with bitmap writeout triggered by the state change,
4166          * if any. */
4167         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4168
4169         /* tcp_close and release of sendpage pages can be deferred.  I don't
4170          * want to use SO_LINGER, because apparently it can be deferred for
4171          * more than 20 seconds (longest time I checked).
4172          *
4173          * Actually we don't care for exactly when the network stack does its
4174          * put_page(), but release our reference on these pages right here.
4175          */
4176         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4177         if (i)
4178                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4179         i = atomic_read(&mdev->pp_in_use_by_net);
4180         if (i)
4181                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4182         i = atomic_read(&mdev->pp_in_use);
4183         if (i)
4184                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4185
4186         D_ASSERT(list_empty(&mdev->read_ee));
4187         D_ASSERT(list_empty(&mdev->active_ee));
4188         D_ASSERT(list_empty(&mdev->sync_ee));
4189         D_ASSERT(list_empty(&mdev->done_ee));
4190
4191         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4192         atomic_set(&mdev->current_epoch->epoch_size, 0);
4193         D_ASSERT(list_empty(&mdev->current_epoch->list));
4194
4195         return 0;
4196 }
4197
4198 /*
4199  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4200  * we can agree on is stored in agreed_pro_version.
4201  *
4202  * The feature flags and the reserved array should provide enough room for
4203  * future enhancements of the handshake protocol, and for possible plugins...
4204  *
4205  * For now they are expected to be zero, but are ignored either way.
4206  */
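/*
 * Worked example with hypothetical version numbers: if we were built with
 * PRO_VERSION_MIN = 86 and PRO_VERSION_MAX = 100 and the peer announces
 * min = 86, max = 96, the ranges overlap and drbd_do_features() settles on
 * agreed_pro_version = min(PRO_VERSION_MAX, peer max) = min(100, 96) = 96.
 * Had the peer announced 101..110 instead, the ranges would not overlap
 * and the handshake would fail with -1 (go standalone).
 */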
4207 static int drbd_send_features(struct drbd_tconn *tconn)
4208 {
4209         struct drbd_socket *sock;
4210         struct p_connection_features *p;
4211
4212         sock = &tconn->data;
4213         p = conn_prepare_command(tconn, sock);
4214         if (!p)
4215                 return -EIO;
4216         memset(p, 0, sizeof(*p));
4217         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4218         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4219         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4220 }
4221
4222 /*
4223  * return values:
4224  *   1 yes, we have a valid connection
4225  *   0 oops, did not work out, please try again
4226  *  -1 peer talks different language,
4227  *     no point in trying again, please go standalone.
4228  */
4229 static int drbd_do_features(struct drbd_tconn *tconn)
4230 {
4231         /* ASSERT current == tconn->receiver ... */
4232         struct p_connection_features *p;
4233         const int expect = sizeof(struct p_connection_features);
4234         struct packet_info pi;
4235         int err;
4236
4237         err = drbd_send_features(tconn);
4238         if (err)
4239                 return 0;
4240
4241         err = drbd_recv_header(tconn, &pi);
4242         if (err)
4243                 return 0;
4244
4245         if (pi.cmd != P_CONNECTION_FEATURES) {
4246                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4247                      cmdname(pi.cmd), pi.cmd);
4248                 return -1;
4249         }
4250
4251         if (pi.size != expect) {
4252                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4253                      expect, pi.size);
4254                 return -1;
4255         }
4256
4257         p = pi.data;
4258         err = drbd_recv_all_warn(tconn, p, expect);
4259         if (err)
4260                 return 0;
4261
4262         p->protocol_min = be32_to_cpu(p->protocol_min);
4263         p->protocol_max = be32_to_cpu(p->protocol_max);
4264         if (p->protocol_max == 0)
4265                 p->protocol_max = p->protocol_min;
4266
4267         if (PRO_VERSION_MAX < p->protocol_min ||
4268             PRO_VERSION_MIN > p->protocol_max)
4269                 goto incompat;
4270
4271         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4272
4273         conn_info(tconn, "Handshake successful: "
4274              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4275
4276         return 1;
4277
4278  incompat:
4279         conn_err(tconn, "incompatible DRBD dialects: "
4280             "I support %d-%d, peer supports %d-%d\n",
4281             PRO_VERSION_MIN, PRO_VERSION_MAX,
4282             p->protocol_min, p->protocol_max);
4283         return -1;
4284 }
4285
4286 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4287 static int drbd_do_auth(struct drbd_tconn *tconn)
4288 {
4289         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4290         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4291         return -1;
4292 }
4293 #else
4294 #define CHALLENGE_LEN 64
4295
4296 /* Return value:
4297         1 - auth succeeded,
4298         0 - failed, try again (network error),
4299         -1 - auth failed, don't try again.
4300 */
4301
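/*
 * Sketch of the exchange as implemented below; both peers run the same
 * code, so the messages cross on the wire:
 *
 *   us   -> peer : P_AUTH_CHALLENGE, CHALLENGE_LEN random bytes
 *   peer -> us   : P_AUTH_CHALLENGE, the peer's own random challenge
 *   us   -> peer : P_AUTH_RESPONSE,  HMAC(shared_secret, peer's challenge)
 *   peer -> us   : P_AUTH_RESPONSE,  HMAC(shared_secret, our challenge)
 *
 * Each side recomputes the HMAC over the challenge it generated and
 * compares it with the response it received; a mismatch fails the
 * handshake with -1.
 */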
4302 static int drbd_do_auth(struct drbd_tconn *tconn)
4303 {
4304         struct drbd_socket *sock;
4305         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4306         struct scatterlist sg;
4307         char *response = NULL;
4308         char *right_response = NULL;
4309         char *peers_ch = NULL;
4310         unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4311         unsigned int resp_size;
4312         struct hash_desc desc;
4313         struct packet_info pi;
4314         int err, rv;
4315
4316         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4317
4318         desc.tfm = tconn->cram_hmac_tfm;
4319         desc.flags = 0;
4320
4321         rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4322                                 (u8 *)tconn->net_conf->shared_secret, key_len);
4323         if (rv) {
4324                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4325                 rv = -1;
4326                 goto fail;
4327         }
4328
4329         get_random_bytes(my_challenge, CHALLENGE_LEN);
4330
4331         sock = &tconn->data;
4332         if (!conn_prepare_command(tconn, sock)) {
4333                 rv = 0;
4334                 goto fail;
4335         }
4336         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4337                                 my_challenge, CHALLENGE_LEN);
4338         if (!rv)
4339                 goto fail;
4340
4341         err = drbd_recv_header(tconn, &pi);
4342         if (err) {
4343                 rv = 0;
4344                 goto fail;
4345         }
4346
4347         if (pi.cmd != P_AUTH_CHALLENGE) {
4348                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4349                     cmdname(pi.cmd), pi.cmd);
4350                 rv = 0;
4351                 goto fail;
4352         }
4353
4354         if (pi.size > CHALLENGE_LEN * 2) {
4355                 conn_err(tconn, "AuthChallenge payload too big.\n");
4356                 rv = -1;
4357                 goto fail;
4358         }
4359
4360         peers_ch = kmalloc(pi.size, GFP_NOIO);
4361         if (peers_ch == NULL) {
4362                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4363                 rv = -1;
4364                 goto fail;
4365         }
4366
4367         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4368         if (err) {
4369                 rv = 0;
4370                 goto fail;
4371         }
4372
4373         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4374         response = kmalloc(resp_size, GFP_NOIO);
4375         if (response == NULL) {
4376                 conn_err(tconn, "kmalloc of response failed\n");
4377                 rv = -1;
4378                 goto fail;
4379         }
4380
4381         sg_init_table(&sg, 1);
4382         sg_set_buf(&sg, peers_ch, pi.size);
4383
4384         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4385         if (rv) {
4386                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4387                 rv = -1;
4388                 goto fail;
4389         }
4390
4391         if (!conn_prepare_command(tconn, sock)) {
4392                 rv = 0;
4393                 goto fail;
4394         }
4395         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4396                                 response, resp_size);
4397         if (!rv)
4398                 goto fail;
4399
4400         err = drbd_recv_header(tconn, &pi);
4401         if (err) {
4402                 rv = 0;
4403                 goto fail;
4404         }
4405
4406         if (pi.cmd != P_AUTH_RESPONSE) {
4407                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4408                         cmdname(pi.cmd), pi.cmd);
4409                 rv = 0;
4410                 goto fail;
4411         }
4412
4413         if (pi.size != resp_size) {
4414                 conn_err(tconn, "AuthResponse payload of unexpected size\n");
4415                 rv = 0;
4416                 goto fail;
4417         }
4418
4419         err = drbd_recv_all_warn(tconn, response, resp_size);
4420         if (err) {
4421                 rv = 0;
4422                 goto fail;
4423         }
4424
4425         right_response = kmalloc(resp_size, GFP_NOIO);
4426         if (right_response == NULL) {
4427                 conn_err(tconn, "kmalloc of right_response failed\n");
4428                 rv = -1;
4429                 goto fail;
4430         }
4431
4432         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4433
4434         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4435         if (rv) {
4436                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4437                 rv = -1;
4438                 goto fail;
4439         }
4440
4441         rv = !memcmp(response, right_response, resp_size);
4442
4443         if (rv)
4444                 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4445                      resp_size, tconn->net_conf->cram_hmac_alg);
4446         else
4447                 rv = -1;
4448
4449  fail:
4450         kfree(peers_ch);
4451         kfree(response);
4452         kfree(right_response);
4453
4454         return rv;
4455 }
4456 #endif
4457
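/*
 * Receiver thread entry point: retry drbd_connect() every second while it
 * reports a transient failure (0); give up on the network configuration if
 * it reports an incompatible peer (-1).  Once connected, drbdd() processes
 * incoming packets until the thread is asked to stop or a protocol error
 * occurs, after which the connection is torn down.
 */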
4458 int drbdd_init(struct drbd_thread *thi)
4459 {
4460         struct drbd_tconn *tconn = thi->tconn;
4461         int h;
4462
4463         conn_info(tconn, "receiver (re)started\n");
4464
4465         do {
4466                 h = drbd_connect(tconn);
4467                 if (h == 0) {
4468                         drbd_disconnect(tconn);
4469                         schedule_timeout_interruptible(HZ);
4470                 }
4471                 if (h == -1) {
4472                         conn_warn(tconn, "Discarding network configuration.\n");
4473                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4474                 }
4475         } while (h == 0);
4476
4477         if (h > 0) {
4478                 if (get_net_conf(tconn)) {
4479                         drbdd(tconn);
4480                         put_net_conf(tconn);
4481                 }
4482         }
4483
4484         drbd_disconnect(tconn);
4485
4486         conn_info(tconn, "receiver terminated\n");
4487         return 0;
4488 }
4489
4490 /* ********* acknowledge sender ******** */
4491
4492 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4493 {
4494         struct p_req_state_reply *p = pi->data;
4495         int retcode = be32_to_cpu(p->retcode);
4496
4497         if (retcode >= SS_SUCCESS) {
4498                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4499         } else {
4500                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4501                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4502                          drbd_set_st_err_str(retcode), retcode);
4503         }
4504         wake_up(&tconn->ping_wait);
4505
4506         return 0;
4507 }
4508
4509 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4510 {
4511         struct drbd_conf *mdev;
4512         struct p_req_state_reply *p = pi->data;
4513         int retcode = be32_to_cpu(p->retcode);
4514
4515         mdev = vnr_to_mdev(tconn, pi->vnr);
4516         if (!mdev)
4517                 return -EIO;
4518
4519         if (retcode >= SS_SUCCESS) {
4520                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4521         } else {
4522                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4523                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4524                         drbd_set_st_err_str(retcode), retcode);
4525         }
4526         wake_up(&mdev->state_wait);
4527
4528         return 0;
4529 }
4530
4531 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4532 {
4533         return drbd_send_ping_ack(tconn);
4534 }
4536
4537 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4538 {
4539         /* restore idle timeout */
4540         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4541         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4542                 wake_up(&tconn->ping_wait);
4543
4544         return 0;
4545 }
4546
4547 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4548 {
4549         struct drbd_conf *mdev;
4550         struct p_block_ack *p = pi->data;
4551         sector_t sector = be64_to_cpu(p->sector);
4552         int blksize = be32_to_cpu(p->blksize);
4553
4554         mdev = vnr_to_mdev(tconn, pi->vnr);
4555         if (!mdev)
4556                 return -EIO;
4557
4558         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4559
4560         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4561
4562         if (get_ldev(mdev)) {
4563                 drbd_rs_complete_io(mdev, sector);
4564                 drbd_set_in_sync(mdev, sector, blksize);
4565                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4566                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4567                 put_ldev(mdev);
4568         }
4569         dec_rs_pending(mdev);
4570         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4571
4572         return 0;
4573 }
4574
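/*
 * Look up the request identified by (id, sector) in the given rb tree,
 * apply the request state transition 'what' to it under req_lock, and
 * complete the master bio if that transition finished the request.
 * Returns -EIO if the request cannot be found; missing_ok is passed
 * through to find_request() to indicate that a missing request may be
 * expected (see got_NegAck() for the protocol A/B case).
 */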
4575 static int
4576 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4577                               struct rb_root *root, const char *func,
4578                               enum drbd_req_event what, bool missing_ok)
4579 {
4580         struct drbd_request *req;
4581         struct bio_and_error m;
4582
4583         spin_lock_irq(&mdev->tconn->req_lock);
4584         req = find_request(mdev, root, id, sector, missing_ok, func);
4585         if (unlikely(!req)) {
4586                 spin_unlock_irq(&mdev->tconn->req_lock);
4587                 return -EIO;
4588         }
4589         __req_mod(req, what, &m);
4590         spin_unlock_irq(&mdev->tconn->req_lock);
4591
4592         if (m.bio)
4593                 complete_master_bio(mdev, &m);
4594         return 0;
4595 }
4596
4597 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4598 {
4599         struct drbd_conf *mdev;
4600         struct p_block_ack *p = pi->data;
4601         sector_t sector = be64_to_cpu(p->sector);
4602         int blksize = be32_to_cpu(p->blksize);
4603         enum drbd_req_event what;
4604
4605         mdev = vnr_to_mdev(tconn, pi->vnr);
4606         if (!mdev)
4607                 return -EIO;
4608
4609         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4610
4611         if (p->block_id == ID_SYNCER) {
4612                 drbd_set_in_sync(mdev, sector, blksize);
4613                 dec_rs_pending(mdev);
4614                 return 0;
4615         }
4616         switch (pi->cmd) {
4617         case P_RS_WRITE_ACK:
4618                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4619                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4620                 break;
4621         case P_WRITE_ACK:
4622                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4623                 what = WRITE_ACKED_BY_PEER;
4624                 break;
4625         case P_RECV_ACK:
4626                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4627                 what = RECV_ACKED_BY_PEER;
4628                 break;
4629         case P_DISCARD_WRITE:
4630                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4631                 what = DISCARD_WRITE;
4632                 break;
4633         case P_RETRY_WRITE:
4634                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4635                 what = POSTPONE_WRITE;
4636                 break;
4637         default:
4638                 BUG();
4639         }
4640
4641         return validate_req_change_req_state(mdev, p->block_id, sector,
4642                                              &mdev->write_requests, __func__,
4643                                              what, false);
4644 }
4645
4646 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4647 {
4648         struct drbd_conf *mdev;
4649         struct p_block_ack *p = pi->data;
4650         sector_t sector = be64_to_cpu(p->sector);
4651         int size = be32_to_cpu(p->blksize);
4652         bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4653                           tconn->net_conf->wire_protocol == DRBD_PROT_B;
4654         int err;
4655
4656         mdev = vnr_to_mdev(tconn, pi->vnr);
4657         if (!mdev)
4658                 return -EIO;
4659
4660         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4661
4662         if (p->block_id == ID_SYNCER) {
4663                 dec_rs_pending(mdev);
4664                 drbd_rs_failed_io(mdev, sector, size);
4665                 return 0;
4666         }
4667
4668         err = validate_req_change_req_state(mdev, p->block_id, sector,
4669                                             &mdev->write_requests, __func__,
4670                                             NEG_ACKED, missing_ok);
4671         if (err) {
4672                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4673                    The master bio might already be completed, therefore the
4674                    request is no longer in the collision hash. */
4675                 /* In Protocol B we might already have got a P_RECV_ACK
4676                    but then get a P_NEG_ACK afterwards. */
4677                 if (!missing_ok)
4678                         return err;
4679                 drbd_set_out_of_sync(mdev, sector, size);
4680         }
4681         return 0;
4682 }
4683
4684 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4685 {
4686         struct drbd_conf *mdev;
4687         struct p_block_ack *p = pi->data;
4688         sector_t sector = be64_to_cpu(p->sector);
4689
4690         mdev = vnr_to_mdev(tconn, pi->vnr);
4691         if (!mdev)
4692                 return -EIO;
4693
4694         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4695
4696         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4697             (unsigned long long)sector, be32_to_cpu(p->blksize));
4698
4699         return validate_req_change_req_state(mdev, p->block_id, sector,
4700                                              &mdev->read_requests, __func__,
4701                                              NEG_ACKED, false);
4702 }
4703
4704 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4705 {
4706         struct drbd_conf *mdev;
4707         sector_t sector;
4708         int size;
4709         struct p_block_ack *p = pi->data;
4710
4711         mdev = vnr_to_mdev(tconn, pi->vnr);
4712         if (!mdev)
4713                 return -EIO;
4714
4715         sector = be64_to_cpu(p->sector);
4716         size = be32_to_cpu(p->blksize);
4717
4718         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4719
4720         dec_rs_pending(mdev);
4721
4722         if (get_ldev_if_state(mdev, D_FAILED)) {
4723                 drbd_rs_complete_io(mdev, sector);
4724                 switch (pi->cmd) {
4725                 case P_NEG_RS_DREPLY:
4726                         drbd_rs_failed_io(mdev, sector, size);
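                        /* fall through */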
4727                 case P_RS_CANCEL:
4728                         break;
4729                 default:
4730                         BUG();
4731                 }
4732                 put_ldev(mdev);
4733         }
4734
4735         return 0;
4736 }
4737
4738 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4739 {
4740         struct drbd_conf *mdev;
4741         struct p_barrier_ack *p = pi->data;
4742
4743         mdev = vnr_to_mdev(tconn, pi->vnr);
4744         if (!mdev)
4745                 return -EIO;
4746
4747         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4748
4749         if (mdev->state.conn == C_AHEAD &&
4750             atomic_read(&mdev->ap_in_flight) == 0 &&
4751             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4752                 mdev->start_resync_timer.expires = jiffies + HZ;
4753                 add_timer(&mdev->start_resync_timer);
4754         }
4755
4756         return 0;
4757 }
4758
4759 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4760 {
4761         struct drbd_conf *mdev;
4762         struct p_block_ack *p = pi->data;
4763         struct drbd_work *w;
4764         sector_t sector;
4765         int size;
4766
4767         mdev = vnr_to_mdev(tconn, pi->vnr);
4768         if (!mdev)
4769                 return -EIO;
4770
4771         sector = be64_to_cpu(p->sector);
4772         size = be32_to_cpu(p->blksize);
4773
4774         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4775
4776         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4777                 drbd_ov_out_of_sync_found(mdev, sector, size);
4778         else
4779                 ov_out_of_sync_print(mdev);
4780
4781         if (!get_ldev(mdev))
4782                 return 0;
4783
4784         drbd_rs_complete_io(mdev, sector);
4785         dec_rs_pending(mdev);
4786
4787         --mdev->ov_left;
4788
4789         /* let's advance progress step marks only for every other megabyte */
4790         if ((mdev->ov_left & 0x200) == 0x200)
4791                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4792
4793         if (mdev->ov_left == 0) {
4794                 w = kmalloc(sizeof(*w), GFP_NOIO);
4795                 if (w) {
4796                         w->cb = w_ov_finished;
4797                         w->mdev = mdev;
4798                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4799                 } else {
4800                         dev_err(DEV, "kmalloc(w) failed.");
4801                         ov_out_of_sync_print(mdev);
4802                         drbd_resync_finished(mdev);
4803                 }
4804         }
4805         put_ldev(mdev);
4806         return 0;
4807 }
4808
4809 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4810 {
4811         return 0;
4812 }
4813
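/*
 * Drain the done_ee lists of all volumes on this connection, looping until
 * none of them has entries left.  SIGNAL_ASENDER is cleared and pending
 * signals are flushed while the lists are being worked on, and the bit is
 * set again before re-checking the lists under req_lock.
 */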
4814 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4815 {
4816         struct drbd_conf *mdev;
4817         int i, not_empty = 0;
4818
4819         do {
4820                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4821                 flush_signals(current);
4822                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4823                         if (drbd_finish_peer_reqs(mdev))
4824                                 return 1; /* error */
4825                 }
4826                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4827
4828                 spin_lock_irq(&tconn->req_lock);
4829                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4830                         not_empty = !list_empty(&mdev->done_ee);
4831                         if (not_empty)
4832                                 break;
4833                 }
4834                 spin_unlock_irq(&tconn->req_lock);
4835         } while (not_empty);
4836
4837         return 0;
4838 }
4839
4840 struct asender_cmd {
4841         size_t pkt_size;
4842         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4843 };
4844
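/*
 * Dispatch table for the meta socket, indexed by packet type.  For each
 * command, drbd_asender() below expects exactly header_size + pkt_size
 * bytes on the wire before calling fn(); a size mismatch triggers a
 * reconnect, an unknown packet type a disconnect.
 */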
4845 static struct asender_cmd asender_tbl[] = {
4846         [P_PING]            = { 0, got_Ping },
4847         [P_PING_ACK]        = { 0, got_PingAck },
4848         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4849         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4850         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4851         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4852         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4853         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4854         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4855         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4856         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4857         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4858         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4859         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4860         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4861         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
4862         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4863 };
4864
4865 int drbd_asender(struct drbd_thread *thi)
4866 {
4867         struct drbd_tconn *tconn = thi->tconn;
4868         struct asender_cmd *cmd = NULL;
4869         struct packet_info pi;
4870         int rv;
4871         void *buf    = tconn->meta.rbuf;
4872         int received = 0;
4873         unsigned int header_size = drbd_header_size(tconn);
4874         int expect   = header_size;
4875         int ping_timeout_active = 0;
4876
4877         current->policy = SCHED_RR;  /* Make this a realtime task! */
4878         current->rt_priority = 2;    /* more important than all other tasks */
4879
4880         while (get_t_state(thi) == RUNNING) {
4881                 drbd_thread_current_set_cpu(thi);
4882                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4883                         if (drbd_send_ping(tconn)) {
4884                                 conn_err(tconn, "drbd_send_ping has failed\n");
4885                                 goto reconnect;
4886                         }
4887                         tconn->meta.socket->sk->sk_rcvtimeo =
4888                                 tconn->net_conf->ping_timeo*HZ/10;
4889                         ping_timeout_active = 1;
4890                 }
4891
4892                 /* TODO: conditionally cork; it may hurt latency if we cork without
4893                    much to send */
4894                 if (!tconn->net_conf->no_cork)
4895                         drbd_tcp_cork(tconn->meta.socket);
4896                 if (tconn_finish_peer_reqs(tconn)) {
4897                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
4898                         goto reconnect;
4899                 }
4900                 /* but unconditionally uncork unless disabled */
4901                 if (!tconn->net_conf->no_cork)
4902                         drbd_tcp_uncork(tconn->meta.socket);
4903
4904                 /* short circuit, recv_msg would return EINTR anyways. */
4905                 if (signal_pending(current))
4906                         continue;
4907
4908                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4909                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4910
4911                 flush_signals(current);
4912
4913                 /* Note:
4914                  * -EINTR        (on meta) we got a signal
4915                  * -EAGAIN       (on meta) rcvtimeo expired
4916                  * -ECONNRESET   other side closed the connection
4917                  * -ERESTARTSYS  (on data) we got a signal
4918                  * rv <  0       other than above: unexpected error!
4919                  * rv == expected: full header or command
4920                  * rv <  expected: "woken" by signal during receive
4921                  * rv == 0       : "connection shut down by peer"
4922                  */
4923                 if (likely(rv > 0)) {
4924                         received += rv;
4925                         buf      += rv;
4926                 } else if (rv == 0) {
4927                         conn_err(tconn, "meta connection shut down by peer.\n");
4928                         goto reconnect;
4929                 } else if (rv == -EAGAIN) {
4930                         /* If the data socket received something meanwhile,
4931                          * that is good enough: peer is still alive. */
4932                         if (time_after(tconn->last_received,
4933                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
4934                                 continue;
4935                         if (ping_timeout_active) {
4936                                 conn_err(tconn, "PingAck did not arrive in time.\n");
4937                                 goto reconnect;
4938                         }
4939                         set_bit(SEND_PING, &tconn->flags);
4940                         continue;
4941                 } else if (rv == -EINTR) {
4942                         continue;
4943                 } else {
4944                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
4945                         goto reconnect;
4946                 }
4947
4948                 if (received == expect && cmd == NULL) {
4949                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
4950                                 goto reconnect;
4951                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[pi.cmd].fn) {
4952                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
4953                                         pi.cmd, pi.size);
4954                                 goto disconnect;
4955                         }
4956                         cmd = &asender_tbl[pi.cmd];
4957                         expect = header_size + cmd->pkt_size;
4958                         if (pi.size != expect - header_size) {
4959                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
4960                                         pi.cmd, pi.size);
4961                                 goto reconnect;
4962                         }
4963                 }
4964                 if (received == expect) {
4965                         bool err;
4966
4967                         err = cmd->fn(tconn, &pi);
4968                         if (err) {
4969                                 conn_err(tconn, "%pf failed\n", cmd->fn);
4970                                 goto reconnect;
4971                         }
4972
4973                         tconn->last_received = jiffies;
4974
4975                         /* the idle_timeout (ping-int)
4976                          * has been restored in got_PingAck() */
4977                         if (cmd == &asender_tbl[P_PING_ACK])
4978                                 ping_timeout_active = 0;
4979
4980                         buf      = tconn->meta.rbuf;
4981                         received = 0;
4982                         expect   = header_size;
4983                         cmd      = NULL;
4984                 }
4985         }
4986
4987         if (0) {
4988 reconnect:
4989                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4990         }
4991         if (0) {
4992 disconnect:
4993                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4994         }
4995         clear_bit(SIGNAL_ASENDER, &tconn->flags);
4996
4997         conn_info(tconn, "asender terminated\n");
4998
4999         return 0;
5000 }