1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55 };
56
57 enum finish_epoch {
58         FE_STILL_LIVE,
59         FE_DESTROYED,
60         FE_RECYCLED,
61 };
62
63 enum mdev_or_conn {
64         MDEV,
65         CONN,
66 };
67
68 static int drbd_do_handshake(struct drbd_tconn *tconn);
69 static int drbd_do_auth(struct drbd_tconn *tconn);
70 static int drbd_disconnected(int vnr, void *p, void *data);
71
72 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
73 static int e_end_block(struct drbd_work *, int);
74
75
76 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
78 /*
79  * some helper functions to deal with single linked page lists,
80  * page->private being our "next" pointer.
81  */
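/* Illustrative sketch (not part of the driver): a chain of three pages
 * p1 -> p2 -> p3 is represented purely through page->private, e.g.
 *
 *      set_page_private(p1, (unsigned long)p2);
 *      set_page_private(p2, (unsigned long)p3);
 *      set_page_private(p3, 0);        (a private value of 0 ends the chain)
 *
 * The page_chain_next()/page_chain_for_each() helpers used below just follow
 * these "pointers" until they hit 0.
 */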
82
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89         struct page *page;
90         struct page *tmp;
91
92         BUG_ON(!n);
93         BUG_ON(!head);
94
95         page = *head;
96
97         if (!page)
98                 return NULL;
99
100         while (page) {
101                 tmp = page_chain_next(page);
102                 if (--n == 0)
103                         break; /* found sufficient pages */
104                 if (tmp == NULL)
105                         /* insufficient pages, don't use any of them. */
106                         return NULL;
107                 page = tmp;
108         }
109
110         /* add end of list marker for the returned list */
111         set_page_private(page, 0);
112         /* actual return value, and adjustment of head */
113         page = *head;
114         *head = tmp;
115         return page;
116 }
117
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123         struct page *tmp;
124         int i = 1;
125         while ((tmp = page_chain_next(page)))
126                 ++i, page = tmp;
127         if (len)
128                 *len = i;
129         return page;
130 }
131
132 static int page_chain_free(struct page *page)
133 {
134         struct page *tmp;
135         int i = 0;
136         page_chain_for_each_safe(page, tmp) {
137                 put_page(page);
138                 ++i;
139         }
140         return i;
141 }
142
143 static void page_chain_add(struct page **head,
144                 struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147         struct page *tmp;
148         tmp = page_chain_tail(chain_first, NULL);
149         BUG_ON(tmp != chain_last);
150 #endif
151
152         /* add chain to head */
153         set_page_private(chain_last, (unsigned long)*head);
154         *head = chain_first;
155 }
156
157 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
158 {
159         struct page *page = NULL;
160         struct page *tmp = NULL;
161         int i = 0;
162
163         /* Yes, testing drbd_pp_vacant outside the lock is racy.
164          * So what. It saves a spin_lock. */
165         if (drbd_pp_vacant >= number) {
166                 spin_lock(&drbd_pp_lock);
167                 page = page_chain_del(&drbd_pp_pool, number);
168                 if (page)
169                         drbd_pp_vacant -= number;
170                 spin_unlock(&drbd_pp_lock);
171                 if (page)
172                         return page;
173         }
174
175         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
176          * "criss-cross" setup, that might cause write-out on some other DRBD,
177          * which in turn might block on the other node at this very place.  */
178         for (i = 0; i < number; i++) {
179                 tmp = alloc_page(GFP_TRY);
180                 if (!tmp)
181                         break;
182                 set_page_private(tmp, (unsigned long)page);
183                 page = tmp;
184         }
185
186         if (i == number)
187                 return page;
188
189         /* Not enough pages immediately available this time.
190          * No need to jump around here, drbd_pp_alloc will retry this
191          * function "soon". */
192         if (page) {
193                 tmp = page_chain_tail(page, NULL);
194                 spin_lock(&drbd_pp_lock);
195                 page_chain_add(&drbd_pp_pool, page, tmp);
196                 drbd_pp_vacant += i;
197                 spin_unlock(&drbd_pp_lock);
198         }
199         return NULL;
200 }
201
202 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
203 {
204         struct drbd_peer_request *peer_req;
205         struct list_head *le, *tle;
206
207         /* The EEs are always appended to the end of the list. Since
208            they are sent in order over the wire, they have to finish
209            in order. As soon as we see the first unfinished one, we can
210            stop examining the list... */
211
212         list_for_each_safe(le, tle, &mdev->net_ee) {
213                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
214                 if (drbd_ee_has_active_page(peer_req))
215                         break;
216                 list_move(le, to_be_freed);
217         }
218 }
219
220 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
221 {
222         LIST_HEAD(reclaimed);
223         struct drbd_peer_request *peer_req, *t;
224
225         spin_lock_irq(&mdev->tconn->req_lock);
226         reclaim_net_ee(mdev, &reclaimed);
227         spin_unlock_irq(&mdev->tconn->req_lock);
228
229         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
230                 drbd_free_net_ee(mdev, peer_req);
231 }
232
233 /**
234  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
235  * @mdev:       DRBD device.
236  * @number:     number of pages requested
237  * @retry:      whether to retry, if not enough pages are available right now
238  *
239  * Tries to allocate @number pages, first from our own page pool, then from
240  * the kernel, unless this allocation would exceed the max_buffers setting.
241  * Possibly retry until DRBD frees sufficient pages somewhere else.
242  *
243  * Returns a page chain linked via page->private.
244  */
245 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
246 {
247         struct page *page = NULL;
248         DEFINE_WAIT(wait);
249
250         /* Yes, we may run up to @number over max_buffers. If we
251          * follow it strictly, the admin will get it wrong anyway. */
252         if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
253                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
254
255         while (page == NULL) {
256                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
257
258                 drbd_kick_lo_and_reclaim_net(mdev);
259
260                 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
261                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
262                         if (page)
263                                 break;
264                 }
265
266                 if (!retry)
267                         break;
268
269                 if (signal_pending(current)) {
270                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
271                         break;
272                 }
273
274                 schedule();
275         }
276         finish_wait(&drbd_pp_wait, &wait);
277
278         if (page)
279                 atomic_add(number, &mdev->pp_in_use);
280         return page;
281 }
282
283 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
284  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
285  * Either links the page chain back to the global pool,
286  * or returns all pages to the system. */
287 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
288 {
289         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
290         int i;
291
292         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
293                 i = page_chain_free(page);
294         else {
295                 struct page *tmp;
296                 tmp = page_chain_tail(page, &i);
297                 spin_lock(&drbd_pp_lock);
298                 page_chain_add(&drbd_pp_pool, page, tmp);
299                 drbd_pp_vacant += i;
300                 spin_unlock(&drbd_pp_lock);
301         }
302         i = atomic_sub_return(i, a);
303         if (i < 0)
304                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
305                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
306         wake_up(&drbd_pp_wait);
307 }
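/* Usage sketch (illustrative only, names chosen for the example): a caller
 * that needs a chain of nr_pages pages pairs the two functions like this:
 *
 *      struct page *page = drbd_pp_alloc(mdev, nr_pages, true);
 *      if (!page)
 *              return NULL;    (with retry == true this means we were signalled)
 *      ... receive data into the chain ...
 *      drbd_pp_free(mdev, page, 0);
 *
 * drbd_pp_alloc() accounts the pages in mdev->pp_in_use; drbd_pp_free()
 * returns them to the pool or to the system and wakes up any waiters.
 */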
308
309 /*
310 You need to hold the req_lock:
311  _drbd_wait_ee_list_empty()
312
313 You must not have the req_lock:
314  drbd_free_ee()
315  drbd_alloc_ee()
316  drbd_init_ee()
317  drbd_release_ee()
318  drbd_ee_fix_bhs()
319  drbd_process_done_ee()
320  drbd_clear_done_ee()
321  drbd_wait_ee_list_empty()
322 */
323
324 struct drbd_peer_request *
325 drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
326               unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
327 {
328         struct drbd_peer_request *peer_req;
329         struct page *page;
330         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
331
332         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
333                 return NULL;
334
335         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
336         if (!peer_req) {
337                 if (!(gfp_mask & __GFP_NOWARN))
338                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
339                 return NULL;
340         }
341
342         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
343         if (!page)
344                 goto fail;
345
346         drbd_clear_interval(&peer_req->i);
347         peer_req->i.size = data_size;
348         peer_req->i.sector = sector;
349         peer_req->i.local = false;
350         peer_req->i.waiting = false;
351
352         peer_req->epoch = NULL;
353         peer_req->w.mdev = mdev;
354         peer_req->pages = page;
355         atomic_set(&peer_req->pending_bios, 0);
356         peer_req->flags = 0;
357         /*
358          * The block_id is opaque to the receiver.  It is not endianness
359          * converted, and sent back to the sender unchanged.
360          */
361         peer_req->block_id = id;
362
363         return peer_req;
364
365  fail:
366         mempool_free(peer_req, drbd_ee_mempool);
367         return NULL;
368 }
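/* Lifecycle sketch (illustrative; "some_callback" is a placeholder): the
 * receive handlers further down typically do
 *
 *      peer_req = drbd_alloc_ee(mdev, id, sector, size, GFP_NOIO);
 *      if (!peer_req)
 *              goto fail;
 *      peer_req->w.cb = some_callback;    e.g. e_end_block, e_end_resync_block
 *      ... add to an ee list, submit via drbd_submit_peer_request() ...
 *
 * and call drbd_free_ee(mdev, peer_req) on any error path.
 */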
369
370 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
371                        int is_net)
372 {
373         if (peer_req->flags & EE_HAS_DIGEST)
374                 kfree(peer_req->digest);
375         drbd_pp_free(mdev, peer_req->pages, is_net);
376         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
377         D_ASSERT(drbd_interval_empty(&peer_req->i));
378         mempool_free(peer_req, drbd_ee_mempool);
379 }
380
381 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
382 {
383         LIST_HEAD(work_list);
384         struct drbd_peer_request *peer_req, *t;
385         int count = 0;
386         int is_net = list == &mdev->net_ee;
387
388         spin_lock_irq(&mdev->tconn->req_lock);
389         list_splice_init(list, &work_list);
390         spin_unlock_irq(&mdev->tconn->req_lock);
391
392         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
393                 drbd_free_some_ee(mdev, peer_req, is_net);
394                 count++;
395         }
396         return count;
397 }
398
399
400 /* See also comments in _req_mod(,BARRIER_ACKED)
401  * and receive_Barrier.
402  *
403  * Move entries from net_ee to done_ee, if ready.
404  * Grab done_ee, call all callbacks, free the entries.
405  * The callbacks typically send out ACKs.
406  */
407 static int drbd_process_done_ee(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_net_ee(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_ee(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_ee(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441 {
442         DEFINE_WAIT(wait);
443
444         /* avoids spin_lock/unlock
445          * and calling prepare_to_wait in the fast path */
446         while (!list_empty(head)) {
447                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
448                 spin_unlock_irq(&mdev->tconn->req_lock);
449                 io_schedule();
450                 finish_wait(&mdev->ee_wait, &wait);
451                 spin_lock_irq(&mdev->tconn->req_lock);
452         }
453 }
454
455 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
456 {
457         spin_lock_irq(&mdev->tconn->req_lock);
458         _drbd_wait_ee_list_empty(mdev, head);
459         spin_unlock_irq(&mdev->tconn->req_lock);
460 }
461
462 /* see also kernel_accept, which is only present since 2.6.18.
463  * Also, we want to log exactly which part of it failed. */
464 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
465 {
466         struct sock *sk = sock->sk;
467         int err = 0;
468
469         *what = "listen";
470         err = sock->ops->listen(sock, 5);
471         if (err < 0)
472                 goto out;
473
474         *what = "sock_create_lite";
475         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
476                                newsock);
477         if (err < 0)
478                 goto out;
479
480         *what = "accept";
481         err = sock->ops->accept(sock, *newsock, 0);
482         if (err < 0) {
483                 sock_release(*newsock);
484                 *newsock = NULL;
485                 goto out;
486         }
487         (*newsock)->ops  = sock->ops;
488
489 out:
490         return err;
491 }
492
493 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
494 {
495         mm_segment_t oldfs;
496         struct kvec iov = {
497                 .iov_base = buf,
498                 .iov_len = size,
499         };
500         struct msghdr msg = {
501                 .msg_iovlen = 1,
502                 .msg_iov = (struct iovec *)&iov,
503                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
504         };
505         int rv;
506
507         oldfs = get_fs();
508         set_fs(KERNEL_DS);
509         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
510         set_fs(oldfs);
511
512         return rv;
513 }
514
515 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
516 {
517         mm_segment_t oldfs;
518         struct kvec iov = {
519                 .iov_base = buf,
520                 .iov_len = size,
521         };
522         struct msghdr msg = {
523                 .msg_iovlen = 1,
524                 .msg_iov = (struct iovec *)&iov,
525                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
526         };
527         int rv;
528
529         oldfs = get_fs();
530         set_fs(KERNEL_DS);
531
532         for (;;) {
533                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
534                 if (rv == size)
535                         break;
536
537                 /* Note:
538                  * ECONNRESET   other side closed the connection
539                  * ERESTARTSYS  (on  sock) we got a signal
540                  */
541
542                 if (rv < 0) {
543                         if (rv == -ECONNRESET)
544                                 conn_info(tconn, "sock was reset by peer\n");
545                         else if (rv != -ERESTARTSYS)
546                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
547                         break;
548                 } else if (rv == 0) {
549                         conn_info(tconn, "sock was shut down by peer\n");
550                         break;
551                 } else  {
552                         /* signal came in, or peer/link went down,
553                          * after we read a partial message
554                          */
555                         /* D_ASSERT(signal_pending(current)); */
556                         break;
557                 }
558         }
559
560         set_fs(oldfs);
561
562         if (rv != size)
563                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
564
565         return rv;
566 }
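/* Return convention of drbd_recv(): the number of bytes received (== size on
 * success), 0 if the peer performed an orderly shutdown, or a negative errno.
 * Any short result also forces the connection into C_BROKEN_PIPE; the
 * drbd_recv_all*() wrappers below collapse all of that into 0 or -EIO.
 */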
567
568 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
569 {
570         int err;
571
572         err = drbd_recv(tconn, buf, size);
573         if (err != size) {
574                 if (err >= 0)
575                         err = -EIO;
576         } else
577                 err = 0;
578         return err;
579 }
580
581 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
582 {
583         int err;
584
585         err = drbd_recv_all(tconn, buf, size);
586         if (err && !signal_pending(current))
587                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
588         return err;
589 }
590
591 /* quoting tcp(7):
592  *   On individual connections, the socket buffer size must be set prior to the
593  *   listen(2) or connect(2) calls in order to have it take effect.
594  * This is our wrapper to do so.
595  */
596 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
597                 unsigned int rcv)
598 {
599         /* open coded SO_SNDBUF, SO_RCVBUF */
600         if (snd) {
601                 sock->sk->sk_sndbuf = snd;
602                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
603         }
604         if (rcv) {
605                 sock->sk->sk_rcvbuf = rcv;
606                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
607         }
608 }
609
610 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
611 {
612         const char *what;
613         struct socket *sock;
614         struct sockaddr_in6 src_in6;
615         int err;
616         int disconnect_on_error = 1;
617
618         if (!get_net_conf(tconn))
619                 return NULL;
620
621         what = "sock_create_kern";
622         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
623                 SOCK_STREAM, IPPROTO_TCP, &sock);
624         if (err < 0) {
625                 sock = NULL;
626                 goto out;
627         }
628
629         sock->sk->sk_rcvtimeo =
630         sock->sk->sk_sndtimeo =  tconn->net_conf->try_connect_int*HZ;
631         drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
632                         tconn->net_conf->rcvbuf_size);
633
634        /* explicitly bind to the configured IP as source IP
635         *  for the outgoing connections.
636         *  This is needed for multihomed hosts and to be
637         *  able to use lo: interfaces for drbd.
638         * Make sure to use 0 as port number, so linux selects
639         *  a free one dynamically.
640         */
641         memcpy(&src_in6, tconn->net_conf->my_addr,
642                min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
643         if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
644                 src_in6.sin6_port = 0;
645         else
646                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
647
648         what = "bind before connect";
649         err = sock->ops->bind(sock,
650                               (struct sockaddr *) &src_in6,
651                               tconn->net_conf->my_addr_len);
652         if (err < 0)
653                 goto out;
654
655         /* connect may fail, peer not yet available.
656          * stay C_WF_CONNECTION, don't go Disconnecting! */
657         disconnect_on_error = 0;
658         what = "connect";
659         err = sock->ops->connect(sock,
660                                  (struct sockaddr *)tconn->net_conf->peer_addr,
661                                  tconn->net_conf->peer_addr_len, 0);
662
663 out:
664         if (err < 0) {
665                 if (sock) {
666                         sock_release(sock);
667                         sock = NULL;
668                 }
669                 switch (-err) {
670                         /* timeout, busy, signal pending */
671                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
672                 case EINTR: case ERESTARTSYS:
673                         /* peer not (yet) available, network problem */
674                 case ECONNREFUSED: case ENETUNREACH:
675                 case EHOSTDOWN:    case EHOSTUNREACH:
676                         disconnect_on_error = 0;
677                         break;
678                 default:
679                         conn_err(tconn, "%s failed, err = %d\n", what, err);
680                 }
681                 if (disconnect_on_error)
682                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
683         }
684         put_net_conf(tconn);
685         return sock;
686 }
687
688 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
689 {
690         int timeo, err;
691         struct socket *s_estab = NULL, *s_listen;
692         const char *what;
693
694         if (!get_net_conf(tconn))
695                 return NULL;
696
697         what = "sock_create_kern";
698         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
699                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
700         if (err) {
701                 s_listen = NULL;
702                 goto out;
703         }
704
705         timeo = tconn->net_conf->try_connect_int * HZ;
706         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
707
708         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
709         s_listen->sk->sk_rcvtimeo = timeo;
710         s_listen->sk->sk_sndtimeo = timeo;
711         drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
712                         tconn->net_conf->rcvbuf_size);
713
714         what = "bind before listen";
715         err = s_listen->ops->bind(s_listen,
716                               (struct sockaddr *) tconn->net_conf->my_addr,
717                               tconn->net_conf->my_addr_len);
718         if (err < 0)
719                 goto out;
720
721         err = drbd_accept(&what, s_listen, &s_estab);
722
723 out:
724         if (s_listen)
725                 sock_release(s_listen);
726         if (err < 0) {
727                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
728                         conn_err(tconn, "%s failed, err = %d\n", what, err);
729                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
730                 }
731         }
732         put_net_conf(tconn);
733
734         return s_estab;
735 }
736
737 static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
738 {
739         struct p_header *h = tconn->data.sbuf;
740
741         return !_conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
742 }
743
744 static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
745 {
746         struct p_header80 *h = tconn->data.rbuf;
747         int rr;
748
749         rr = drbd_recv_short(sock, h, sizeof(*h), 0);
750
751         if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
752                 return be16_to_cpu(h->command);
753
754         return 0xffff;
755 }
756
757 /**
758  * drbd_socket_okay() - Free the socket if its connection is not okay
759  * @sock:       pointer to the pointer to the socket.
760  */
761 static int drbd_socket_okay(struct socket **sock)
762 {
763         int rr;
764         char tb[4];
765
766         if (!*sock)
767                 return false;
768
769         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
770
771         if (rr > 0 || rr == -EAGAIN) {
772                 return true;
773         } else {
774                 sock_release(*sock);
775                 *sock = NULL;
776                 return false;
777         }
778 }
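/* Note on the probe above: the non-blocking MSG_PEEK recv does not consume
 * any data.  A positive return value (data pending) or -EAGAIN (connected
 * but idle) both mean the socket is still usable; a return of 0 (orderly
 * shutdown by the peer) or any other error makes us release the socket.
 */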
779 /* Gets called if a connection is established, or if a new minor gets created
780    in a connection */
781 int drbd_connected(int vnr, void *p, void *data)
782 {
783         struct drbd_conf *mdev = (struct drbd_conf *)p;
784         int err;
785
786         atomic_set(&mdev->packet_seq, 0);
787         mdev->peer_seq = 0;
788
789         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
790                 &mdev->tconn->cstate_mutex :
791                 &mdev->own_state_mutex;
792
793         err = drbd_send_sync_param(mdev);
794         if (!err)
795                 err = drbd_send_sizes(mdev, 0, 0);
796         if (!err)
797                 err = drbd_send_uuids(mdev);
798         if (!err)
799                 err = drbd_send_state(mdev);
800         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
801         clear_bit(RESIZE_PENDING, &mdev->flags);
802         return err;
803 }
804
805 /*
806  * return values:
807  *   1 yes, we have a valid connection
808  *   0 oops, did not work out, please try again
809  *  -1 peer talks different language,
810  *     no point in trying again, please go standalone.
811  *  -2 We do not have a network config...
812  */
813 static int drbd_connect(struct drbd_tconn *tconn)
814 {
815         struct socket *s, *sock, *msock;
816         int try, h, ok;
817
818         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
819                 return -2;
820
821         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
822
823         /* Assume that the peer only understands protocol 80 until we know better.  */
824         tconn->agreed_pro_version = 80;
825
826         sock  = NULL;
827         msock = NULL;
828
829         do {
830                 for (try = 0;;) {
831                         /* 3 tries, this should take less than a second! */
832                         s = drbd_try_connect(tconn);
833                         if (s || ++try >= 3)
834                                 break;
835                         /* give the other side time to call bind() & listen() */
836                         schedule_timeout_interruptible(HZ / 10);
837                 }
838
839                 if (s) {
840                         if (!sock) {
841                                 drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
842                                 sock = s;
843                                 s = NULL;
844                         } else if (!msock) {
845                                 drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
846                                 msock = s;
847                                 s = NULL;
848                         } else {
849                                 conn_err(tconn, "Logic error in drbd_connect()\n");
850                                 goto out_release_sockets;
851                         }
852                 }
853
854                 if (sock && msock) {
855                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
856                         ok = drbd_socket_okay(&sock);
857                         ok = drbd_socket_okay(&msock) && ok;
858                         if (ok)
859                                 break;
860                 }
861
862 retry:
863                 s = drbd_wait_for_connect(tconn);
864                 if (s) {
865                         try = drbd_recv_fp(tconn, s);
866                         drbd_socket_okay(&sock);
867                         drbd_socket_okay(&msock);
868                         switch (try) {
869                         case P_HAND_SHAKE_S:
870                                 if (sock) {
871                                         conn_warn(tconn, "initial packet S crossed\n");
872                                         sock_release(sock);
873                                 }
874                                 sock = s;
875                                 break;
876                         case P_HAND_SHAKE_M:
877                                 if (msock) {
878                                         conn_warn(tconn, "initial packet M crossed\n");
879                                         sock_release(msock);
880                                 }
881                                 msock = s;
882                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
883                                 break;
884                         default:
885                                 conn_warn(tconn, "Error receiving initial packet\n");
886                                 sock_release(s);
887                                 if (random32() & 1)
888                                         goto retry;
889                         }
890                 }
891
892                 if (tconn->cstate <= C_DISCONNECTING)
893                         goto out_release_sockets;
894                 if (signal_pending(current)) {
895                         flush_signals(current);
896                         smp_rmb();
897                         if (get_t_state(&tconn->receiver) == EXITING)
898                                 goto out_release_sockets;
899                 }
900
901                 if (sock && msock) {
902                         ok = drbd_socket_okay(&sock);
903                         ok = drbd_socket_okay(&msock) && ok;
904                         if (ok)
905                                 break;
906                 }
907         } while (1);
908
909         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
910         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
911
912         sock->sk->sk_allocation = GFP_NOIO;
913         msock->sk->sk_allocation = GFP_NOIO;
914
915         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
916         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
917
918         /* NOT YET ...
919          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
920          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
921          * first set it to the P_HAND_SHAKE timeout,
922          * which we set to 4x the configured ping_timeout. */
923         sock->sk->sk_sndtimeo =
924         sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
925
926         msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
927         msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
928
929         /* we don't want delays.
930          * we use TCP_CORK where appropriate, though */
931         drbd_tcp_nodelay(sock);
932         drbd_tcp_nodelay(msock);
933
934         tconn->data.socket = sock;
935         tconn->meta.socket = msock;
936         tconn->last_received = jiffies;
937
938         h = drbd_do_handshake(tconn);
939         if (h <= 0)
940                 return h;
941
942         if (tconn->cram_hmac_tfm) {
943                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
944                 switch (drbd_do_auth(tconn)) {
945                 case -1:
946                         conn_err(tconn, "Authentication of peer failed\n");
947                         return -1;
948                 case 0:
949                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
950                         return 0;
951                 }
952         }
953
954         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
955                 return 0;
956
957         sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
958         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
959
960         drbd_thread_start(&tconn->asender);
961
962         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
963                 return -1;
964
965         return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
966
967 out_release_sockets:
968         if (sock)
969                 sock_release(sock);
970         if (msock)
971                 sock_release(msock);
972         return -1;
973 }
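/* Connection establishment in a nutshell: both peers alternately try to
 * connect (drbd_try_connect) and to accept (drbd_wait_for_connect).  The
 * first socket a node establishes is tagged P_HAND_SHAKE_S and becomes the
 * data socket, the second is tagged P_HAND_SHAKE_M and becomes the meta
 * socket.  If the initial packets cross, the freshly accepted socket
 * replaces the one we opened ourselves; the node that accepted the meta
 * socket additionally sets DISCARD_CONCURRENT.
 */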
974
975 static int decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
976 {
977         if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
978                 pi->cmd = be16_to_cpu(h->h80.command);
979                 pi->size = be16_to_cpu(h->h80.length);
980                 pi->vnr = 0;
981         } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
982                 pi->cmd = be16_to_cpu(h->h95.command);
983                 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
984                 pi->vnr = 0;
985         } else {
986                 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
987                     be32_to_cpu(h->h80.magic),
988                     be16_to_cpu(h->h80.command),
989                     be16_to_cpu(h->h80.length));
990                 return -EINVAL;
991         }
992         return 0;
993 }
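/* Both header formats known here: the legacy h80 header starts with the
 * 32-bit DRBD_MAGIC and carries 16-bit command and length fields; the h95
 * header starts with the 16-bit DRBD_MAGIC_BIG and carries a length of up
 * to 24 bits (the top byte is masked off above).  Neither format carries a
 * volume number yet, which is why pi->vnr is always 0 at this point.
 */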
994
995 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
996 {
997         struct p_header *h = tconn->data.rbuf;
998         int err;
999
1000         err = drbd_recv_all_warn(tconn, h, sizeof(*h));
1001         if (err)
1002                 return err;
1003
1004         err = decode_header(tconn, h, pi);
1005         tconn->last_received = jiffies;
1006
1007         return err;
1008 }
1009
1010 static void drbd_flush(struct drbd_conf *mdev)
1011 {
1012         int rv;
1013
1014         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1015                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1016                                         NULL);
1017                 if (rv) {
1018                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1019                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1020                          * don't try again for ANY return value != 0
1021                          * if (rv == -EOPNOTSUPP) */
1022                         drbd_bump_write_ordering(mdev, WO_drain_io);
1023                 }
1024                 put_ldev(mdev);
1025         }
1026 }
1027
1028 /**
1029  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1030  * @mdev:       DRBD device.
1031  * @epoch:      Epoch object.
1032  * @ev:         Epoch event.
1033  */
1034 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1035                                                struct drbd_epoch *epoch,
1036                                                enum epoch_event ev)
1037 {
1038         int epoch_size;
1039         struct drbd_epoch *next_epoch;
1040         enum finish_epoch rv = FE_STILL_LIVE;
1041
1042         spin_lock(&mdev->epoch_lock);
1043         do {
1044                 next_epoch = NULL;
1045
1046                 epoch_size = atomic_read(&epoch->epoch_size);
1047
1048                 switch (ev & ~EV_CLEANUP) {
1049                 case EV_PUT:
1050                         atomic_dec(&epoch->active);
1051                         break;
1052                 case EV_GOT_BARRIER_NR:
1053                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1054                         break;
1055                 case EV_BECAME_LAST:
1056                         /* nothing to do */
1057                         break;
1058                 }
1059
1060                 if (epoch_size != 0 &&
1061                     atomic_read(&epoch->active) == 0 &&
1062                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1063                         if (!(ev & EV_CLEANUP)) {
1064                                 spin_unlock(&mdev->epoch_lock);
1065                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1066                                 spin_lock(&mdev->epoch_lock);
1067                         }
1068                         dec_unacked(mdev);
1069
1070                         if (mdev->current_epoch != epoch) {
1071                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1072                                 list_del(&epoch->list);
1073                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1074                                 mdev->epochs--;
1075                                 kfree(epoch);
1076
1077                                 if (rv == FE_STILL_LIVE)
1078                                         rv = FE_DESTROYED;
1079                         } else {
1080                                 epoch->flags = 0;
1081                                 atomic_set(&epoch->epoch_size, 0);
1082                                 /* atomic_set(&epoch->active, 0); is already zero */
1083                                 if (rv == FE_STILL_LIVE)
1084                                         rv = FE_RECYCLED;
1085                                 wake_up(&mdev->ee_wait);
1086                         }
1087                 }
1088
1089                 if (!next_epoch)
1090                         break;
1091
1092                 epoch = next_epoch;
1093         } while (1);
1094
1095         spin_unlock(&mdev->epoch_lock);
1096
1097         return rv;
1098 }
1099
1100 /**
1101  * drbd_bump_write_ordering() - Fall back to another write ordering method
1102  * @mdev:       DRBD device.
1103  * @wo:         Write ordering method to try.
1104  */
1105 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1106 {
1107         enum write_ordering_e pwo;
1108         static char *write_ordering_str[] = {
1109                 [WO_none] = "none",
1110                 [WO_drain_io] = "drain",
1111                 [WO_bdev_flush] = "flush",
1112         };
1113
1114         pwo = mdev->write_ordering;
1115         wo = min(pwo, wo);
1116         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1117                 wo = WO_drain_io;
1118         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1119                 wo = WO_none;
1120         mdev->write_ordering = wo;
1121         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1122                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1123 }
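/* Example: with no_disk_flush set, a requested WO_bdev_flush is degraded to
 * WO_drain_io, and with no_disk_drain also set it ends up as WO_none.
 * Because of the min(pwo, wo) above, the write ordering method can only
 * ever become weaker at runtime, never stronger.
 */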
1124
1125 /**
1126  * drbd_submit_peer_request() - submit the pages of a peer request to the local backing device
1127  * @mdev:       DRBD device.
1128  * @peer_req:   peer request
1129  * @rw:         flag field, see bio->bi_rw
1130  *
1131  * May spread the pages to multiple bios,
1132  * depending on bio_add_page restrictions.
1133  *
1134  * Returns 0 if all bios have been submitted,
1135  * -ENOMEM if we could not allocate enough bios,
1136  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1137  *  single page to an empty bio (which should never happen and likely indicates
1138  *  that the lower level IO stack is in some way broken). This has been observed
1139  *  on certain Xen deployments.
1140  */
1141 /* TODO allocate from our own bio_set. */
1142 int drbd_submit_peer_request(struct drbd_conf *mdev,
1143                              struct drbd_peer_request *peer_req,
1144                              const unsigned rw, const int fault_type)
1145 {
1146         struct bio *bios = NULL;
1147         struct bio *bio;
1148         struct page *page = peer_req->pages;
1149         sector_t sector = peer_req->i.sector;
1150         unsigned ds = peer_req->i.size;
1151         unsigned n_bios = 0;
1152         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1153         int err = -ENOMEM;
1154
1155         /* In most cases, we will only need one bio.  But in case the lower
1156          * level restrictions happen to be different at this offset on this
1157          * side than those of the sending peer, we may need to submit the
1158          * request in more than one bio.
1159          *
1160          * Plain bio_alloc is good enough here, this is no DRBD internally
1161          * generated bio, but a bio allocated on behalf of the peer.
1162          */
1163 next_bio:
1164         bio = bio_alloc(GFP_NOIO, nr_pages);
1165         if (!bio) {
1166                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1167                 goto fail;
1168         }
1169         /* > peer_req->i.sector, unless this is the first bio */
1170         bio->bi_sector = sector;
1171         bio->bi_bdev = mdev->ldev->backing_bdev;
1172         bio->bi_rw = rw;
1173         bio->bi_private = peer_req;
1174         bio->bi_end_io = drbd_peer_request_endio;
1175
1176         bio->bi_next = bios;
1177         bios = bio;
1178         ++n_bios;
1179
1180         page_chain_for_each(page) {
1181                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1182                 if (!bio_add_page(bio, page, len, 0)) {
1183                         /* A single page must always be possible!
1184                          * But in case it fails anyway,
1185                          * we deal with it, and complain (below). */
1186                         if (bio->bi_vcnt == 0) {
1187                                 dev_err(DEV,
1188                                         "bio_add_page failed for len=%u, "
1189                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1190                                         len, (unsigned long long)bio->bi_sector);
1191                                 err = -ENOSPC;
1192                                 goto fail;
1193                         }
1194                         goto next_bio;
1195                 }
1196                 ds -= len;
1197                 sector += len >> 9;
1198                 --nr_pages;
1199         }
1200         D_ASSERT(page == NULL);
1201         D_ASSERT(ds == 0);
1202
1203         atomic_set(&peer_req->pending_bios, n_bios);
1204         do {
1205                 bio = bios;
1206                 bios = bios->bi_next;
1207                 bio->bi_next = NULL;
1208
1209                 drbd_generic_make_request(mdev, fault_type, bio);
1210         } while (bios);
1211         return 0;
1212
1213 fail:
1214         while (bios) {
1215                 bio = bios;
1216                 bios = bios->bi_next;
1217                 bio_put(bio);
1218         }
1219         return err;
1220 }
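/* Note: the bios are first collected on a private list via bi_next and only
 * submitted once all of them could be allocated and filled.  That keeps
 * peer_req->pending_bios exact and makes the error path trivial: anything
 * still on the list is simply bio_put() and never reaches the disk.
 */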
1221
1222 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1223                                              struct drbd_peer_request *peer_req)
1224 {
1225         struct drbd_interval *i = &peer_req->i;
1226
1227         drbd_remove_interval(&mdev->write_requests, i);
1228         drbd_clear_interval(i);
1229
1230         /* Wake up any processes waiting for this peer request to complete.  */
1231         if (i->waiting)
1232                 wake_up(&mdev->misc_wait);
1233 }
1234
1235 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1236 {
1237         struct drbd_conf *mdev;
1238         int rv;
1239         struct p_barrier *p = tconn->data.rbuf;
1240         struct drbd_epoch *epoch;
1241
1242         mdev = vnr_to_mdev(tconn, pi->vnr);
1243         if (!mdev)
1244                 return -EIO;
1245
1246         inc_unacked(mdev);
1247
1248         mdev->current_epoch->barrier_nr = p->barrier;
1249         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1250
1251         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1252          * the activity log, which means it would not be resynced in case the
1253          * R_PRIMARY crashes now.
1254          * Therefore we must send the barrier_ack after the barrier request was
1255          * completed. */
1256         switch (mdev->write_ordering) {
1257         case WO_none:
1258                 if (rv == FE_RECYCLED)
1259                         return 0;
1260
1261                 /* receiver context, in the writeout path of the other node.
1262                  * avoid potential distributed deadlock */
1263                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1264                 if (epoch)
1265                         break;
1266                 else
1267                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1268                         /* Fall through */
1269
1270         case WO_bdev_flush:
1271         case WO_drain_io:
1272                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1273                 drbd_flush(mdev);
1274
1275                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1276                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1277                         if (epoch)
1278                                 break;
1279                 }
1280
1281                 epoch = mdev->current_epoch;
1282                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1283
1284                 D_ASSERT(atomic_read(&epoch->active) == 0);
1285                 D_ASSERT(epoch->flags == 0);
1286
1287                 return 0;
1288         default:
1289                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1290                 return -EIO;
1291         }
1292
1293         epoch->flags = 0;
1294         atomic_set(&epoch->epoch_size, 0);
1295         atomic_set(&epoch->active, 0);
1296
1297         spin_lock(&mdev->epoch_lock);
1298         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1299                 list_add(&epoch->list, &mdev->current_epoch->list);
1300                 mdev->current_epoch = epoch;
1301                 mdev->epochs++;
1302         } else {
1303                 /* The current_epoch got recycled while we allocated this one... */
1304                 kfree(epoch);
1305         }
1306         spin_unlock(&mdev->epoch_lock);
1307
1308         return 0;
1309 }
1310
1311 /* used from receive_RSDataReply (recv_resync_read)
1312  * and from receive_Data */
1313 static struct drbd_peer_request *
1314 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1315               int data_size) __must_hold(local)
1316 {
1317         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1318         struct drbd_peer_request *peer_req;
1319         struct page *page;
1320         int dgs, ds, err;
1321         void *dig_in = mdev->tconn->int_dig_in;
1322         void *dig_vv = mdev->tconn->int_dig_vv;
1323         unsigned long *data;
1324
1325         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1326                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1327
1328         if (dgs) {
1329                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1330                 if (err)
1331                         return NULL;
1332         }
1333
1334         data_size -= dgs;
1335
1336         if (!expect(data_size != 0))
1337                 return NULL;
1338         if (!expect(IS_ALIGNED(data_size, 512)))
1339                 return NULL;
1340         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1341                 return NULL;
1342
1343         /* even though we trust our peer,
1344          * we sometimes have to double check. */
1345         if (sector + (data_size>>9) > capacity) {
1346                 dev_err(DEV, "request from peer beyond end of local disk: "
1347                         "capacity: %llus < sector: %llus + size: %u\n",
1348                         (unsigned long long)capacity,
1349                         (unsigned long long)sector, data_size);
1350                 return NULL;
1351         }
1352
1353         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1354          * "criss-cross" setup, that might cause write-out on some other DRBD,
1355          * which in turn might block on the other node at this very place.  */
1356         peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1357         if (!peer_req)
1358                 return NULL;
1359
1360         ds = data_size;
1361         page = peer_req->pages;
1362         page_chain_for_each(page) {
1363                 unsigned len = min_t(int, ds, PAGE_SIZE);
1364                 data = kmap(page);
1365                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1366                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1367                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1368                         data[0] = data[0] ^ (unsigned long)-1;
1369                 }
1370                 kunmap(page);
1371                 if (err) {
1372                         drbd_free_ee(mdev, peer_req);
1373                         return NULL;
1374                 }
1375                 ds -= len;
1376         }
1377
1378         if (dgs) {
1379                 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1380                 if (memcmp(dig_in, dig_vv, dgs)) {
1381                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1382                                 (unsigned long long)sector, data_size);
1383                         drbd_free_ee(mdev, peer_req);
1384                         return NULL;
1385                 }
1386         }
1387         mdev->recv_cnt += data_size>>9;
1388         return peer_req;
1389 }
1390
1391 /* drbd_drain_block() just takes a data block
1392  * out of the socket input buffer, and discards it.
1393  */
1394 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1395 {
1396         struct page *page;
1397         int err = 0;
1398         void *data;
1399
1400         if (!data_size)
1401                 return 0;
1402
1403         page = drbd_pp_alloc(mdev, 1, 1);
1404
1405         data = kmap(page);
1406         while (data_size) {
1407                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1408
1409                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1410                 if (err)
1411                         break;
1412                 data_size -= len;
1413         }
1414         kunmap(page);
1415         drbd_pp_free(mdev, page, 0);
1416         return err;
1417 }
1418
1419 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1420                            sector_t sector, int data_size)
1421 {
1422         struct bio_vec *bvec;
1423         struct bio *bio;
1424         int dgs, err, i, expect;
1425         void *dig_in = mdev->tconn->int_dig_in;
1426         void *dig_vv = mdev->tconn->int_dig_vv;
1427
1428         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1429                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1430
1431         if (dgs) {
1432                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1433                 if (err)
1434                         return err;
1435         }
1436
1437         data_size -= dgs;
1438
1439         /* optimistically update recv_cnt.  if receiving fails below,
1440          * we disconnect anyway, and counters will be reset. */
1441         mdev->recv_cnt += data_size>>9;
1442
1443         bio = req->master_bio;
1444         D_ASSERT(sector == bio->bi_sector);
1445
1446         bio_for_each_segment(bvec, bio, i) {
1447                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1448                 expect = min_t(int, data_size, bvec->bv_len);
1449                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1450                 kunmap(bvec->bv_page);
1451                 if (err)
1452                         return err;
1453                 data_size -= expect;
1454         }
1455
1456         if (dgs) {
1457                 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1458                 if (memcmp(dig_in, dig_vv, dgs)) {
1459                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1460                         return -EINVAL;
1461                 }
1462         }
1463
1464         D_ASSERT(data_size == 0);
1465         return 0;
1466 }
1467
1468 /* e_end_resync_block() is called via
1469  * drbd_process_done_ee() by asender only */
1470 static int e_end_resync_block(struct drbd_work *w, int unused)
1471 {
1472         struct drbd_peer_request *peer_req =
1473                 container_of(w, struct drbd_peer_request, w);
1474         struct drbd_conf *mdev = w->mdev;
1475         sector_t sector = peer_req->i.sector;
1476         int err;
1477
1478         D_ASSERT(drbd_interval_empty(&peer_req->i));
1479
1480         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1481                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1482                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1483         } else {
1484                 /* Record failure to sync */
1485                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1486
1487                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1488         }
1489         dec_unacked(mdev);
1490
1491         return err;
1492 }
1493
1494 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1495 {
1496         struct drbd_peer_request *peer_req;
1497
1498         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1499         if (!peer_req)
1500                 goto fail;
1501
1502         dec_rs_pending(mdev);
1503
1504         inc_unacked(mdev);
1505         /* corresponding dec_unacked() in e_end_resync_block()
1506          * respective _drbd_clear_done_ee */
1507
1508         peer_req->w.cb = e_end_resync_block;
1509
1510         spin_lock_irq(&mdev->tconn->req_lock);
1511         list_add(&peer_req->w.list, &mdev->sync_ee);
1512         spin_unlock_irq(&mdev->tconn->req_lock);
1513
1514         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1515         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1516                 return 0;
1517
1518         /* don't care for the reason here */
1519         dev_err(DEV, "submit failed, triggering re-connect\n");
1520         spin_lock_irq(&mdev->tconn->req_lock);
1521         list_del(&peer_req->w.list);
1522         spin_unlock_irq(&mdev->tconn->req_lock);
1523
1524         drbd_free_ee(mdev, peer_req);
1525 fail:
1526         put_ldev(mdev);
1527         return -EIO;
1528 }
1529
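/* Look up the request the peer refers to by block_id.  The id is the pointer
 * value of our own request object; drbd_contains_interval() verifies that
 * this pointer is really present in the given interval tree before it is
 * trusted and dereferenced further. */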
1530 static struct drbd_request *
1531 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1532              sector_t sector, bool missing_ok, const char *func)
1533 {
1534         struct drbd_request *req;
1535
1536         /* Request object according to our peer */
1537         req = (struct drbd_request *)(unsigned long)id;
1538         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1539                 return req;
1540         if (!missing_ok) {
1541                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1542                         (unsigned long)id, (unsigned long long)sector);
1543         }
1544         return NULL;
1545 }
1546
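/* Answer to one of our P_DATA_REQUESTs: copy the payload into the still
 * pending local read request found in mdev->read_requests. */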
1547 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1548 {
1549         struct drbd_conf *mdev;
1550         struct drbd_request *req;
1551         sector_t sector;
1552         int err;
1553         struct p_data *p = tconn->data.rbuf;
1554
1555         mdev = vnr_to_mdev(tconn, pi->vnr);
1556         if (!mdev)
1557                 return -EIO;
1558
1559         sector = be64_to_cpu(p->sector);
1560
1561         spin_lock_irq(&mdev->tconn->req_lock);
1562         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1563         spin_unlock_irq(&mdev->tconn->req_lock);
1564         if (unlikely(!req))
1565                 return -EIO;
1566
1567         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1568          * special casing it there for the various failure cases.
1569          * still no race with drbd_fail_pending_reads */
1570         err = recv_dless_read(mdev, req, sector, pi->size);
1571         if (!err)
1572                 req_mod(req, DATA_RECEIVED);
1573         /* else: nothing. handled from drbd_disconnect...
1574          * I don't think we may complete this just yet
1575          * in case we are "on-disconnect: freeze" */
1576
1577         return err;
1578 }
1579
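/* Answer to one of our resync requests (P_RS_DATA_REQUEST): write the
 * received block to the local disk, or drain the payload and send a
 * P_NEG_ACK if we have no disk to write to. */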
1580 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1581 {
1582         struct drbd_conf *mdev;
1583         sector_t sector;
1584         int err;
1585         struct p_data *p = tconn->data.rbuf;
1586
1587         mdev = vnr_to_mdev(tconn, pi->vnr);
1588         if (!mdev)
1589                 return -EIO;
1590
1591         sector = be64_to_cpu(p->sector);
1592         D_ASSERT(p->block_id == ID_SYNCER);
1593
1594         if (get_ldev(mdev)) {
1595                 /* data is submitted to disk within recv_resync_read.
1596                  * corresponding put_ldev done below on error,
1597                  * or in drbd_peer_request_endio. */
1598                 err = recv_resync_read(mdev, sector, pi->size);
1599         } else {
1600                 if (__ratelimit(&drbd_ratelimit_state))
1601                         dev_err(DEV, "Can not write resync data to local disk.\n");
1602
1603                 err = drbd_drain_block(mdev, pi->size);
1604
1605                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1606         }
1607
1608         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1609
1610         return err;
1611 }
1612
1613 static int w_restart_write(struct drbd_work *w, int cancel)
1614 {
1615         struct drbd_request *req = container_of(w, struct drbd_request, w);
1616         struct drbd_conf *mdev = w->mdev;
1617         struct bio *bio;
1618         unsigned long start_time;
1619         unsigned long flags;
1620
1621         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1622         if (!expect(req->rq_state & RQ_POSTPONED)) {
1623                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1624                 return -EIO;
1625         }
1626         bio = req->master_bio;
1627         start_time = req->start_time;
1628         /* Postponed requests will not have their master_bio completed!  */
1629         __req_mod(req, DISCARD_WRITE, NULL);
1630         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1631
1632         while (__drbd_make_request(mdev, bio, start_time))
1633                 /* retry */ ;
1634         return 0;
1635 }
1636
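/* Re-queue postponed local writes overlapping the given range now that the
 * conflicting peer write has completed; each of them is resubmitted via
 * w_restart_write() on the worker thread. */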
1637 static void restart_conflicting_writes(struct drbd_conf *mdev,
1638                                        sector_t sector, int size)
1639 {
1640         struct drbd_interval *i;
1641         struct drbd_request *req;
1642
1643         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1644                 if (!i->local)
1645                         continue;
1646                 req = container_of(i, struct drbd_request, i);
1647                 if (req->rq_state & RQ_LOCAL_PENDING ||
1648                     !(req->rq_state & RQ_POSTPONED))
1649                         continue;
1650                 if (expect(list_empty(&req->w.list))) {
1651                         req->w.mdev = mdev;
1652                         req->w.cb = w_restart_write;
1653                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1654                 }
1655         }
1656 }
1657
1658 /* e_end_block() is called via drbd_process_done_ee().
1659  * this means this function only runs in the asender thread
1660  */
1661 static int e_end_block(struct drbd_work *w, int cancel)
1662 {
1663         struct drbd_peer_request *peer_req =
1664                 container_of(w, struct drbd_peer_request, w);
1665         struct drbd_conf *mdev = w->mdev;
1666         sector_t sector = peer_req->i.sector;
1667         int err = 0, pcmd;
1668
1669         if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
1670                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1671                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1672                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1673                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1674                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1675                         err = drbd_send_ack(mdev, pcmd, peer_req);
1676                         if (pcmd == P_RS_WRITE_ACK)
1677                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1678                 } else {
1679                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1680                         /* we expect it to be marked out of sync anyways...
1681                          * maybe assert this?  */
1682                 }
1683                 dec_unacked(mdev);
1684         }
1685         /* we delete from the conflict detection hash _after_ we sent out the
1686          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1687         if (mdev->tconn->net_conf->two_primaries) {
1688                 spin_lock_irq(&mdev->tconn->req_lock);
1689                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1690                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1691                 if (peer_req->flags & EE_RESTART_REQUESTS)
1692                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1693                 spin_unlock_irq(&mdev->tconn->req_lock);
1694         } else
1695                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1696
1697         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1698
1699         return err;
1700 }
1701
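/* Send the given ack for a conflicting peer write and drop the corresponding
 * unacked count; used by the discard-write/retry-write resolutions below. */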
1702 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1703 {
1704         struct drbd_conf *mdev = w->mdev;
1705         struct drbd_peer_request *peer_req =
1706                 container_of(w, struct drbd_peer_request, w);
1707         int err;
1708
1709         err = drbd_send_ack(mdev, ack, peer_req);
1710         dec_unacked(mdev);
1711
1712         return err;
1713 }
1714
1715 static int e_send_discard_write(struct drbd_work *w, int unused)
1716 {
1717         return e_send_ack(w, P_DISCARD_WRITE);
1718 }
1719
1720 static int e_send_retry_write(struct drbd_work *w, int unused)
1721 {
1722         struct drbd_tconn *tconn = w->mdev->tconn;
1723
1724         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1725                              P_RETRY_WRITE : P_DISCARD_WRITE);
1726 }
1727
1728 static bool seq_greater(u32 a, u32 b)
1729 {
1730         /*
1731          * We assume 32-bit wrap-around here.
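         * Illustrative example: around the wrap, seq_greater(0x00000001,
         * 0xfffffffe) is true, since (s32)(0x00000001 - 0xfffffffe) == 3 > 0.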
1732          * For 24-bit wrap-around, we would have to shift:
1733          *  a <<= 8; b <<= 8;
1734          */
1735         return (s32)a - (s32)b > 0;
1736 }
1737
1738 static u32 seq_max(u32 a, u32 b)
1739 {
1740         return seq_greater(a, b) ? a : b;
1741 }
1742
1743 static bool need_peer_seq(struct drbd_conf *mdev)
1744 {
1745         struct drbd_tconn *tconn = mdev->tconn;
1746
1747         /*
1748          * We only need to keep track of the last packet_seq number of our peer
1749          * if we are in dual-primary mode and we have the discard flag set; see
1750          * handle_write_conflicts().
1751          */
1752         return tconn->net_conf->two_primaries &&
1753                test_bit(DISCARD_CONCURRENT, &tconn->flags);
1754 }
1755
1756 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1757 {
1758         unsigned int newest_peer_seq;
1759
1760         if (need_peer_seq(mdev)) {
1761                 spin_lock(&mdev->peer_seq_lock);
1762                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1763                 mdev->peer_seq = newest_peer_seq;
1764                 spin_unlock(&mdev->peer_seq_lock);
1765                 /* wake up only if we actually changed mdev->peer_seq */
1766                 if (peer_seq == newest_peer_seq)
1767                         wake_up(&mdev->seq_wait);
1768         }
1769 }
1770
1771 /* Called from receive_Data.
1772  * Synchronize packets on sock with packets on msock.
1773  *
1774  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1775  * packet traveling on msock, they are still processed in the order they have
1776  * been sent.
1777  *
1778  * Note: we don't care for Ack packets overtaking P_DATA packets.
1779  *
1780  * In case packet_seq is larger than mdev->peer_seq number, there are
1781  * outstanding packets on the msock. We wait for them to arrive.
1782  * In case we are the logically next packet, we update mdev->peer_seq
1783  * ourselves. Correctly handles 32bit wrap around.
1784  *
1785  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1786  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1787  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1788  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1789  *
1790  * returns 0 if we may process the packet,
1791  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
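/* Illustrative example: with mdev->peer_seq == 7, a packet carrying seq 8 (or
 * less) is processed immediately, since seq_greater(8 - 1, 7) is false; a
 * packet carrying seq 9 makes us wait until the one carrying seq 8 has been
 * processed. */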
1792 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1793 {
1794         DEFINE_WAIT(wait);
1795         long timeout;
1796         int ret;
1797
1798         if (!need_peer_seq(mdev))
1799                 return 0;
1800
1801         spin_lock(&mdev->peer_seq_lock);
1802         for (;;) {
1803                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1804                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1805                         ret = 0;
1806                         break;
1807                 }
1808                 if (signal_pending(current)) {
1809                         ret = -ERESTARTSYS;
1810                         break;
1811                 }
1812                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1813                 spin_unlock(&mdev->peer_seq_lock);
1814                 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1815                 timeout = schedule_timeout(timeout);
1816                 spin_lock(&mdev->peer_seq_lock);
1817                 if (!timeout) {
1818                         ret = -ETIMEDOUT;
1819                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1820                         break;
1821                 }
1822         }
1823         spin_unlock(&mdev->peer_seq_lock);
1824         finish_wait(&mdev->seq_wait, &wait);
1825         return ret;
1826 }
1827
1828 /* see also bio_flags_to_wire();
1829  * we need to semantically map the wire DP_* flags to bio REQ_* flags and
1830  * back, because the peer may be running a different kernel version. */
1831 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1832 {
1833         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1834                 (dpf & DP_FUA ? REQ_FUA : 0) |
1835                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1836                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1837 }
1838
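/* Fail (NEG_ACK) all postponed local requests overlapping the given range,
 * e.g. when waiting for the peer's conflict resolution was interrupted.
 * Called with req_lock held; the lock is dropped temporarily around
 * complete_master_bio(). */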
1839 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1840                                     unsigned int size)
1841 {
1842         struct drbd_interval *i;
1843
1844     repeat:
1845         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1846                 struct drbd_request *req;
1847                 struct bio_and_error m;
1848
1849                 if (!i->local)
1850                         continue;
1851                 req = container_of(i, struct drbd_request, i);
1852                 if (!(req->rq_state & RQ_POSTPONED))
1853                         continue;
1854                 req->rq_state &= ~RQ_POSTPONED;
1855                 __req_mod(req, NEG_ACKED, &m);
1856                 spin_unlock_irq(&mdev->tconn->req_lock);
1857                 if (m.bio)
1858                         complete_master_bio(mdev, &m);
1859                 spin_lock_irq(&mdev->tconn->req_lock);
1860                 goto repeat;
1861         }
1862 }
1863
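/* Insert the peer write into the write_requests tree and resolve any overlap
 * with local or earlier peer writes.  Called with req_lock held.  Returns 0
 * if the peer request may be submitted, -ENOENT if it is to be discarded
 * (the ack has already been queued), or a negative error if waiting for a
 * conflicting request was interrupted. */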
1864 static int handle_write_conflicts(struct drbd_conf *mdev,
1865                                   struct drbd_peer_request *peer_req)
1866 {
1867         struct drbd_tconn *tconn = mdev->tconn;
1868         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1869         sector_t sector = peer_req->i.sector;
1870         const unsigned int size = peer_req->i.size;
1871         struct drbd_interval *i;
1872         bool equal;
1873         int err;
1874
1875         /*
1876          * Inserting the peer request into the write_requests tree will prevent
1877          * new conflicting local requests from being added.
1878          */
1879         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1880
1881     repeat:
1882         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1883                 if (i == &peer_req->i)
1884                         continue;
1885
1886                 if (!i->local) {
1887                         /*
1888                          * Our peer has sent a conflicting remote request; this
1889                          * should not happen in a two-node setup.  Wait for the
1890                          * earlier peer request to complete.
1891                          */
1892                         err = drbd_wait_misc(mdev, i);
1893                         if (err)
1894                                 goto out;
1895                         goto repeat;
1896                 }
1897
1898                 equal = i->sector == sector && i->size == size;
1899                 if (resolve_conflicts) {
1900                         /*
1901                          * If the peer request is fully contained within the
1902                          * overlapping request, it can be discarded; otherwise,
1903                          * it will be retried once all overlapping requests
1904                          * have completed.
1905                          */
1906                         bool discard = i->sector <= sector && i->sector +
1907                                        (i->size >> 9) >= sector + (size >> 9);
1908
1909                         if (!equal)
1910                                 dev_alert(DEV, "Concurrent writes detected: "
1911                                                "local=%llus +%u, remote=%llus +%u, "
1912                                                "assuming %s came first\n",
1913                                           (unsigned long long)i->sector, i->size,
1914                                           (unsigned long long)sector, size,
1915                                           discard ? "local" : "remote");
1916
1917                         inc_unacked(mdev);
1918                         peer_req->w.cb = discard ? e_send_discard_write :
1919                                                    e_send_retry_write;
1920                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1921                         wake_asender(mdev->tconn);
1922
1923                         err = -ENOENT;
1924                         goto out;
1925                 } else {
1926                         struct drbd_request *req =
1927                                 container_of(i, struct drbd_request, i);
1928
1929                         if (!equal)
1930                                 dev_alert(DEV, "Concurrent writes detected: "
1931                                                "local=%llus +%u, remote=%llus +%u\n",
1932                                           (unsigned long long)i->sector, i->size,
1933                                           (unsigned long long)sector, size);
1934
1935                         if (req->rq_state & RQ_LOCAL_PENDING ||
1936                             !(req->rq_state & RQ_POSTPONED)) {
1937                                 /*
1938                                  * Wait for the node with the discard flag to
1939                                  * decide if this request will be discarded or
1940                                  * retried.  Requests that are discarded will
1941                                  * disappear from the write_requests tree.
1942                                  *
1943                                  * In addition, wait for the conflicting
1944                                  * request to finish locally before submitting
1945                                  * the conflicting peer request.
1946                                  */
1947                                 err = drbd_wait_misc(mdev, &req->i);
1948                                 if (err) {
1949                                         _conn_request_state(mdev->tconn,
1950                                                             NS(conn, C_TIMEOUT),
1951                                                             CS_HARD);
1952                                         fail_postponed_requests(mdev, sector, size);
1953                                         goto out;
1954                                 }
1955                                 goto repeat;
1956                         }
1957                         /*
1958                          * Remember to restart the conflicting requests after
1959                          * the new peer request has completed.
1960                          */
1961                         peer_req->flags |= EE_RESTART_REQUESTS;
1962                 }
1963         }
1964         err = 0;
1965
1966     out:
1967         if (err)
1968                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1969         return err;
1970 }
1971
1972 /* mirrored write */
1973 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
1974 {
1975         struct drbd_conf *mdev;
1976         sector_t sector;
1977         struct drbd_peer_request *peer_req;
1978         struct p_data *p = tconn->data.rbuf;
1979         u32 peer_seq = be32_to_cpu(p->seq_num);
1980         int rw = WRITE;
1981         u32 dp_flags;
1982         int err;
1983
1984         mdev = vnr_to_mdev(tconn, pi->vnr);
1985         if (!mdev)
1986                 return -EIO;
1987
1988         if (!get_ldev(mdev)) {
1989                 int err2;
1990
1991                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
1992                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1993                 atomic_inc(&mdev->current_epoch->epoch_size);
1994                 err2 = drbd_drain_block(mdev, pi->size);
1995                 if (!err)
1996                         err = err2;
1997                 return err;
1998         }
1999
2000         /*
2001          * Corresponding put_ldev done either below (on various errors), or in
2002          * drbd_peer_request_endio, if we successfully submit the data at the
2003          * end of this function.
2004          */
2005
2006         sector = be64_to_cpu(p->sector);
2007         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2008         if (!peer_req) {
2009                 put_ldev(mdev);
2010                 return -EIO;
2011         }
2012
2013         peer_req->w.cb = e_end_block;
2014
2015         dp_flags = be32_to_cpu(p->dp_flags);
2016         rw |= wire_flags_to_bio(mdev, dp_flags);
2017
2018         if (dp_flags & DP_MAY_SET_IN_SYNC)
2019                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2020
2021         spin_lock(&mdev->epoch_lock);
2022         peer_req->epoch = mdev->current_epoch;
2023         atomic_inc(&peer_req->epoch->epoch_size);
2024         atomic_inc(&peer_req->epoch->active);
2025         spin_unlock(&mdev->epoch_lock);
2026
2027         if (mdev->tconn->net_conf->two_primaries) {
2028                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2029                 if (err)
2030                         goto out_interrupted;
2031                 spin_lock_irq(&mdev->tconn->req_lock);
2032                 err = handle_write_conflicts(mdev, peer_req);
2033                 if (err) {
2034                         spin_unlock_irq(&mdev->tconn->req_lock);
2035                         if (err == -ENOENT) {
2036                                 put_ldev(mdev);
2037                                 return 0;
2038                         }
2039                         goto out_interrupted;
2040                 }
2041         } else
2042                 spin_lock_irq(&mdev->tconn->req_lock);
2043         list_add(&peer_req->w.list, &mdev->active_ee);
2044         spin_unlock_irq(&mdev->tconn->req_lock);
2045
2046         switch (mdev->tconn->net_conf->wire_protocol) {
2047         case DRBD_PROT_C:
2048                 inc_unacked(mdev);
2049                 /* corresponding dec_unacked() in e_end_block()
2050                  * respective _drbd_clear_done_ee */
2051                 break;
2052         case DRBD_PROT_B:
2053                 /* I really don't like it that the receiver thread
2054                  * sends on the msock, but anyways */
2055                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2056                 break;
2057         case DRBD_PROT_A:
2058                 /* nothing to do */
2059                 break;
2060         }
2061
2062         if (mdev->state.pdsk < D_INCONSISTENT) {
2063                 /* In case we have the only disk of the cluster: mark out of sync, and cover it by the activity log. */
2064                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2065                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2066                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2067                 drbd_al_begin_io(mdev, peer_req->i.sector);
2068         }
2069
2070         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2071         if (!err)
2072                 return 0;
2073
2074         /* don't care for the reason here */
2075         dev_err(DEV, "submit failed, triggering re-connect\n");
2076         spin_lock_irq(&mdev->tconn->req_lock);
2077         list_del(&peer_req->w.list);
2078         drbd_remove_epoch_entry_interval(mdev, peer_req);
2079         spin_unlock_irq(&mdev->tconn->req_lock);
2080         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2081                 drbd_al_complete_io(mdev, peer_req->i.sector);
2082
2083 out_interrupted:
2084         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2085         put_ldev(mdev);
2086         drbd_free_ee(mdev, peer_req);
2087         return err;
2088 }
2089
2090 /* We may throttle resync, if the lower device seems to be busy,
2091  * and current sync rate is above c_min_rate.
2092  *
2093  * To decide whether or not the lower device is busy, we use a scheme similar
2094  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2095  * activity (more than 64 sectors) that we cannot account for with our own
2096  * resync activity, it obviously is "busy".
2097  *
2098  * The current sync rate used here is computed from only the two most recent
2099  * step marks, to have a short-time average so we can react faster.
2100  */
2101 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2102 {
2103         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2104         unsigned long db, dt, dbdt;
2105         struct lc_element *tmp;
2106         int curr_events;
2107         int throttle = 0;
2108
2109         /* feature disabled? */
2110         if (mdev->ldev->dc.c_min_rate == 0)
2111                 return 0;
2112
2113         spin_lock_irq(&mdev->al_lock);
2114         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2115         if (tmp) {
2116                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2117                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2118                         spin_unlock_irq(&mdev->al_lock);
2119                         return 0;
2120                 }
2121                 /* Do not slow down if app IO is already waiting for this extent */
2122         }
2123         spin_unlock_irq(&mdev->al_lock);
2124
2125         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2126                       (int)part_stat_read(&disk->part0, sectors[1]) -
2127                         atomic_read(&mdev->rs_sect_ev);
2128
2129         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2130                 unsigned long rs_left;
2131                 int i;
2132
2133                 mdev->rs_last_events = curr_events;
2134
2135                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2136                  * approx. */
2137                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2138
2139                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2140                         rs_left = mdev->ov_left;
2141                 else
2142                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2143
2144                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2145                 if (!dt)
2146                         dt++;
2147                 db = mdev->rs_mark_left[i] - rs_left;
2148                 dbdt = Bit2KB(db/dt);
2149
2150                 if (dbdt > mdev->ldev->dc.c_min_rate)
2151                         throttle = 1;
2152         }
2153         return throttle;
2154 }
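/* Illustrative example, assuming the usual 4 KiB bitmap granularity (so
 * Bit2KB() multiplies by four): if 1500 bits were cleared since the reference
 * mark two seconds ago, dbdt = Bit2KB(1500 / 2) = 3000 kB/s; with c_min_rate
 * configured to, say, 250 kB/s, and more than 64 sectors of unaccounted disk
 * activity, we would throttle. */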
2155
2156
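/* Serve a read-type request from the peer: P_DATA_REQUEST (application read),
 * P_RS_DATA_REQUEST / P_CSUM_RS_REQUEST (resync), or P_OV_REQUEST / P_OV_REPLY
 * (online verify).  The block is read from the local disk and the reply is
 * sent from the corresponding w_e_end_* callback. */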
2157 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2158 {
2159         struct drbd_conf *mdev;
2160         sector_t sector;
2161         sector_t capacity;
2162         struct drbd_peer_request *peer_req;
2163         struct digest_info *di = NULL;
2164         int size, verb;
2165         unsigned int fault_type;
2166         struct p_block_req *p = tconn->data.rbuf;
2167
2168         mdev = vnr_to_mdev(tconn, pi->vnr);
2169         if (!mdev)
2170                 return -EIO;
2171         capacity = drbd_get_capacity(mdev->this_bdev);
2172
2173         sector = be64_to_cpu(p->sector);
2174         size   = be32_to_cpu(p->blksize);
2175
2176         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2177                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2178                                 (unsigned long long)sector, size);
2179                 return -EINVAL;
2180         }
2181         if (sector + (size>>9) > capacity) {
2182                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2183                                 (unsigned long long)sector, size);
2184                 return -EINVAL;
2185         }
2186
2187         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2188                 verb = 1;
2189                 switch (pi->cmd) {
2190                 case P_DATA_REQUEST:
2191                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2192                         break;
2193                 case P_RS_DATA_REQUEST:
2194                 case P_CSUM_RS_REQUEST:
2195                 case P_OV_REQUEST:
2196                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2197                         break;
2198                 case P_OV_REPLY:
2199                         verb = 0;
2200                         dec_rs_pending(mdev);
2201                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2202                         break;
2203                 default:
2204                         BUG();
2205                 }
2206                 if (verb && __ratelimit(&drbd_ratelimit_state))
2207                         dev_err(DEV, "Can not satisfy peer's read request, "
2208                             "no local data.\n");
2209
2210                 /* drain the payload, if any */
2211                 return drbd_drain_block(mdev, pi->size);
2212         }
2213
2214         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2215          * "criss-cross" setup, that might cause write-out on some other DRBD,
2216          * which in turn might block on the other node at this very place.  */
2217         peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2218         if (!peer_req) {
2219                 put_ldev(mdev);
2220                 return -ENOMEM;
2221         }
2222
2223         switch (pi->cmd) {
2224         case P_DATA_REQUEST:
2225                 peer_req->w.cb = w_e_end_data_req;
2226                 fault_type = DRBD_FAULT_DT_RD;
2227                 /* application IO, don't drbd_rs_begin_io */
2228                 goto submit;
2229
2230         case P_RS_DATA_REQUEST:
2231                 peer_req->w.cb = w_e_end_rsdata_req;
2232                 fault_type = DRBD_FAULT_RS_RD;
2233                 /* used in the sector offset progress display */
2234                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2235                 break;
2236
2237         case P_OV_REPLY:
2238         case P_CSUM_RS_REQUEST:
2239                 fault_type = DRBD_FAULT_RS_RD;
2240                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2241                 if (!di)
2242                         goto out_free_e;
2243
2244                 di->digest_size = pi->size;
2245                 di->digest = (((char *)di)+sizeof(struct digest_info));
2246
2247                 peer_req->digest = di;
2248                 peer_req->flags |= EE_HAS_DIGEST;
2249
2250                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2251                         goto out_free_e;
2252
2253                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2254                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2255                         peer_req->w.cb = w_e_end_csum_rs_req;
2256                         /* used in the sector offset progress display */
2257                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2258                 } else if (pi->cmd == P_OV_REPLY) {
2259                         /* track progress, we may need to throttle */
2260                         atomic_add(size >> 9, &mdev->rs_sect_in);
2261                         peer_req->w.cb = w_e_end_ov_reply;
2262                         dec_rs_pending(mdev);
2263                         /* drbd_rs_begin_io done when we sent this request,
2264                          * but accounting still needs to be done. */
2265                         goto submit_for_resync;
2266                 }
2267                 break;
2268
2269         case P_OV_REQUEST:
2270                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2271                     mdev->tconn->agreed_pro_version >= 90) {
2272                         unsigned long now = jiffies;
2273                         int i;
2274                         mdev->ov_start_sector = sector;
2275                         mdev->ov_position = sector;
2276                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2277                         mdev->rs_total = mdev->ov_left;
2278                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2279                                 mdev->rs_mark_left[i] = mdev->ov_left;
2280                                 mdev->rs_mark_time[i] = now;
2281                         }
2282                         dev_info(DEV, "Online Verify start sector: %llu\n",
2283                                         (unsigned long long)sector);
2284                 }
2285                 peer_req->w.cb = w_e_end_ov_req;
2286                 fault_type = DRBD_FAULT_RS_RD;
2287                 break;
2288
2289         default:
2290                 BUG();
2291         }
2292
2293         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2294          * wrt the receiver, but it is not as straightforward as it may seem.
2295          * Various places in the resync start and stop logic assume resync
2296          * requests are processed in order, requeuing this on the worker thread
2297          * introduces a bunch of new code for synchronization between threads.
2298          *
2299          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2300          * "forever", throttling after drbd_rs_begin_io will lock that extent
2301          * for application writes for the same time.  For now, just throttle
2302          * here, where the rest of the code expects the receiver to sleep for
2303          * a while, anyways.
2304          */
2305
2306         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2307          * this defers syncer requests for some time, before letting at least
2308  * one request through.  The resync controller on the receiving side
2309          * will adapt to the incoming rate accordingly.
2310          *
2311          * We cannot throttle here if remote is Primary/SyncTarget:
2312          * we would also throttle its application reads.
2313          * In that case, throttling is done on the SyncTarget only.
2314          */
2315         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2316                 schedule_timeout_uninterruptible(HZ/10);
2317         if (drbd_rs_begin_io(mdev, sector))
2318                 goto out_free_e;
2319
2320 submit_for_resync:
2321         atomic_add(size >> 9, &mdev->rs_sect_ev);
2322
2323 submit:
2324         inc_unacked(mdev);
2325         spin_lock_irq(&mdev->tconn->req_lock);
2326         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2327         spin_unlock_irq(&mdev->tconn->req_lock);
2328
2329         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2330                 return 0;
2331
2332         /* don't care for the reason here */
2333         dev_err(DEV, "submit failed, triggering re-connect\n");
2334         spin_lock_irq(&mdev->tconn->req_lock);
2335         list_del(&peer_req->w.list);
2336         spin_unlock_irq(&mdev->tconn->req_lock);
2337         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2338
2339 out_free_e:
2340         put_ldev(mdev);
2341         drbd_free_ee(mdev, peer_req);
2342         return -EIO;
2343 }
2344
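/* Split-brain auto-recovery (the after-sb-0pri policy) for the case that
 * neither node is currently Primary.  Returns 1 to keep the local data (the
 * peer becomes sync target), -1 to give it up (we become sync target), or
 * -100 if the configured policy cannot decide.  The _1p()/_2p() variants
 * below apply when one or both nodes are Primary. */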
2345 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2346 {
2347         int self, peer, rv = -100;
2348         unsigned long ch_self, ch_peer;
2349
2350         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2351         peer = mdev->p_uuid[UI_BITMAP] & 1;
2352
2353         ch_peer = mdev->p_uuid[UI_SIZE];
2354         ch_self = mdev->comm_bm_set;
2355
2356         switch (mdev->tconn->net_conf->after_sb_0p) {
2357         case ASB_CONSENSUS:
2358         case ASB_DISCARD_SECONDARY:
2359         case ASB_CALL_HELPER:
2360                 dev_err(DEV, "Configuration error.\n");
2361                 break;
2362         case ASB_DISCONNECT:
2363                 break;
2364         case ASB_DISCARD_YOUNGER_PRI:
2365                 if (self == 0 && peer == 1) {
2366                         rv = -1;
2367                         break;
2368                 }
2369                 if (self == 1 && peer == 0) {
2370                         rv =  1;
2371                         break;
2372                 }
2373                 /* Else fall through to one of the other strategies... */
2374         case ASB_DISCARD_OLDER_PRI:
2375                 if (self == 0 && peer == 1) {
2376                         rv = 1;
2377                         break;
2378                 }
2379                 if (self == 1 && peer == 0) {
2380                         rv = -1;
2381                         break;
2382                 }
2383                 /* Else fall through to one of the other strategies... */
2384                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2385                      "Using discard-least-changes instead\n");
2386         case ASB_DISCARD_ZERO_CHG:
2387                 if (ch_peer == 0 && ch_self == 0) {
2388                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2389                                 ? -1 : 1;
2390                         break;
2391                 } else {
2392                         if (ch_peer == 0) { rv =  1; break; }
2393                         if (ch_self == 0) { rv = -1; break; }
2394                 }
2395                 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2396                         break;
2397         case ASB_DISCARD_LEAST_CHG:
2398                 if      (ch_self < ch_peer)
2399                         rv = -1;
2400                 else if (ch_self > ch_peer)
2401                         rv =  1;
2402                 else /* ( ch_self == ch_peer ) */
2403                      /* Well, then use something else. */
2404                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2405                                 ? -1 : 1;
2406                 break;
2407         case ASB_DISCARD_LOCAL:
2408                 rv = -1;
2409                 break;
2410         case ASB_DISCARD_REMOTE:
2411                 rv =  1;
2412         }
2413
2414         return rv;
2415 }
2416
2417 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2418 {
2419         int hg, rv = -100;
2420
2421         switch (mdev->tconn->net_conf->after_sb_1p) {
2422         case ASB_DISCARD_YOUNGER_PRI:
2423         case ASB_DISCARD_OLDER_PRI:
2424         case ASB_DISCARD_LEAST_CHG:
2425         case ASB_DISCARD_LOCAL:
2426         case ASB_DISCARD_REMOTE:
2427                 dev_err(DEV, "Configuration error.\n");
2428                 break;
2429         case ASB_DISCONNECT:
2430                 break;
2431         case ASB_CONSENSUS:
2432                 hg = drbd_asb_recover_0p(mdev);
2433                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2434                         rv = hg;
2435                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2436                         rv = hg;
2437                 break;
2438         case ASB_VIOLENTLY:
2439                 rv = drbd_asb_recover_0p(mdev);
2440                 break;
2441         case ASB_DISCARD_SECONDARY:
2442                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2443         case ASB_CALL_HELPER:
2444                 hg = drbd_asb_recover_0p(mdev);
2445                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2446                         enum drbd_state_rv rv2;
2447
2448                         drbd_set_role(mdev, R_SECONDARY, 0);
2449                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2450                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2451                           * we do not need to wait for the after state change work either. */
2452                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2453                         if (rv2 != SS_SUCCESS) {
2454                                 drbd_khelper(mdev, "pri-lost-after-sb");
2455                         } else {
2456                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2457                                 rv = hg;
2458                         }
2459                 } else
2460                         rv = hg;
2461         }
2462
2463         return rv;
2464 }
2465
2466 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2467 {
2468         int hg, rv = -100;
2469
2470         switch (mdev->tconn->net_conf->after_sb_2p) {
2471         case ASB_DISCARD_YOUNGER_PRI:
2472         case ASB_DISCARD_OLDER_PRI:
2473         case ASB_DISCARD_LEAST_CHG:
2474         case ASB_DISCARD_LOCAL:
2475         case ASB_DISCARD_REMOTE:
2476         case ASB_CONSENSUS:
2477         case ASB_DISCARD_SECONDARY:
2478                 dev_err(DEV, "Configuration error.\n");
2479                 break;
2480         case ASB_VIOLENTLY:
2481                 rv = drbd_asb_recover_0p(mdev);
2482                 break;
2483         case ASB_DISCONNECT:
2484                 break;
2485         case ASB_CALL_HELPER:
2486                 hg = drbd_asb_recover_0p(mdev);
2487                 if (hg == -1) {
2488                         enum drbd_state_rv rv2;
2489
2490                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2491                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2492                           * we do not need to wait for the after state change work either. */
2493                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2494                         if (rv2 != SS_SUCCESS) {
2495                                 drbd_khelper(mdev, "pri-lost-after-sb");
2496                         } else {
2497                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2498                                 rv = hg;
2499                         }
2500                 } else
2501                         rv = hg;
2502         }
2503
2504         return rv;
2505 }
2506
2507 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2508                            u64 bits, u64 flags)
2509 {
2510         if (!uuid) {
2511                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2512                 return;
2513         }
2514         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2515              text,
2516              (unsigned long long)uuid[UI_CURRENT],
2517              (unsigned long long)uuid[UI_BITMAP],
2518              (unsigned long long)uuid[UI_HISTORY_START],
2519              (unsigned long long)uuid[UI_HISTORY_END],
2520              (unsigned long long)bits,
2521              (unsigned long long)flags);
2522 }
2523
2524 /*
2525   100   after split brain try auto recover
2526     2   C_SYNC_SOURCE set BitMap
2527     1   C_SYNC_SOURCE use BitMap
2528     0   no Sync
2529    -1   C_SYNC_TARGET use BitMap
2530    -2   C_SYNC_TARGET set BitMap
2531  -100   after split brain, disconnect
2532 -1000   unrelated data
2533 -1091   requires proto 91
2534 -1096   requires proto 96
2535  */
2536 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2537 {
2538         u64 self, peer;
2539         int i, j;
2540
2541         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2542         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2543
2544         *rule_nr = 10;
2545         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2546                 return 0;
2547
2548         *rule_nr = 20;
2549         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2550              peer != UUID_JUST_CREATED)
2551                 return -2;
2552
2553         *rule_nr = 30;
2554         if (self != UUID_JUST_CREATED &&
2555             (peer == UUID_JUST_CREATED || peer == (u64)0))
2556                 return 2;
2557
2558         if (self == peer) {
2559                 int rct, dc; /* roles at crash time */
2560
2561                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2562
2563                         if (mdev->tconn->agreed_pro_version < 91)
2564                                 return -1091;
2565
2566                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2567                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2568                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2569                                 drbd_uuid_set_bm(mdev, 0UL);
2570
2571                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2572                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2573                                 *rule_nr = 34;
2574                         } else {
2575                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2576                                 *rule_nr = 36;
2577                         }
2578
2579                         return 1;
2580                 }
2581
2582                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2583
2584                         if (mdev->tconn->agreed_pro_version < 91)
2585                                 return -1091;
2586
2587                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2588                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2589                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2590
2591                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2592                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2593                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2594
2595                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2596                                 *rule_nr = 35;
2597                         } else {
2598                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2599                                 *rule_nr = 37;
2600                         }
2601
2602                         return -1;
2603                 }
2604
2605                 /* Common power [off|failure] */
2606                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2607                         (mdev->p_uuid[UI_FLAGS] & 2);
2608                 /* lowest bit is set when we were primary,
2609                  * next bit (weight 2) is set when peer was primary */
2610                 *rule_nr = 40;
2611
2612                 switch (rct) {
2613                 case 0: /* !self_pri && !peer_pri */ return 0;
2614                 case 1: /*  self_pri && !peer_pri */ return 1;
2615                 case 2: /* !self_pri &&  peer_pri */ return -1;
2616                 case 3: /*  self_pri &&  peer_pri */
2617                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2618                         return dc ? -1 : 1;
2619                 }
2620         }
2621
2622         *rule_nr = 50;
2623         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2624         if (self == peer)
2625                 return -1;
2626
2627         *rule_nr = 51;
2628         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2629         if (self == peer) {
2630                 if (mdev->tconn->agreed_pro_version < 96 ?
2631                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2632                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2633                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2634                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2635                            the peer made to its UUIDs when it last started a resync as sync source. */
2636
2637                         if (mdev->tconn->agreed_pro_version < 91)
2638                                 return -1091;
2639
2640                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2641                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2642
2643                         dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2644                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2645
2646                         return -1;
2647                 }
2648         }
2649
2650         *rule_nr = 60;
2651         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2652         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2653                 peer = mdev->p_uuid[i] & ~((u64)1);
2654                 if (self == peer)
2655                         return -2;
2656         }
2657
2658         *rule_nr = 70;
2659         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2660         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2661         if (self == peer)
2662                 return 1;
2663
2664         *rule_nr = 71;
2665         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2666         if (self == peer) {
2667                 if (mdev->tconn->agreed_pro_version < 96 ?
2668                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2669                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2670                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2671                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2672                            we made to our own UUIDs when we last started a resync as sync source. */
2673
2674                         if (mdev->tconn->agreed_pro_version < 91)
2675                                 return -1091;
2676
2677                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2678                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2679
2680                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2681                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2682                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2683
2684                         return 1;
2685                 }
2686         }
2687
2688
2689         *rule_nr = 80;
2690         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2691         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2692                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2693                 if (self == peer)
2694                         return 2;
2695         }
2696
2697         *rule_nr = 90;
2698         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2699         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2700         if (self == peer && self != ((u64)0))
2701                 return 100;
2702
2703         *rule_nr = 100;
2704         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2705                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2706                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2707                         peer = mdev->p_uuid[j] & ~((u64)1);
2708                         if (self == peer)
2709                                 return -100;
2710                 }
2711         }
2712
2713         return -1000;
2714 }
2715
2716 /* drbd_sync_handshake() returns the new conn state on success, or
2717    CONN_MASK (-1) on failure.
2718  */
2719 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2720                                            enum drbd_disk_state peer_disk) __must_hold(local)
2721 {
2722         int hg, rule_nr;
2723         enum drbd_conns rv = C_MASK;
2724         enum drbd_disk_state mydisk;
2725
2726         mydisk = mdev->state.disk;
2727         if (mydisk == D_NEGOTIATING)
2728                 mydisk = mdev->new_state_tmp.disk;
2729
2730         dev_info(DEV, "drbd_sync_handshake:\n");
2731         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2732         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2733                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2734
2735         hg = drbd_uuid_compare(mdev, &rule_nr);
2736
2737         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2738
2739         if (hg == -1000) {
2740                 dev_alert(DEV, "Unrelated data, aborting!\n");
2741                 return C_MASK;
2742         }
2743         if (hg < -1000) {
2744                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2745                 return C_MASK;
2746         }
2747
2748         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2749             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2750                 int f = (hg == -100) || abs(hg) == 2;
2751                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2752                 if (f)
2753                         hg = hg*2;
2754                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2755                      hg > 0 ? "source" : "target");
2756         }
2757
2758         if (abs(hg) == 100)
2759                 drbd_khelper(mdev, "initial-split-brain");
2760
2761         if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2762                 int pcount = (mdev->state.role == R_PRIMARY)
2763                            + (peer_role == R_PRIMARY);
2764                 int forced = (hg == -100);
2765
2766                 switch (pcount) {
2767                 case 0:
2768                         hg = drbd_asb_recover_0p(mdev);
2769                         break;
2770                 case 1:
2771                         hg = drbd_asb_recover_1p(mdev);
2772                         break;
2773                 case 2:
2774                         hg = drbd_asb_recover_2p(mdev);
2775                         break;
2776                 }
2777                 if (abs(hg) < 100) {
2778                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2779                              "automatically solved. Sync from %s node\n",
2780                              pcount, (hg < 0) ? "peer" : "this");
2781                         if (forced) {
2782                                 dev_warn(DEV, "Doing a full sync, since"
2783                                      " UUIDs were ambiguous.\n");
2784                                 hg = hg*2;
2785                         }
2786                 }
2787         }
2788
2789         if (hg == -100) {
2790                 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2791                         hg = -1;
2792                 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2793                         hg = 1;
2794
2795                 if (abs(hg) < 100)
2796                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2797                              "Sync from %s node\n",
2798                              (hg < 0) ? "peer" : "this");
2799         }
2800
2801         if (hg == -100) {
2802                 /* FIXME this log message is not correct if we end up here
2803                  * after an attempted attach on a diskless node.
2804                  * We just refuse to attach -- well, we drop the "connection"
2805                  * to that disk, in a way... */
2806                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2807                 drbd_khelper(mdev, "split-brain");
2808                 return C_MASK;
2809         }
2810
2811         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2812                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2813                 return C_MASK;
2814         }
2815
2816         if (hg < 0 && /* by intention we do not use mydisk here. */
2817             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2818                 switch (mdev->tconn->net_conf->rr_conflict) {
2819                 case ASB_CALL_HELPER:
2820                         drbd_khelper(mdev, "pri-lost");
2821                         /* fall through */
2822                 case ASB_DISCONNECT:
2823                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2824                         return C_MASK;
2825                 case ASB_VIOLENTLY:
2826                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2827                              "assumption\n");
2828                 }
2829         }
2830
2831         if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2832                 if (hg == 0)
2833                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2834                 else
2835                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2836                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2837                                  abs(hg) >= 2 ? "full" : "bit-map based");
2838                 return C_MASK;
2839         }
2840
2841         if (abs(hg) >= 2) {
2842                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2843                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2844                                         BM_LOCKED_SET_ALLOWED))
2845                         return C_MASK;
2846         }
2847
2848         if (hg > 0) { /* become sync source. */
2849                 rv = C_WF_BITMAP_S;
2850         } else if (hg < 0) { /* become sync target */
2851                 rv = C_WF_BITMAP_T;
2852         } else {
2853                 rv = C_CONNECTED;
2854                 if (drbd_bm_total_weight(mdev)) {
2855                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2856                              drbd_bm_total_weight(mdev));
2857                 }
2858         }
2859
2860         return rv;
2861 }
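
/* A rough summary of the outcome (an illustrative sketch of the logic
 * above, not an authoritative reference):
 *
 *      hg == -1000        unrelated data, connection is dropped (C_MASK)
 *      hg <  -1000        peer protocol too old, connection is dropped
 *      |hg| == 100        split brain; the after-sb policies above may
 *                         reduce it, otherwise the connection is dropped
 *      after resolution:
 *      hg == 0            C_CONNECTED, no resync
 *      hg >  0            C_WF_BITMAP_S, we become sync source
 *      hg <  0            C_WF_BITMAP_T, we become sync target
 *      |hg| >= 2          additionally forces a full sync (whole bitmap set)
 */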
2862
2863 /* returns 1 if invalid */
2864 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2865 {
2866         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2867         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2868             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2869                 return 0;
2870
2871         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2872         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2873             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2874                 return 1;
2875
2876         /* everything else is valid if they are equal on both sides. */
2877         if (peer == self)
2878                 return 0;
2879
2880         /* everything else is invalid. */
2881         return 1;
2882 }
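
/* Examples (illustrative): cmp_after_sb() treats
 *      (ASB_DISCARD_REMOTE, ASB_DISCARD_LOCAL)  as valid   (returns 0):
 *              both sides agree on whose data gets discarded;
 *      (ASB_DISCARD_REMOTE, ASB_DISCARD_REMOTE) as invalid (returns 1):
 *              each side would discard the other's data;
 *      (ASB_DISCONNECT,     ASB_DISCONNECT)     as valid   (returns 0):
 *              identical settings on both sides;
 *      (ASB_DISCONNECT,     ASB_VIOLENTLY)      as invalid (returns 1):
 *              the settings differ.
 */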
2883
2884 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2885 {
2886         struct p_protocol *p = tconn->data.rbuf;
2887         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2888         int p_want_lose, p_two_primaries, cf;
2889         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2890
2891         p_proto         = be32_to_cpu(p->protocol);
2892         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2893         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2894         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2895         p_two_primaries = be32_to_cpu(p->two_primaries);
2896         cf              = be32_to_cpu(p->conn_flags);
2897         p_want_lose = cf & CF_WANT_LOSE;
2898
2899         clear_bit(CONN_DRY_RUN, &tconn->flags);
2900
2901         if (cf & CF_DRY_RUN)
2902                 set_bit(CONN_DRY_RUN, &tconn->flags);
2903
2904         if (p_proto != tconn->net_conf->wire_protocol) {
2905                 conn_err(tconn, "incompatible communication protocols\n");
2906                 goto disconnect;
2907         }
2908
2909         if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2910                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2911                 goto disconnect;
2912         }
2913
2914         if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2915                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2916                 goto disconnect;
2917         }
2918
2919         if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2920                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2921                 goto disconnect;
2922         }
2923
2924         if (p_want_lose && tconn->net_conf->want_lose) {
2925                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2926                 goto disconnect;
2927         }
2928
2929         if (p_two_primaries != tconn->net_conf->two_primaries) {
2930                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2931                 goto disconnect;
2932         }
2933
2934         if (tconn->agreed_pro_version >= 87) {
2935                 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2936                 int err;
2937
2938                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2939                 if (err)
2940                         return err;
2941
2942                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2943                 if (strcmp(p_integrity_alg, my_alg)) {
2944                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2945                         goto disconnect;
2946                 }
2947                 conn_info(tconn, "data-integrity-alg: %s\n",
2948                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2949         }
2950
2951         return 0;
2952
2953 disconnect:
2954         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2955         return -EIO;
2956 }
2957
2958 /* helper function
2959  * input: alg name, feature name
2960  * return: NULL (alg name was "")
2961  *         ERR_PTR(error) if something goes wrong
2962  *         or the crypto hash ptr, if it worked out ok. */
2963 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2964                 const char *alg, const char *name)
2965 {
2966         struct crypto_hash *tfm;
2967
2968         if (!alg[0])
2969                 return NULL;
2970
2971         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2972         if (IS_ERR(tfm)) {
2973                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2974                         alg, name, PTR_ERR(tfm));
2975                 return tfm;
2976         }
2977         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2978                 crypto_free_hash(tfm);
2979                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2980                 return ERR_PTR(-EINVAL);
2981         }
2982         return tfm;
2983 }
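
/* Usage sketch (illustrative only; "new_alg" stands for a hypothetical
 * NUL terminated algorithm name received from the peer):
 *
 *      struct crypto_hash *tfm;
 *
 *      tfm = drbd_crypto_alloc_digest_safe(mdev, new_alg, "verify-alg");
 *      if (IS_ERR(tfm))
 *              goto disconnect;        allocation failed, or not a digest
 *      if (tfm == NULL)
 *              return 0;               new_alg was "", keep current setting
 *      ...otherwise install tfm under the appropriate lock and free the
 *      previously used transform, as receive_SyncParam() below does...
 */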
2984
2985 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
2986 {
2987         void *buffer = tconn->data.rbuf;
2988         int size = pi->size;
2989
2990         while (size) {
2991                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
2992                 s = drbd_recv(tconn, buffer, s);
2993                 if (s <= 0) {
2994                         if (s < 0)
2995                                 return s;
2996                         break;
2997                 }
2998                 size -= s;
2999         }
3000         if (size)
3001                 return -EIO;
3002         return 0;
3003 }
3004
3005 /*
3006  * config_unknown_volume  -  device configuration command for unknown volume
3007  *
3008  * When a device is added to an existing connection, the node on which the
3009  * device is added first will send configuration commands to its peer but the
3010  * peer will not know about the device yet.  It will warn and ignore these
3011  * commands.  Once the device is added on the second node, the second node will
3012  * send the same device configuration commands, but in the other direction.
3013  *
3014  * (We can also end up here if drbd is misconfigured.)
3015  */
3016 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3017 {
3018         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3019                   pi->vnr, cmdname(pi->cmd));
3020         return ignore_remaining_packet(tconn, pi);
3021 }
3022
3023 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3024 {
3025         struct drbd_conf *mdev;
3026         struct p_rs_param_95 *p = tconn->data.rbuf;
3027         unsigned int header_size, data_size, exp_max_sz;
3028         struct crypto_hash *verify_tfm = NULL;
3029         struct crypto_hash *csums_tfm = NULL;
3030         const int apv = tconn->agreed_pro_version;
3031         int *rs_plan_s = NULL;
3032         int fifo_size = 0;
3033         int err;
3034
3035         mdev = vnr_to_mdev(tconn, pi->vnr);
3036         if (!mdev)
3037                 return config_unknown_volume(tconn, pi);
3038
3039         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3040                     : apv == 88 ? sizeof(struct p_rs_param)
3041                                         + SHARED_SECRET_MAX
3042                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3043                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3044
3045         if (pi->size > exp_max_sz) {
3046                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3047                     pi->size, exp_max_sz);
3048                 return -EIO;
3049         }
3050
3051         if (apv <= 88) {
3052                 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
3053                 data_size = pi->size - header_size;
3054         } else if (apv <= 94) {
3055                 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
3056                 data_size = pi->size - header_size;
3057                 D_ASSERT(data_size == 0);
3058         } else {
3059                 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
3060                 data_size = pi->size - header_size;
3061                 D_ASSERT(data_size == 0);
3062         }
3063
3064         /* initialize verify_alg and csums_alg */
3065         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3066
3067         err = drbd_recv_all(mdev->tconn, &p->head.payload, header_size);
3068         if (err)
3069                 return err;
3070
3071         if (get_ldev(mdev)) {
3072                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3073                 put_ldev(mdev);
3074         }
3075
3076         if (apv >= 88) {
3077                 if (apv == 88) {
3078                         if (data_size > SHARED_SECRET_MAX) {
3079                                 dev_err(DEV, "verify-alg too long, "
3080                                     "peer wants %u, accepting only %u bytes\n",
3081                                                 data_size, SHARED_SECRET_MAX);
3082                                 return -EIO;
3083                         }
3084
3085                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3086                         if (err)
3087                                 return err;
3088
3089                         /* we expect NUL terminated string */
3090                         /* but just in case someone tries to be evil */
3091                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3092                         p->verify_alg[data_size-1] = 0;
3093
3094                 } else /* apv >= 89 */ {
3095                         /* we still expect NUL terminated strings */
3096                         /* but just in case someone tries to be evil */
3097                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3098                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3099                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3100                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3101                 }
3102
3103                 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3104                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3105                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3106                                     mdev->tconn->net_conf->verify_alg, p->verify_alg);
3107                                 goto disconnect;
3108                         }
3109                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3110                                         p->verify_alg, "verify-alg");
3111                         if (IS_ERR(verify_tfm)) {
3112                                 verify_tfm = NULL;
3113                                 goto disconnect;
3114                         }
3115                 }
3116
3117                 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3118                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3119                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3120                                     mdev->tconn->net_conf->csums_alg, p->csums_alg);
3121                                 goto disconnect;
3122                         }
3123                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3124                                         p->csums_alg, "csums-alg");
3125                         if (IS_ERR(csums_tfm)) {
3126                                 csums_tfm = NULL;
3127                                 goto disconnect;
3128                         }
3129                 }
3130
3131                 if (apv > 94 && get_ldev(mdev)) {
3132                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3133                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3134                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3135                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3136                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3137
3138                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3139                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3140                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3141                                 if (!rs_plan_s) {
3142                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3143                                         put_ldev(mdev);
3144                                         goto disconnect;
3145                                 }
3146                         }
3147                         put_ldev(mdev);
3148                 }
3149
3150                 spin_lock(&mdev->peer_seq_lock);
3151                 /* lock against drbd_nl_syncer_conf() */
3152                 if (verify_tfm) {
3153                         strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3154                         mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3155                         crypto_free_hash(mdev->tconn->verify_tfm);
3156                         mdev->tconn->verify_tfm = verify_tfm;
3157                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3158                 }
3159                 if (csums_tfm) {
3160                         strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3161                         mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3162                         crypto_free_hash(mdev->tconn->csums_tfm);
3163                         mdev->tconn->csums_tfm = csums_tfm;
3164                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3165                 }
3166                 if (fifo_size != mdev->rs_plan_s.size) {
3167                         kfree(mdev->rs_plan_s.values);
3168                         mdev->rs_plan_s.values = rs_plan_s;
3169                         mdev->rs_plan_s.size   = fifo_size;
3170                         mdev->rs_planed = 0;
3171                 }
3172                 spin_unlock(&mdev->peer_seq_lock);
3173         }
3174         return 0;
3175
3176 disconnect:
3177         /* just for completeness: actually not needed,
3178          * as this is not reached if csums_tfm was ok. */
3179         crypto_free_hash(csums_tfm);
3180         /* but free the verify_tfm again, if csums_tfm did not work out */
3181         crypto_free_hash(verify_tfm);
3182         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3183         return -EIO;
3184 }
3185
3186 /* warn if the arguments differ by more than 12.5% */
3187 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3188         const char *s, sector_t a, sector_t b)
3189 {
3190         sector_t d;
3191         if (a == 0 || b == 0)
3192                 return;
3193         d = (a > b) ? (a - b) : (b - a);
3194         if (d > (a>>3) || d > (b>>3))
3195                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3196                      (unsigned long long)a, (unsigned long long)b);
3197 }
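
/* Worked example (illustrative numbers): with a = 1000 and b = 880 sectors,
 * d = 120 while a>>3 = 125 and b>>3 = 110; since d > b>>3, the sizes differ
 * by more than 12.5% of the smaller one and the warning is printed.  With
 * a = 1000 and b = 900, d = 100 stays below both thresholds and nothing is
 * logged. */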
3198
3199 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3200 {
3201         struct drbd_conf *mdev;
3202         struct p_sizes *p = tconn->data.rbuf;
3203         enum determine_dev_size dd = unchanged;
3204         sector_t p_size, p_usize, my_usize;
3205         int ldsc = 0; /* local disk size changed */
3206         enum dds_flags ddsf;
3207
3208         mdev = vnr_to_mdev(tconn, pi->vnr);
3209         if (!mdev)
3210                 return config_unknown_volume(tconn, pi);
3211
3212         p_size = be64_to_cpu(p->d_size);
3213         p_usize = be64_to_cpu(p->u_size);
3214
3215         /* just store the peer's disk size for now.
3216          * we still need to figure out whether we accept that. */
3217         mdev->p_size = p_size;
3218
3219         if (get_ldev(mdev)) {
3220                 warn_if_differ_considerably(mdev, "lower level device sizes",
3221                            p_size, drbd_get_max_capacity(mdev->ldev));
3222                 warn_if_differ_considerably(mdev, "user requested size",
3223                                             p_usize, mdev->ldev->dc.disk_size);
3224
3225                 /* if this is the first connect, or an otherwise expected
3226                  * param exchange, choose the minimum */
3227                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3228                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3229                                              p_usize);
3230
3231                 my_usize = mdev->ldev->dc.disk_size;
3232
3233                 if (mdev->ldev->dc.disk_size != p_usize) {
3234                         mdev->ldev->dc.disk_size = p_usize;
3235                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3236                              (unsigned long)mdev->ldev->dc.disk_size);
3237                 }
3238
3239                 /* Never shrink a device with usable data during connect.
3240                    But allow online shrinking if we are connected. */
3241                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3242                    drbd_get_capacity(mdev->this_bdev) &&
3243                    mdev->state.disk >= D_OUTDATED &&
3244                    mdev->state.conn < C_CONNECTED) {
3245                         dev_err(DEV, "The peer's disk size is too small!\n");
3246                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3247                         mdev->ldev->dc.disk_size = my_usize;
3248                         put_ldev(mdev);
3249                         return -EIO;
3250                 }
3251                 put_ldev(mdev);
3252         }
3253
3254         ddsf = be16_to_cpu(p->dds_flags);
3255         if (get_ldev(mdev)) {
3256                 dd = drbd_determine_dev_size(mdev, ddsf);
3257                 put_ldev(mdev);
3258                 if (dd == dev_size_error)
3259                         return -EIO;
3260                 drbd_md_sync(mdev);
3261         } else {
3262                 /* I am diskless, need to accept the peer's size. */
3263                 drbd_set_my_capacity(mdev, p_size);
3264         }
3265
3266         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3267         drbd_reconsider_max_bio_size(mdev);
3268
3269         if (get_ldev(mdev)) {
3270                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3271                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3272                         ldsc = 1;
3273                 }
3274
3275                 put_ldev(mdev);
3276         }
3277
3278         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3279                 if (be64_to_cpu(p->c_size) !=
3280                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3281                         /* we have different sizes, probably peer
3282                          * needs to know my new size... */
3283                         drbd_send_sizes(mdev, 0, ddsf);
3284                 }
3285                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3286                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3287                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3288                             mdev->state.disk >= D_INCONSISTENT) {
3289                                 if (ddsf & DDSF_NO_RESYNC)
3290                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3291                                 else
3292                                         resync_after_online_grow(mdev);
3293                         } else
3294                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3295                 }
3296         }
3297
3298         return 0;
3299 }
3300
3301 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3302 {
3303         struct drbd_conf *mdev;
3304         struct p_uuids *p = tconn->data.rbuf;
3305         u64 *p_uuid;
3306         int i, updated_uuids = 0;
3307
3308         mdev = vnr_to_mdev(tconn, pi->vnr);
3309         if (!mdev)
3310                 return config_unknown_volume(tconn, pi);
3311
3312         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3313
3314         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3315                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3316
3317         kfree(mdev->p_uuid);
3318         mdev->p_uuid = p_uuid;
3319
3320         if (mdev->state.conn < C_CONNECTED &&
3321             mdev->state.disk < D_INCONSISTENT &&
3322             mdev->state.role == R_PRIMARY &&
3323             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3324                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3325                     (unsigned long long)mdev->ed_uuid);
3326                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3327                 return -EIO;
3328         }
3329
3330         if (get_ldev(mdev)) {
3331                 int skip_initial_sync =
3332                         mdev->state.conn == C_CONNECTED &&
3333                         mdev->tconn->agreed_pro_version >= 90 &&
3334                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3335                         (p_uuid[UI_FLAGS] & 8);
3336                 if (skip_initial_sync) {
3337                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3338                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3339                                         "clear_n_write from receive_uuids",
3340                                         BM_LOCKED_TEST_ALLOWED);
3341                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3342                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3343                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3344                                         CS_VERBOSE, NULL);
3345                         drbd_md_sync(mdev);
3346                         updated_uuids = 1;
3347                 }
3348                 put_ldev(mdev);
3349         } else if (mdev->state.disk < D_INCONSISTENT &&
3350                    mdev->state.role == R_PRIMARY) {
3351                 /* I am a diskless primary, the peer just created a new current UUID
3352                    for me. */
3353                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3354         }
3355
3356         /* Before we test for the disk state, we should wait until any potentially
3357            ongoing cluster-wide state change has finished. That is important if
3358            we are primary and are detaching from our disk. We need to see the
3359            new disk state... */
3360         mutex_lock(mdev->state_mutex);
3361         mutex_unlock(mdev->state_mutex);
3362         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3363                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3364
3365         if (updated_uuids)
3366                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3367
3368         return 0;
3369 }
3370
3371 /**
3372  * convert_state() - Converts the peer's view of the cluster state to our point of view
3373  * @ps:         The state as seen by the peer.
3374  */
3375 static union drbd_state convert_state(union drbd_state ps)
3376 {
3377         union drbd_state ms;
3378
3379         static enum drbd_conns c_tab[] = {
3380                 [C_CONNECTED] = C_CONNECTED,
3381
3382                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3383                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3384                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3385                 [C_VERIFY_S]       = C_VERIFY_T,
3386                 [C_MASK]   = C_MASK,
3387         };
3388
3389         ms.i = ps.i;
3390
3391         ms.conn = c_tab[ps.conn];
3392         ms.peer = ps.role;
3393         ms.role = ps.peer;
3394         ms.pdsk = ps.disk;
3395         ms.disk = ps.pdsk;
3396         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3397
3398         return ms;
3399 }
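
/* Illustrative example: if the peer reports
 *      { role = R_PRIMARY, peer = R_SECONDARY,
 *        disk = D_UP_TO_DATE, pdsk = D_INCONSISTENT,
 *        conn = C_STARTING_SYNC_S },
 * convert_state() turns that into our point of view
 *      { role = R_SECONDARY, peer = R_PRIMARY,
 *        disk = D_INCONSISTENT, pdsk = D_UP_TO_DATE,
 *        conn = C_STARTING_SYNC_T }:
 * role/peer and disk/pdsk swap places, and the connection state is
 * mirrored through c_tab[]. */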
3400
3401 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3402 {
3403         struct drbd_conf *mdev;
3404         struct p_req_state *p = tconn->data.rbuf;
3405         union drbd_state mask, val;
3406         enum drbd_state_rv rv;
3407
3408         mdev = vnr_to_mdev(tconn, pi->vnr);
3409         if (!mdev)
3410                 return -EIO;
3411
3412         mask.i = be32_to_cpu(p->mask);
3413         val.i = be32_to_cpu(p->val);
3414
3415         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3416             mutex_is_locked(mdev->state_mutex)) {
3417                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3418                 return 0;
3419         }
3420
3421         mask = convert_state(mask);
3422         val = convert_state(val);
3423
3424         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3425         drbd_send_sr_reply(mdev, rv);
3426
3427         drbd_md_sync(mdev);
3428
3429         return 0;
3430 }
3431
3432 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3433 {
3434         struct p_req_state *p = tconn->data.rbuf;
3435         union drbd_state mask, val;
3436         enum drbd_state_rv rv;
3437
3438         mask.i = be32_to_cpu(p->mask);
3439         val.i = be32_to_cpu(p->val);
3440
3441         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3442             mutex_is_locked(&tconn->cstate_mutex)) {
3443                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3444                 return 0;
3445         }
3446
3447         mask = convert_state(mask);
3448         val = convert_state(val);
3449
3450         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
3451         conn_send_sr_reply(tconn, rv);
3452
3453         return 0;
3454 }
3455
3456 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3457 {
3458         struct drbd_conf *mdev;
3459         struct p_state *p = tconn->data.rbuf;
3460         union drbd_state os, ns, peer_state;
3461         enum drbd_disk_state real_peer_disk;
3462         enum chg_state_flags cs_flags;
3463         int rv;
3464
3465         mdev = vnr_to_mdev(tconn, pi->vnr);
3466         if (!mdev)
3467                 return config_unknown_volume(tconn, pi);
3468
3469         peer_state.i = be32_to_cpu(p->state);
3470
3471         real_peer_disk = peer_state.disk;
3472         if (peer_state.disk == D_NEGOTIATING) {
3473                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3474                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3475         }
3476
3477         spin_lock_irq(&mdev->tconn->req_lock);
3478  retry:
3479         os = ns = mdev->state;
3480         spin_unlock_irq(&mdev->tconn->req_lock);
3481
3482         /* peer says his disk is uptodate, while we think it is inconsistent,
3483          * and this happens while we think we have a sync going on. */
3484         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3485             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3486                 /* If we are (becoming) SyncSource, but peer is still in sync
3487                  * preparation, ignore its uptodate-ness to avoid flapping, it
3488                  * will change to inconsistent once the peer reaches active
3489                  * syncing states.
3490                  * It may have changed syncer-paused flags, however, so we
3491                  * cannot ignore this completely. */
3492                 if (peer_state.conn > C_CONNECTED &&
3493                     peer_state.conn < C_SYNC_SOURCE)
3494                         real_peer_disk = D_INCONSISTENT;
3495
3496                 /* if peer_state changes to connected at the same time,
3497                  * it explicitly notifies us that it finished resync.
3498                  * Maybe we should finish it up, too? */
3499                 else if (os.conn >= C_SYNC_SOURCE &&
3500                          peer_state.conn == C_CONNECTED) {
3501                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3502                                 drbd_resync_finished(mdev);
3503                         return 0;
3504                 }
3505         }
3506
3507         /* peer says his disk is inconsistent, while we think it is uptodate,
3508          * and this happens while the peer still thinks we have a sync going on,
3509          * but we think we are already done with the sync.
3510          * We ignore this to avoid flapping pdsk.
3511          * This should not happen, if the peer is a recent version of drbd. */
3512         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3513             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3514                 real_peer_disk = D_UP_TO_DATE;
3515
3516         if (ns.conn == C_WF_REPORT_PARAMS)
3517                 ns.conn = C_CONNECTED;
3518
3519         if (peer_state.conn == C_AHEAD)
3520                 ns.conn = C_BEHIND;
3521
3522         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3523             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3524                 int cr; /* consider resync */
3525
3526                 /* if we established a new connection */
3527                 cr  = (os.conn < C_CONNECTED);
3528                 /* if we had an established connection
3529                  * and one of the nodes newly attaches a disk */
3530                 cr |= (os.conn == C_CONNECTED &&
3531                        (peer_state.disk == D_NEGOTIATING ||
3532                         os.disk == D_NEGOTIATING));
3533                 /* if we have both been inconsistent, and the peer has been
3534                  * forced to be UpToDate with --overwrite-data */
3535                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3536                 /* if we had been plain connected, and the admin requested to
3537                  * start a sync by "invalidate" or "invalidate-remote" */
3538                 cr |= (os.conn == C_CONNECTED &&
3539                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3540                                  peer_state.conn <= C_WF_BITMAP_T));
3541
3542                 if (cr)
3543                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3544
3545                 put_ldev(mdev);
3546                 if (ns.conn == C_MASK) {
3547                         ns.conn = C_CONNECTED;
3548                         if (mdev->state.disk == D_NEGOTIATING) {
3549                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3550                         } else if (peer_state.disk == D_NEGOTIATING) {
3551                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3552                                 peer_state.disk = D_DISKLESS;
3553                                 real_peer_disk = D_DISKLESS;
3554                         } else {
3555                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3556                                         return -EIO;
3557                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3558                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3559                                 return -EIO;
3560                         }
3561                 }
3562         }
3563
3564         spin_lock_irq(&mdev->tconn->req_lock);
3565         if (mdev->state.i != os.i)
3566                 goto retry;
3567         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3568         ns.peer = peer_state.role;
3569         ns.pdsk = real_peer_disk;
3570         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3571         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3572                 ns.disk = mdev->new_state_tmp.disk;
3573         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3574         if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3575             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3576                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3577                    for temporary network outages! */
3578                 spin_unlock_irq(&mdev->tconn->req_lock);
3579                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3580                 tl_clear(mdev->tconn);
3581                 drbd_uuid_new_current(mdev);
3582                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3583                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3584                 return -EIO;
3585         }
3586         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3587         ns = mdev->state;
3588         spin_unlock_irq(&mdev->tconn->req_lock);
3589
3590         if (rv < SS_SUCCESS) {
3591                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3592                 return -EIO;
3593         }
3594
3595         if (os.conn > C_WF_REPORT_PARAMS) {
3596                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3597                     peer_state.disk != D_NEGOTIATING ) {
3598                         /* we want resync, peer has not yet decided to sync... */
3599                         /* Nowadays only used when forcing a node into primary role and
3600                            setting its disk to UpToDate with that */
3601                         drbd_send_uuids(mdev);
3602                         drbd_send_state(mdev);
3603                 }
3604         }
3605
3606         mdev->tconn->net_conf->want_lose = 0;
3607
3608         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3609
3610         return 0;
3611 }
3612
3613 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3614 {
3615         struct drbd_conf *mdev;
3616         struct p_rs_uuid *p = tconn->data.rbuf;
3617
3618         mdev = vnr_to_mdev(tconn, pi->vnr);
3619         if (!mdev)
3620                 return -EIO;
3621
3622         wait_event(mdev->misc_wait,
3623                    mdev->state.conn == C_WF_SYNC_UUID ||
3624                    mdev->state.conn == C_BEHIND ||
3625                    mdev->state.conn < C_CONNECTED ||
3626                    mdev->state.disk < D_NEGOTIATING);
3627
3628         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3629
3630         /* Here the _drbd_uuid_ functions are right, current should
3631            _not_ be rotated into the history */
3632         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3633                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3634                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3635
3636                 drbd_print_uuids(mdev, "updated sync uuid");
3637                 drbd_start_resync(mdev, C_SYNC_TARGET);
3638
3639                 put_ldev(mdev);
3640         } else
3641                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3642
3643         return 0;
3644 }
3645
3646 /**
3647  * receive_bitmap_plain
3648  *
3649  * Return 0 when done, 1 when another iteration is needed, and a negative error
3650  * code upon failure.
3651  */
3652 static int
3653 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3654                      struct p_header *h, struct bm_xfer_ctx *c)
3655 {
3656         unsigned long *buffer = (unsigned long *)h->payload;
3657         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3658         unsigned want = num_words * sizeof(long);
3659         int err;
3660
3661         if (want != data_size) {
3662                 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3663                 return -EIO;
3664         }
3665         if (want == 0)
3666                 return 0;
3667         err = drbd_recv_all(mdev->tconn, buffer, want);
3668         if (err)
3669                 return err;
3670
3671         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3672
3673         c->word_offset += num_words;
3674         c->bit_offset = c->word_offset * BITS_PER_LONG;
3675         if (c->bit_offset > c->bm_bits)
3676                 c->bit_offset = c->bm_bits;
3677
3678         return 1;
3679 }
3680
3681 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3682 {
3683         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3684 }
3685
3686 static int dcbp_get_start(struct p_compressed_bm *p)
3687 {
3688         return (p->encoding & 0x80) != 0;
3689 }
3690
3691 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3692 {
3693         return (p->encoding >> 4) & 0x7;
3694 }
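
/* Example (illustrative): an encoding byte of 0x95 (binary 1001 0101)
 * decodes as start toggle = 1 (bit 7 set, so the first run-length describes
 * set bits), pad bits = 1 (bits 6..4) and code = 5 (bits 3..0).  Only the
 * RLE_VLI_Bits code is accepted by decode_bitmap_c() below. */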
3695
3696 /**
3697  * recv_bm_rle_bits
3698  *
3699  * Return 0 when done, 1 when another iteration is needed, and a negative error
3700  * code upon failure.
3701  */
3702 static int
3703 recv_bm_rle_bits(struct drbd_conf *mdev,
3704                 struct p_compressed_bm *p,
3705                  struct bm_xfer_ctx *c,
3706                  unsigned int len)
3707 {
3708         struct bitstream bs;
3709         u64 look_ahead;
3710         u64 rl;
3711         u64 tmp;
3712         unsigned long s = c->bit_offset;
3713         unsigned long e;
3714         int toggle = dcbp_get_start(p);
3715         int have;
3716         int bits;
3717
3718         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3719
3720         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3721         if (bits < 0)
3722                 return -EIO;
3723
3724         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3725                 bits = vli_decode_bits(&rl, look_ahead);
3726                 if (bits <= 0)
3727                         return -EIO;
3728
3729                 if (toggle) {
3730                         e = s + rl -1;
3731                         if (e >= c->bm_bits) {
3732                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3733                                 return -EIO;
3734                         }
3735                         _drbd_bm_set_bits(mdev, s, e);
3736                 }
3737
3738                 if (have < bits) {
3739                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3740                                 have, bits, look_ahead,
3741                                 (unsigned int)(bs.cur.b - p->code),
3742                                 (unsigned int)bs.buf_len);
3743                         return -EIO;
3744                 }
3745                 look_ahead >>= bits;
3746                 have -= bits;
3747
3748                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3749                 if (bits < 0)
3750                         return -EIO;
3751                 look_ahead |= tmp << have;
3752                 have += bits;
3753         }
3754
3755         c->bit_offset = s;
3756         bm_xfer_ctx_bit_to_word_offset(c);
3757
3758         return (s != c->bm_bits);
3759 }
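
/* Decoding sketch (illustrative): the payload is a sequence of VLI encoded
 * run lengths describing alternating runs of clear and set bits, starting
 * with "set" if the start toggle is 1.  E.g. runs of 5, 3 and 7 with start
 * toggle 0 and bit_offset 0 mean: bits 0..4 stay clear, bits 5..7 are marked
 * out of sync, bits 8..14 stay clear, and bit_offset advances to 15 for the
 * next packet. */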
3760
3761 /**
3762  * decode_bitmap_c
3763  *
3764  * Return 0 when done, 1 when another iteration is needed, and a negative error
3765  * code upon failure.
3766  */
3767 static int
3768 decode_bitmap_c(struct drbd_conf *mdev,
3769                 struct p_compressed_bm *p,
3770                 struct bm_xfer_ctx *c,
3771                 unsigned int len)
3772 {
3773         if (dcbp_get_code(p) == RLE_VLI_Bits)
3774                 return recv_bm_rle_bits(mdev, p, c, len);
3775
3776         /* other variants had been implemented for evaluation,
3777          * but have been dropped as this one turned out to be "best"
3778          * during all our tests. */
3779
3780         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3781         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3782         return -EIO;
3783 }
3784
3785 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3786                 const char *direction, struct bm_xfer_ctx *c)
3787 {
3788         /* what would it take to transfer it "plaintext" */
3789         unsigned plain = sizeof(struct p_header) *
3790                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3791                 + c->bm_words * sizeof(long);
3792         unsigned total = c->bytes[0] + c->bytes[1];
3793         unsigned r;
3794
3795         /* total cannot be zero, but just in case: */
3796         if (total == 0)
3797                 return;
3798
3799         /* don't report if not compressed */
3800         if (total >= plain)
3801                 return;
3802
3803         /* total < plain. check for overflow, still */
3804         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3805                                     : (1000 * total / plain);
3806
3807         if (r > 1000)
3808                 r = 1000;
3809
3810         r = 1000 - r;
3811         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3812              "total %u; compression: %u.%u%%\n",
3813                         direction,
3814                         c->bytes[1], c->packets[1],
3815                         c->bytes[0], c->packets[0],
3816                         total, r/10, r % 10);
3817 }
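
/* Worked example (illustrative numbers): if the plain transfer would have
 * taken plain = 4000 bytes but the actual transfer took total = 500 bytes,
 * then r = 1000 - (1000 * 500 / 4000) = 875, which is reported as a
 * compression of "87.5%". */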
3818
3819 /* Since we are processing the bitfield from lower addresses to higher,
3820    it does not matter whether we process it in 32 bit or 64 bit chunks,
3821    as long as it is little endian. (Understand it as a byte stream,
3822    beginning with the lowest byte...) If we used big endian we would
3823    need to process it from the highest address to the lowest, in order
3824    to be agnostic to the 32 vs 64 bit issue.
3825
3826    Returns 0 on success, a negative error code otherwise. */
3827 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3828 {
3829         struct drbd_conf *mdev;
3830         struct bm_xfer_ctx c;
3831         int err;
3832         struct p_header *h = tconn->data.rbuf;
3833
3834         mdev = vnr_to_mdev(tconn, pi->vnr);
3835         if (!mdev)
3836                 return -EIO;
3837
3838         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3839         /* you are supposed to send additional out-of-sync information
3840          * if you actually set bits during this phase */
3841
3842         c = (struct bm_xfer_ctx) {
3843                 .bm_bits = drbd_bm_bits(mdev),
3844                 .bm_words = drbd_bm_words(mdev),
3845         };
3846
3847         for(;;) {
3848                 if (pi->cmd == P_BITMAP) {
3849                         err = receive_bitmap_plain(mdev, pi->size, h, &c);
3850                 } else if (pi->cmd == P_COMPRESSED_BITMAP) {
3851                         /* MAYBE: sanity check that we speak proto >= 90,
3852                          * and the feature is enabled! */
3853                         struct p_compressed_bm *p;
3854
3855                         if (pi->size > BM_PACKET_PAYLOAD_BYTES) {
3856                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3857                                 err = -EIO;
3858                                 goto out;
3859                         }
3860
3861                         p = mdev->tconn->data.rbuf;
3862                         err = drbd_recv_all(mdev->tconn, p->head.payload, pi->size);
3863                         if (err)
3864                                goto out;
3865                         if (pi->size <= (sizeof(*p) - sizeof(p->head))) {
3866                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3867                                 err = -EIO;
3868                                 goto out;
3869                         }
3870                         err = decode_bitmap_c(mdev, p, &c, pi->size);
3871                 } else {
3872                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3873                         err = -EIO;
3874                         goto out;
3875                 }
3876
3877                 c.packets[pi->cmd == P_BITMAP]++;
3878                 c.bytes[pi->cmd == P_BITMAP] += sizeof(struct p_header) + pi->size;
3879
3880                 if (err <= 0) {
3881                         if (err < 0)
3882                                 goto out;
3883                         break;
3884                 }
3885                 err = drbd_recv_header(mdev->tconn, pi);
3886                 if (err)
3887                         goto out;
3888         }
3889
3890         INFO_bm_xfer_stats(mdev, "receive", &c);
3891
3892         if (mdev->state.conn == C_WF_BITMAP_T) {
3893                 enum drbd_state_rv rv;
3894
3895                 err = drbd_send_bitmap(mdev);
3896                 if (err)
3897                         goto out;
3898                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3899                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3900                 D_ASSERT(rv == SS_SUCCESS);
3901         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3902                 /* admin may have requested C_DISCONNECTING,
3903                  * other threads may have noticed network errors */
3904                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3905                     drbd_conn_str(mdev->state.conn));
3906         }
3907         err = 0;
3908
3909  out:
3910         drbd_bm_unlock(mdev);
3911         if (!err && mdev->state.conn == C_WF_BITMAP_S)
3912                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3913         return err;
3914 }
3915
3916 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3917 {
3918         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3919                  pi->cmd, pi->size);
3920
3921         return ignore_remaining_packet(tconn, pi);
3922 }
3923
3924 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3925 {
3926         /* Make sure we've acked all the TCP data associated
3927          * with the data requests being unplugged */
3928         drbd_tcp_quickack(tconn->data.socket);
3929
3930         return 0;
3931 }
3932
3933 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3934 {
3935         struct drbd_conf *mdev;
3936         struct p_block_desc *p = tconn->data.rbuf;
3937
3938         mdev = vnr_to_mdev(tconn, pi->vnr);
3939         if (!mdev)
3940                 return -EIO;
3941
3942         switch (mdev->state.conn) {
3943         case C_WF_SYNC_UUID:
3944         case C_WF_BITMAP_T:
3945         case C_BEHIND:
3946                 break;
3947         default:
3948                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3949                                 drbd_conn_str(mdev->state.conn));
3950         }
3951
3952         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3953
3954         return 0;
3955 }
3956
3957 struct data_cmd {
3958         int expect_payload;
3959         size_t pkt_size;
3960         int (*fn)(struct drbd_tconn *, struct packet_info *);
3961 };
3962
3963 static struct data_cmd drbd_cmd_handler[] = {
3964         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3965         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3966         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3967         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3968         [P_BITMAP]          = { 1, sizeof(struct p_header), receive_bitmap } ,
3969         [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3970         [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header), receive_UnplugRemote },
3971         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3972         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3973         [P_SYNC_PARAM]      = { 1, sizeof(struct p_header), receive_SyncParam },
3974         [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header), receive_SyncParam },
3975         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3976         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3977         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3978         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3979         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3980         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3981         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3982         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3983         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3984         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3985         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3986         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
3987 };
3988
3989 static void drbdd(struct drbd_tconn *tconn)
3990 {
3991         struct p_header *header = tconn->data.rbuf;
3992         struct packet_info pi;
3993         size_t shs; /* sub header size */
3994         int err;
3995
3996         while (get_t_state(&tconn->receiver) == RUNNING) {
3997                 struct data_cmd *cmd;
3998
3999                 drbd_thread_current_set_cpu(&tconn->receiver);
4000                 if (drbd_recv_header(tconn, &pi))
4001                         goto err_out;
4002
4003                 cmd = &drbd_cmd_handler[pi.cmd];
4004                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4005                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4006                         goto err_out;
4007                 }
4008
4009                 shs = cmd->pkt_size - sizeof(struct p_header);
4010                 if (pi.size - shs > 0 && !cmd->expect_payload) {
4011                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4012                         goto err_out;
4013                 }
4014
4015                 if (shs) {
4016                         err = drbd_recv_all_warn(tconn, &header->payload, shs);
4017                         if (err)
4018                                 goto err_out;
4019                         pi.size -= shs;
4020                 }
4021
4022                 err = cmd->fn(tconn, &pi);
4023                 if (err) {
4024                         conn_err(tconn, "error receiving %s, l: %d!\n",
4025                             cmdname(pi.cmd), pi.size);
4026                         goto err_out;
4027                 }
4028         }
4029         return;
4030
4031     err_out:
4032         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4033 }
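
/* Dispatch sketch (illustrative, based on the table and loop above): for a
 * P_SIZES packet, cmd->pkt_size is sizeof(struct p_sizes), so the receiver
 * first reads shs = sizeof(struct p_sizes) - sizeof(struct p_header) bytes
 * of sub-header into the receive buffer and reduces pi.size by shs before
 * calling receive_sizes().  Since P_SIZES has expect_payload == 0, any
 * remaining payload beyond the sub-header is treated as a protocol error. */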
4034
4035 void conn_flush_workqueue(struct drbd_tconn *tconn)
4036 {
4037         struct drbd_wq_barrier barr;
4038
4039         barr.w.cb = w_prev_work_done;
4040         barr.w.tconn = tconn;
4041         init_completion(&barr.done);
4042         drbd_queue_work(&tconn->data.work, &barr.w);
4043         wait_for_completion(&barr.done);
4044 }
4045
4046 static void drbd_disconnect(struct drbd_tconn *tconn)
4047 {
4048         enum drbd_conns oc;
4049         int rv = SS_UNKNOWN_ERROR;
4050
4051         if (tconn->cstate == C_STANDALONE)
4052                 return;
4053
4054         /* asender does not clean up anything. it must not interfere, either */
4055         drbd_thread_stop(&tconn->asender);
4056         drbd_free_sock(tconn);
4057
4058         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4059         conn_info(tconn, "Connection closed\n");
4060
4061         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4062                 conn_try_outdate_peer_async(tconn);
4063
4064         spin_lock_irq(&tconn->req_lock);
4065         oc = tconn->cstate;
4066         if (oc >= C_UNCONNECTED)
4067                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4068
4069         spin_unlock_irq(&tconn->req_lock);
4070
4071         if (oc == C_DISCONNECTING) {
4072                 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4073
4074                 crypto_free_hash(tconn->cram_hmac_tfm);
4075                 tconn->cram_hmac_tfm = NULL;
4076
4077                 kfree(tconn->net_conf);
4078                 tconn->net_conf = NULL;
4079                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4080         }
4081 }
4082
4083 static int drbd_disconnected(int vnr, void *p, void *data)
4084 {
4085         struct drbd_conf *mdev = (struct drbd_conf *)p;
4086         enum drbd_fencing_p fp;
4087         unsigned int i;
4088
4089         /* wait for current activity to cease. */
4090         spin_lock_irq(&mdev->tconn->req_lock);
4091         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4092         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4093         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4094         spin_unlock_irq(&mdev->tconn->req_lock);
4095
4096         /* We do not have data structures that would allow us to
4097          * get the rs_pending_cnt down to 0 again.
4098          *  * On C_SYNC_TARGET we do not have any data structures describing
4099          *    the pending RSDataRequests we have sent.
4100          *  * On C_SYNC_SOURCE there is no data structure that tracks
4101          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4102          *  And no, it is not the sum of the reference counts in the
4103          *  resync_LRU. The resync_LRU tracks the whole operation including
4104          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4105          *  on the fly. */
4106         drbd_rs_cancel_all(mdev);
4107         mdev->rs_total = 0;
4108         mdev->rs_failed = 0;
4109         atomic_set(&mdev->rs_pending_cnt, 0);
4110         wake_up(&mdev->misc_wait);
4111
4112         del_timer(&mdev->request_timer);
4113
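        /* Cancel the resync timer, then run its callback once by hand so that
         * any work a pending timer would have queued still reaches the worker
         * before the work queue is flushed below. */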
4114         del_timer_sync(&mdev->resync_timer);
4115         resync_timer_fn((unsigned long)mdev);
4116
4117         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4118          * w_make_resync_request etc. which may still be on the worker queue
4119          * to be "canceled" */
4120         drbd_flush_workqueue(mdev);
4121
4122         /* This also does reclaim_net_ee().  If we do this too early, we might
4123          * miss some resync ee and pages. */
4124         drbd_process_done_ee(mdev);
4125
4126         kfree(mdev->p_uuid);
4127         mdev->p_uuid = NULL;
4128
4129         if (!is_susp(mdev->state))
4130                 tl_clear(mdev->tconn);
4131
4132         drbd_md_sync(mdev);
4133
4134         fp = FP_DONT_CARE;
4135         if (get_ldev(mdev)) {
4136                 fp = mdev->ldev->dc.fencing;
4137                 put_ldev(mdev);
4138         }
4139
4140         /* serialize with bitmap writeout triggered by the state change,
4141          * if any. */
4142         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4143
4144         /* tcp_close and release of sendpage pages can be deferred.  I don't
4145          * want to use SO_LINGER, because apparently it can be deferred for
4146          * more than 20 seconds (longest time I checked).
4147          *
4148          * Actually we don't care for exactly when the network stack does its
4149          * put_page(), but release our reference on these pages right here.
4150          */
4151         i = drbd_release_ee(mdev, &mdev->net_ee);
4152         if (i)
4153                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4154         i = atomic_read(&mdev->pp_in_use_by_net);
4155         if (i)
4156                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4157         i = atomic_read(&mdev->pp_in_use);
4158         if (i)
4159                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4160
4161         D_ASSERT(list_empty(&mdev->read_ee));
4162         D_ASSERT(list_empty(&mdev->active_ee));
4163         D_ASSERT(list_empty(&mdev->sync_ee));
4164         D_ASSERT(list_empty(&mdev->done_ee));
4165
4166         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4167         atomic_set(&mdev->current_epoch->epoch_size, 0);
4168         D_ASSERT(list_empty(&mdev->current_epoch->list));
4169
4170         return 0;
4171 }
4172
4173 /*
4174  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4175  * we can agree on is stored in agreed_pro_version.
4176  *
4177  * feature flags and the reserved array should be enough room for future
4178  * enhancements of the handshake protocol, and possible plugins...
4179  *
4180  * for now, they are expected to be zero, but ignored.
4181  */
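/*
 * Worked example (hypothetical numbers): if this side supports protocol
 * versions 86..96 and the peer advertises 90..100, drbd_do_handshake()
 * below agrees on min(96, 100) = 96.  If the two ranges do not overlap,
 * the peers speak incompatible dialects and the handshake returns -1.
 */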
4182 static int drbd_send_handshake(struct drbd_tconn *tconn)
4183 {
4184         /* ASSERT current == mdev->tconn->receiver ... */
4185         struct p_handshake *p = tconn->data.sbuf;
4186         int err;
4187
4188         if (mutex_lock_interruptible(&tconn->data.mutex)) {
4189                 conn_err(tconn, "interrupted during initial handshake\n");
4190                 return -EINTR;
4191         }
4192
4193         if (tconn->data.socket == NULL) {
4194                 mutex_unlock(&tconn->data.mutex);
4195                 return -EIO;
4196         }
4197
4198         memset(p, 0, sizeof(*p));
4199         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4200         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4201         err = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
4202                              &p->head, sizeof(*p), 0);
4203         mutex_unlock(&tconn->data.mutex);
4204         return err;
4205 }
4206
4207 /*
4208  * return values:
4209  *   1 yes, we have a valid connection
4210  *   0 oops, did not work out, please try again
4211  *  -1 peer speaks a different protocol dialect,
4212  *     no point in trying again; please go standalone.
4213  */
4214 static int drbd_do_handshake(struct drbd_tconn *tconn)
4215 {
4216         /* ASSERT current == tconn->receiver ... */
4217         struct p_handshake *p = tconn->data.rbuf;
4218         const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4219         struct packet_info pi;
4220         int err;
4221
4222         err = drbd_send_handshake(tconn);
4223         if (err)
4224                 return 0;
4225
4226         err = drbd_recv_header(tconn, &pi);
4227         if (err)
4228                 return 0;
4229
4230         if (pi.cmd != P_HAND_SHAKE) {
4231                 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
4232                      cmdname(pi.cmd), pi.cmd);
4233                 return -1;
4234         }
4235
4236         if (pi.size != expect) {
4237                 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
4238                      expect, pi.size);
4239                 return -1;
4240         }
4241
4242         err = drbd_recv_all_warn(tconn, &p->head.payload, expect);
4243         if (err)
4244                 return 0;
4245
4246         p->protocol_min = be32_to_cpu(p->protocol_min);
4247         p->protocol_max = be32_to_cpu(p->protocol_max);
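        /* a peer that sends 0 as protocol_max supports only protocol_min */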
4248         if (p->protocol_max == 0)
4249                 p->protocol_max = p->protocol_min;
4250
4251         if (PRO_VERSION_MAX < p->protocol_min ||
4252             PRO_VERSION_MIN > p->protocol_max)
4253                 goto incompat;
4254
4255         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4256
4257         conn_info(tconn, "Handshake successful: "
4258              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4259
4260         return 1;
4261
4262  incompat:
4263         conn_err(tconn, "incompatible DRBD dialects: "
4264             "I support %d-%d, peer supports %d-%d\n",
4265             PRO_VERSION_MIN, PRO_VERSION_MAX,
4266             p->protocol_min, p->protocol_max);
4267         return -1;
4268 }
4269
4270 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4271 static int drbd_do_auth(struct drbd_tconn *tconn)
4272 {
4273         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4274         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4275         return -1;
4276 }
4277 #else
4278 #define CHALLENGE_LEN 64
4279
4280 /* Return value:
4281         1 - auth succeeded,
4282         0 - failed, try again (network error),
4283         -1 - auth failed, don't try again.
4284 */
4285
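/*
 * Sketch of the exchange implemented below (CRAM-style challenge/response):
 * we send a random CHALLENGE_LEN byte challenge, receive the peer's
 * challenge, answer it with HMAC(shared_secret, peers_challenge), then
 * receive the peer's answer to our challenge and compare it against the
 * HMAC we compute locally.  Only a matching answer authenticates the peer.
 */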
4286 static int drbd_do_auth(struct drbd_tconn *tconn)
4287 {
4288         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4289         struct scatterlist sg;
4290         char *response = NULL;
4291         char *right_response = NULL;
4292         char *peers_ch = NULL;
4293         unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4294         unsigned int resp_size;
4295         struct hash_desc desc;
4296         struct packet_info pi;
4297         int err, rv;
4298
4299         desc.tfm = tconn->cram_hmac_tfm;
4300         desc.flags = 0;
4301
4302         rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4303                                 (u8 *)tconn->net_conf->shared_secret, key_len);
4304         if (rv) {
4305                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4306                 rv = -1;
4307                 goto fail;
4308         }
4309
4310         get_random_bytes(my_challenge, CHALLENGE_LEN);
4311
4312         rv = !conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4313         if (!rv)
4314                 goto fail;
4315
4316         err = drbd_recv_header(tconn, &pi);
4317         if (err) {
4318                 rv = 0;
4319                 goto fail;
4320         }
4321
4322         if (pi.cmd != P_AUTH_CHALLENGE) {
4323                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4324                     cmdname(pi.cmd), pi.cmd);
4325                 rv = 0;
4326                 goto fail;
4327         }
4328
4329         if (pi.size > CHALLENGE_LEN * 2) {
4330                 conn_err(tconn, "AuthChallenge payload too big.\n");
4331                 rv = -1;
4332                 goto fail;
4333         }
4334
4335         peers_ch = kmalloc(pi.size, GFP_NOIO);
4336         if (peers_ch == NULL) {
4337                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4338                 rv = -1;
4339                 goto fail;
4340         }
4341
4342         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4343         if (err) {
4344                 rv = 0;
4345                 goto fail;
4346         }
4347
4348         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4349         response = kmalloc(resp_size, GFP_NOIO);
4350         if (response == NULL) {
4351                 conn_err(tconn, "kmalloc of response failed\n");
4352                 rv = -1;
4353                 goto fail;
4354         }
4355
4356         sg_init_table(&sg, 1);
4357         sg_set_buf(&sg, peers_ch, pi.size);
4358
4359         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4360         if (rv) {
4361                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4362                 rv = -1;
4363                 goto fail;
4364         }
4365
4366         rv = !conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
4367         if (!rv)
4368                 goto fail;
4369
4370         err = drbd_recv_header(tconn, &pi);
4371         if (err) {
4372                 rv = 0;
4373                 goto fail;
4374         }
4375
4376         if (pi.cmd != P_AUTH_RESPONSE) {
4377                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4378                         cmdname(pi.cmd), pi.cmd);
4379                 rv = 0;
4380                 goto fail;
4381         }
4382
4383         if (pi.size != resp_size) {
4384                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4385                 rv = 0;
4386                 goto fail;
4387         }
4388
4389         err = drbd_recv_all_warn(tconn, response, resp_size);
4390         if (err) {
4391                 rv = 0;
4392                 goto fail;
4393         }
4394
4395         right_response = kmalloc(resp_size, GFP_NOIO);
4396         if (right_response == NULL) {
4397                 conn_err(tconn, "kmalloc of right_response failed\n");
4398                 rv = -1;
4399                 goto fail;
4400         }
4401
4402         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4403
4404         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4405         if (rv) {
4406                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4407                 rv = -1;
4408                 goto fail;
4409         }
4410
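        /* rv != 0 iff the peer's answer matches the digest we computed over
         * our own challenge, i.e. the peer knows the shared secret */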
4411         rv = !memcmp(response, right_response, resp_size);
4412
4413         if (rv)
4414                 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4415                      resp_size, tconn->net_conf->cram_hmac_alg);
4416         else
4417                 rv = -1;
4418
4419  fail:
4420         kfree(peers_ch);
4421         kfree(response);
4422         kfree(right_response);
4423
4424         return rv;
4425 }
4426 #endif
4427
4428 int drbdd_init(struct drbd_thread *thi)
4429 {
4430         struct drbd_tconn *tconn = thi->tconn;
4431         int h;
4432
4433         conn_info(tconn, "receiver (re)started\n");
4434
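        /* drbd_connect() returns > 0 once a connection is established, 0 on a
         * transient failure (retry after a second), and -1 if the peer is
         * incompatible, in which case the network configuration is dropped. */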
4435         do {
4436                 h = drbd_connect(tconn);
4437                 if (h == 0) {
4438                         drbd_disconnect(tconn);
4439                         schedule_timeout_interruptible(HZ);
4440                 }
4441                 if (h == -1) {
4442                         conn_warn(tconn, "Discarding network configuration.\n");
4443                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4444                 }
4445         } while (h == 0);
4446
4447         if (h > 0) {
4448                 if (get_net_conf(tconn)) {
4449                         drbdd(tconn);
4450                         put_net_conf(tconn);
4451                 }
4452         }
4453
4454         drbd_disconnect(tconn);
4455
4456         conn_info(tconn, "receiver terminated\n");
4457         return 0;
4458 }
4459
4460 /* ********* acknowledge sender ******** */
4461
4462 static int got_conn_RqSReply(struct drbd_tconn *tconn, enum drbd_packet cmd)
4463 {
4464         struct p_req_state_reply *p = tconn->meta.rbuf;
4465         int retcode = be32_to_cpu(p->retcode);
4466
4467         if (retcode >= SS_SUCCESS) {
4468                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4469         } else {
4470                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4471                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4472                          drbd_set_st_err_str(retcode), retcode);
4473         }
4474         wake_up(&tconn->ping_wait);
4475
4476         return true;
4477 }
4478
4479 static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
4480 {
4481         struct p_req_state_reply *p = mdev->tconn->meta.rbuf;
4482         int retcode = be32_to_cpu(p->retcode);
4483
4484         if (retcode >= SS_SUCCESS) {
4485                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4486         } else {
4487                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4488                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4489                         drbd_set_st_err_str(retcode), retcode);
4490         }
4491         wake_up(&mdev->state_wait);
4492
4493         return true;
4494 }
4495
4496 static int got_Ping(struct drbd_tconn *tconn, enum drbd_packet cmd)
4497 {
4498         return drbd_send_ping_ack(tconn);
4499
4500 }
4501
4502 static int got_PingAck(struct drbd_tconn *tconn, enum drbd_packet cmd)
4503 {
4504         /* restore idle timeout */
4505         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4506         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4507                 wake_up(&tconn->ping_wait);
4508
4509         return true;
4510 }
4511
4512 static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
4513 {
4514         struct p_block_ack *p = mdev->tconn->meta.rbuf;
4515         sector_t sector = be64_to_cpu(p->sector);
4516         int blksize = be32_to_cpu(p->blksize);
4517
4518         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4519
4520         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4521
4522         if (get_ldev(mdev)) {
4523                 drbd_rs_complete_io(mdev, sector);
4524                 drbd_set_in_sync(mdev, sector, blksize);
4525                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4526                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4527                 put_ldev(mdev);
4528         }
4529         dec_rs_pending(mdev);
4530         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4531
4532         return true;
4533 }
4534
4535 static int
4536 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4537                               struct rb_root *root, const char *func,
4538                               enum drbd_req_event what, bool missing_ok)
4539 {
4540         struct drbd_request *req;
4541         struct bio_and_error m;
4542
4543         spin_lock_irq(&mdev->tconn->req_lock);
4544         req = find_request(mdev, root, id, sector, missing_ok, func);
4545         if (unlikely(!req)) {
4546                 spin_unlock_irq(&mdev->tconn->req_lock);
4547                 return false;
4548         }
4549         __req_mod(req, what, &m);
4550         spin_unlock_irq(&mdev->tconn->req_lock);
4551
4552         if (m.bio)
4553                 complete_master_bio(mdev, &m);
4554         return true;
4555 }
4556
4557 static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
4558 {
4559         struct p_block_ack *p = mdev->tconn->meta.rbuf;
4560         sector_t sector = be64_to_cpu(p->sector);
4561         int blksize = be32_to_cpu(p->blksize);
4562         enum drbd_req_event what;
4563
4564         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4565
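        /* block_id == ID_SYNCER marks the ack of a resync write we issued
         * ourselves: there is no application request to complete, just mark
         * the range in sync and drop the pending resync count. */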
4566         if (p->block_id == ID_SYNCER) {
4567                 drbd_set_in_sync(mdev, sector, blksize);
4568                 dec_rs_pending(mdev);
4569                 return true;
4570         }
4571         switch (cmd) {
4572         case P_RS_WRITE_ACK:
4573                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4574                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4575                 break;
4576         case P_WRITE_ACK:
4577                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4578                 what = WRITE_ACKED_BY_PEER;
4579                 break;
4580         case P_RECV_ACK:
4581                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4582                 what = RECV_ACKED_BY_PEER;
4583                 break;
4584         case P_DISCARD_WRITE:
4585                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4586                 what = DISCARD_WRITE;
4587                 break;
4588         case P_RETRY_WRITE:
4589                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4590                 what = POSTPONE_WRITE;
4591                 break;
4592         default:
4593                 D_ASSERT(0);
4594                 return false;
4595         }
4596
4597         return validate_req_change_req_state(mdev, p->block_id, sector,
4598                                              &mdev->write_requests, __func__,
4599                                              what, false);
4600 }
4601
4602 static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
4603 {
4604         struct p_block_ack *p = mdev->tconn->meta.rbuf;
4605         sector_t sector = be64_to_cpu(p->sector);
4606         int size = be32_to_cpu(p->blksize);
4607         bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4608                           mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
4609         bool found;
4610
4611         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4612
4613         if (p->block_id == ID_SYNCER) {
4614                 dec_rs_pending(mdev);
4615                 drbd_rs_failed_io(mdev, sector, size);
4616                 return true;
4617         }
4618
4619         found = validate_req_change_req_state(mdev, p->block_id, sector,
4620                                               &mdev->write_requests, __func__,
4621                                               NEG_ACKED, missing_ok);
4622         if (!found) {
4623                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4624                    The master bio might already be completed, therefore the
4625                    request is no longer in the collision hash. */
4626                 /* In Protocol B we might already have got a P_RECV_ACK
4627                    but then get a P_NEG_ACK afterwards. */
4628                 if (!missing_ok)
4629                         return false;
4630                 drbd_set_out_of_sync(mdev, sector, size);
4631         }
4632         return true;
4633 }
4634
4635 static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
4636 {
4637         struct p_block_ack *p = mdev->tconn->meta.rbuf;
4638         sector_t sector = be64_to_cpu(p->sector);
4639
4640         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4641
4642         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4643             (unsigned long long)sector, be32_to_cpu(p->blksize));
4644
4645         return validate_req_change_req_state(mdev, p->block_id, sector,
4646                                              &mdev->read_requests, __func__,
4647                                              NEG_ACKED, false);
4648 }
4649
4650 static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
4651 {
4652         sector_t sector;
4653         int size;
4654         struct p_block_ack *p = mdev->tconn->meta.rbuf;
4655
4656         sector = be64_to_cpu(p->sector);
4657         size = be32_to_cpu(p->blksize);
4658
4659         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4660
4661         dec_rs_pending(mdev);
4662
4663         if (get_ldev_if_state(mdev, D_FAILED)) {
4664                 drbd_rs_complete_io(mdev, sector);
4665                 switch (cmd) {
4666                 case P_NEG_RS_DREPLY:
4667                         drbd_rs_failed_io(mdev, sector, size);
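                        /* fall through */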
4668                 case P_RS_CANCEL:
4669                         break;
4670                 default:
4671                         D_ASSERT(0);
4672                         put_ldev(mdev);
4673                         return false;
4674                 }
4675                 put_ldev(mdev);
4676         }
4677
4678         return true;
4679 }
4680
4681 static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
4682 {
4683         struct p_barrier_ack *p = mdev->tconn->meta.rbuf;
4684
4685         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4686
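        /* Running Ahead of the peer with no application writes in flight:
         * arm start_resync_timer to fire in one second and begin the switch
         * back towards resync (see AHEAD_TO_SYNC_SOURCE). */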
4687         if (mdev->state.conn == C_AHEAD &&
4688             atomic_read(&mdev->ap_in_flight) == 0 &&
4689             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4690                 mdev->start_resync_timer.expires = jiffies + HZ;
4691                 add_timer(&mdev->start_resync_timer);
4692         }
4693
4694         return true;
4695 }
4696
4697 static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
4698 {
4699         struct p_block_ack *p = mdev->tconn->meta.rbuf;
4700         struct drbd_work *w;
4701         sector_t sector;
4702         int size;
4703
4704         sector = be64_to_cpu(p->sector);
4705         size = be32_to_cpu(p->blksize);
4706
4707         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4708
4709         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4710                 drbd_ov_out_of_sync_found(mdev, sector, size);
4711         else
4712                 ov_out_of_sync_print(mdev);
4713
4714         if (!get_ldev(mdev))
4715                 return true;
4716
4717         drbd_rs_complete_io(mdev, sector);
4718         dec_rs_pending(mdev);
4719
4720         --mdev->ov_left;
4721
4722         /* let's advance progress step marks only for every other megabyte */
4723         if ((mdev->ov_left & 0x200) == 0x200)
4724                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4725
4726         if (mdev->ov_left == 0) {
4727                 w = kmalloc(sizeof(*w), GFP_NOIO);
4728                 if (w) {
4729                         w->cb = w_ov_finished;
4730                         w->mdev = mdev;
4731                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4732                 } else {
4733                         dev_err(DEV, "kmalloc(w) failed.\n");
4734                         ov_out_of_sync_print(mdev);
4735                         drbd_resync_finished(mdev);
4736                 }
4737         }
4738         put_ldev(mdev);
4739         return true;
4740 }
4741
4742 static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
4743 {
4744         return true;
4745 }
4746
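/* Drain the done_ee lists of all volumes on this connection; repeat until a
 * full pass over the volumes finds every list empty. */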
4747 static int tconn_process_done_ee(struct drbd_tconn *tconn)
4748 {
4749         struct drbd_conf *mdev;
4750         int i, not_empty = 0;
4751
4752         do {
4753                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4754                 flush_signals(current);
4755                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4756                         if (drbd_process_done_ee(mdev))
4757                                 return 1; /* error */
4758                 }
4759                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4760
4761                 spin_lock_irq(&tconn->req_lock);
4762                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4763                         not_empty = !list_empty(&mdev->done_ee);
4764                         if (not_empty)
4765                                 break;
4766                 }
4767                 spin_unlock_irq(&tconn->req_lock);
4768         } while (not_empty);
4769
4770         return 0;
4771 }
4772
4773 struct asender_cmd {
4774         size_t pkt_size;
4775         enum mdev_or_conn fa_type; /* first argument's type */
4776         union {
4777                 int (*mdev_fn)(struct drbd_conf *mdev, enum drbd_packet cmd);
4778                 int (*conn_fn)(struct drbd_tconn *tconn, enum drbd_packet cmd);
4779         };
4780 };
4781
4782 static struct asender_cmd asender_tbl[] = {
4783         [P_PING]            = { sizeof(struct p_header), CONN, { .conn_fn = got_Ping } },
4784         [P_PING_ACK]        = { sizeof(struct p_header), CONN, { .conn_fn = got_PingAck } },
4785         [P_RECV_ACK]        = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4786         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4787         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4788         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4789         [P_NEG_ACK]         = { sizeof(struct p_block_ack), MDEV, { got_NegAck } },
4790         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), MDEV, { got_NegDReply } },
4791         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
4792         [P_OV_RESULT]       = { sizeof(struct p_block_ack), MDEV, { got_OVResult } },
4793         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), MDEV, { got_BarrierAck } },
4794         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), MDEV, { got_RqSReply } },
4795         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), MDEV, { got_IsInSync } },
4796         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), MDEV, { got_skip } },
4797         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
4798         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), CONN, {.conn_fn = got_conn_RqSReply}},
4799         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
4800 };
4801
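/*
 * The asender thread below drives the meta socket: received packets are
 * looked up in asender_tbl[] by command number and, depending on fa_type,
 * dispatched either to a per-connection handler (CONN) or to the per-volume
 * handler of the device resolved via vnr_to_mdev() (MDEV).
 */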
4802 int drbd_asender(struct drbd_thread *thi)
4803 {
4804         struct drbd_tconn *tconn = thi->tconn;
4805         struct p_header *h = tconn->meta.rbuf;
4806         struct asender_cmd *cmd = NULL;
4807         struct packet_info pi;
4808         int rv;
4809         void *buf    = h;
4810         int received = 0;
4811         int expect   = sizeof(struct p_header);
4812         int ping_timeout_active = 0;
4813
4814         current->policy = SCHED_RR;  /* Make this a realtime task! */
4815         current->rt_priority = 2;    /* more important than all other tasks */
4816
4817         while (get_t_state(thi) == RUNNING) {
4818                 drbd_thread_current_set_cpu(thi);
4819                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4820                         if (!drbd_send_ping(tconn)) {
4821                                 conn_err(tconn, "drbd_send_ping has failed\n");
4822                                 goto reconnect;
4823                         }
4824                         tconn->meta.socket->sk->sk_rcvtimeo =
4825                                 tconn->net_conf->ping_timeo*HZ/10;
4826                         ping_timeout_active = 1;
4827                 }
4828
4829                 /* TODO: conditionally cork; it may hurt latency if we cork without
4830                    much to send */
4831                 if (!tconn->net_conf->no_cork)
4832                         drbd_tcp_cork(tconn->meta.socket);
4833                 if (tconn_process_done_ee(tconn)) {
4834                         conn_err(tconn, "tconn_process_done_ee() failed\n");
4835                         goto reconnect;
4836                 }
4837                 /* but unconditionally uncork unless disabled */
4838                 if (!tconn->net_conf->no_cork)
4839                         drbd_tcp_uncork(tconn->meta.socket);
4840
4841                 /* short circuit, recv_msg would return EINTR anyways. */
4842                 if (signal_pending(current))
4843                         continue;
4844
4845                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4846                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4847
4848                 flush_signals(current);
4849
4850                 /* Note:
4851                  * -EINTR        (on meta) we got a signal
4852                  * -EAGAIN       (on meta) rcvtimeo expired
4853                  * -ECONNRESET   other side closed the connection
4854                  * -ERESTARTSYS  (on data) we got a signal
4855                  * rv <  0       other than above: unexpected error!
4856                  * rv == expected: full header or command
4857                  * rv <  expected: "woken" by signal during receive
4858                  * rv == 0       : "connection shut down by peer"
4859                  */
4860                 if (likely(rv > 0)) {
4861                         received += rv;
4862                         buf      += rv;
4863                 } else if (rv == 0) {
4864                         conn_err(tconn, "meta connection shut down by peer.\n");
4865                         goto reconnect;
4866                 } else if (rv == -EAGAIN) {
4867                         /* If the data socket received something meanwhile,
4868                          * that is good enough: peer is still alive. */
4869                         if (time_after(tconn->last_received,
4870                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
4871                                 continue;
4872                         if (ping_timeout_active) {
4873                                 conn_err(tconn, "PingAck did not arrive in time.\n");
4874                                 goto reconnect;
4875                         }
4876                         set_bit(SEND_PING, &tconn->flags);
4877                         continue;
4878                 } else if (rv == -EINTR) {
4879                         continue;
4880                 } else {
4881                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
4882                         goto reconnect;
4883                 }
4884
4885                 if (received == expect && cmd == NULL) {
4886                         if (decode_header(tconn, h, &pi))
4887                                 goto reconnect;
4888                         cmd = &asender_tbl[pi.cmd];
4889                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->pkt_size) {
4890                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
4891                                         pi.cmd, pi.size);
4892                                 goto disconnect;
4893                         }
4894                         expect = cmd->pkt_size;
4895                         if (pi.size != expect - sizeof(struct p_header)) {
4896                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
4897                                         pi.cmd, pi.size);
4898                                 goto reconnect;
4899                         }
4900                 }
4901                 if (received == expect) {
4902                         bool rv;
4903
4904                         if (cmd->fa_type == CONN) {
4905                                 rv = cmd->conn_fn(tconn, pi.cmd);
4906                         } else {
4907                                 struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
4908                                 rv = cmd->mdev_fn(mdev, pi.cmd);
4909                         }
4910
4911                         if (!rv)
4912                                 goto reconnect;
4913
4914                         tconn->last_received = jiffies;
4915
4916                         /* the idle_timeout (ping-int)
4917                          * has been restored in got_PingAck() */
4918                         if (cmd == &asender_tbl[P_PING_ACK])
4919                                 ping_timeout_active = 0;
4920
4921                         buf      = h;
4922                         received = 0;
4923                         expect   = sizeof(struct p_header);
4924                         cmd      = NULL;
4925                 }
4926         }
4927
4928         if (0) {
4929 reconnect:
4930                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4931         }
4932         if (0) {
4933 disconnect:
4934                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4935         }
4936         clear_bit(SIGNAL_ASENDER, &tconn->flags);
4937
4938         conn_info(tconn, "asender terminated\n");
4939
4940         return 0;
4941 }