1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55 };
56
57 enum finish_epoch {
58         FE_STILL_LIVE,
59         FE_DESTROYED,
60         FE_RECYCLED,
61 };
62
63 static int drbd_do_features(struct drbd_tconn *tconn);
64 static int drbd_do_auth(struct drbd_tconn *tconn);
65 static int drbd_disconnected(int vnr, void *p, void *data);
66
67 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68 static int e_end_block(struct drbd_work *, int);
69
70
71 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
72
73 /*
74  * some helper functions to deal with single linked page lists,
75  * page->private being our "next" pointer.
76  */
77
78 /* If at least n pages are linked at head, get n pages off.
79  * Otherwise, don't modify head, and return NULL.
80  * Locking is the responsibility of the caller.
81  */
82 static struct page *page_chain_del(struct page **head, int n)
83 {
84         struct page *page;
85         struct page *tmp;
86
87         BUG_ON(!n);
88         BUG_ON(!head);
89
90         page = *head;
91
92         if (!page)
93                 return NULL;
94
95         while (page) {
96                 tmp = page_chain_next(page);
97                 if (--n == 0)
98                         break; /* found sufficient pages */
99                 if (tmp == NULL)
100                         /* insufficient pages, don't use any of them. */
101                         return NULL;
102                 page = tmp;
103         }
104
105         /* add end of list marker for the returned list */
106         set_page_private(page, 0);
107         /* actual return value, and adjustment of head */
108         page = *head;
109         *head = tmp;
110         return page;
111 }
112
113 /* may be used outside of locks to find the tail of a (usually short)
114  * "private" page chain, before adding it back to a global chain head
115  * with page_chain_add() under a spinlock. */
116 static struct page *page_chain_tail(struct page *page, int *len)
117 {
118         struct page *tmp;
119         int i = 1;
120         while ((tmp = page_chain_next(page)))
121                 ++i, page = tmp;
122         if (len)
123                 *len = i;
124         return page;
125 }
126
127 static int page_chain_free(struct page *page)
128 {
129         struct page *tmp;
130         int i = 0;
131         page_chain_for_each_safe(page, tmp) {
132                 put_page(page);
133                 ++i;
134         }
135         return i;
136 }
137
138 static void page_chain_add(struct page **head,
139                 struct page *chain_first, struct page *chain_last)
140 {
141 #if 1
142         struct page *tmp;
143         tmp = page_chain_tail(chain_first, NULL);
144         BUG_ON(tmp != chain_last);
145 #endif
146
147         /* add chain to head */
148         set_page_private(chain_last, (unsigned long)*head);
149         *head = chain_first;
150 }
151
152 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
153 {
154         struct page *page = NULL;
155         struct page *tmp = NULL;
156         int i = 0;
157
158         /* Yes, testing drbd_pp_vacant outside the lock is racy.
159          * So what. It saves a spin_lock. */
160         if (drbd_pp_vacant >= number) {
161                 spin_lock(&drbd_pp_lock);
162                 page = page_chain_del(&drbd_pp_pool, number);
163                 if (page)
164                         drbd_pp_vacant -= number;
165                 spin_unlock(&drbd_pp_lock);
166                 if (page)
167                         return page;
168         }
169
170         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
171          * "criss-cross" setup, that might cause write-out on some other DRBD,
172          * which in turn might block on the other node at this very place.  */
173         for (i = 0; i < number; i++) {
174                 tmp = alloc_page(GFP_TRY);
175                 if (!tmp)
176                         break;
177                 set_page_private(tmp, (unsigned long)page);
178                 page = tmp;
179         }
180
181         if (i == number)
182                 return page;
183
184         /* Not enough pages immediately available this time.
185          * No need to jump around here, drbd_pp_alloc will retry this
186          * function "soon". */
187         if (page) {
188                 tmp = page_chain_tail(page, NULL);
189                 spin_lock(&drbd_pp_lock);
190                 page_chain_add(&drbd_pp_pool, page, tmp);
191                 drbd_pp_vacant += i;
192                 spin_unlock(&drbd_pp_lock);
193         }
194         return NULL;
195 }
196
197 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
198 {
199         struct drbd_peer_request *peer_req;
200         struct list_head *le, *tle;
201
202         /* The EEs are always appended to the end of the list. Since
203            they are sent in order over the wire, they have to finish
204            in order. As soon as we see the first unfinished one, we can
205            stop examining the list... */
206
207         list_for_each_safe(le, tle, &mdev->net_ee) {
208                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
209                 if (drbd_ee_has_active_page(peer_req))
210                         break;
211                 list_move(le, to_be_freed);
212         }
213 }
214
215 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
216 {
217         LIST_HEAD(reclaimed);
218         struct drbd_peer_request *peer_req, *t;
219
220         spin_lock_irq(&mdev->tconn->req_lock);
221         reclaim_net_ee(mdev, &reclaimed);
222         spin_unlock_irq(&mdev->tconn->req_lock);
223
224         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
225                 drbd_free_net_ee(mdev, peer_req);
226 }
227
228 /**
229  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
230  * @mdev:       DRBD device.
231  * @number:     number of pages requested
232  * @retry:      whether to retry if not enough pages are available right now
233  *
234  * Tries to allocate @number pages, first from our own page pool, then from
235  * the kernel, unless this allocation would exceed the max_buffers setting.
236  * Possibly retry until DRBD frees sufficient pages somewhere else.
237  *
238  * Returns a page chain linked via page->private.
239  */
240 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
241 {
242         struct page *page = NULL;
243         DEFINE_WAIT(wait);
244
245         /* Yes, we may run up to @number over max_buffers. If we
246          * follow it strictly, the admin will get it wrong anyways. */
247         if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
248                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
249
250         while (page == NULL) {
251                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
252
253                 drbd_kick_lo_and_reclaim_net(mdev);
254
255                 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
256                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
257                         if (page)
258                                 break;
259                 }
260
261                 if (!retry)
262                         break;
263
264                 if (signal_pending(current)) {
265                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
266                         break;
267                 }
268
269                 schedule();
270         }
271         finish_wait(&drbd_pp_wait, &wait);
272
273         if (page)
274                 atomic_add(number, &mdev->pp_in_use);
275         return page;
276 }
277
278 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
279  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
280  * Either links the page chain back to the global pool,
281  * or returns all pages to the system. */
282 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
283 {
284         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
285         int i;
286
287         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
288                 i = page_chain_free(page);
289         else {
290                 struct page *tmp;
291                 tmp = page_chain_tail(page, &i);
292                 spin_lock(&drbd_pp_lock);
293                 page_chain_add(&drbd_pp_pool, page, tmp);
294                 drbd_pp_vacant += i;
295                 spin_unlock(&drbd_pp_lock);
296         }
297         i = atomic_sub_return(i, a);
298         if (i < 0)
299                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
300                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
301         wake_up(&drbd_pp_wait);
302 }
303
304 /*
305 You need to hold the req_lock:
306  _drbd_wait_ee_list_empty()
307
308 You must not have the req_lock:
309  drbd_free_ee()
310  drbd_alloc_ee()
311  drbd_init_ee()
312  drbd_release_ee()
313  drbd_ee_fix_bhs()
314  drbd_process_done_ee()
315  drbd_clear_done_ee()
316  drbd_wait_ee_list_empty()
317 */
318
319 struct drbd_peer_request *
320 drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
321               unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
322 {
323         struct drbd_peer_request *peer_req;
324         struct page *page;
325         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
326
327         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
328                 return NULL;
329
330         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
331         if (!peer_req) {
332                 if (!(gfp_mask & __GFP_NOWARN))
333                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
334                 return NULL;
335         }
336
337         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
338         if (!page)
339                 goto fail;
340
341         drbd_clear_interval(&peer_req->i);
342         peer_req->i.size = data_size;
343         peer_req->i.sector = sector;
344         peer_req->i.local = false;
345         peer_req->i.waiting = false;
346
347         peer_req->epoch = NULL;
348         peer_req->w.mdev = mdev;
349         peer_req->pages = page;
350         atomic_set(&peer_req->pending_bios, 0);
351         peer_req->flags = 0;
352         /*
353          * The block_id is opaque to the receiver.  It is not endianness
354          * converted, and sent back to the sender unchanged.
355          */
356         peer_req->block_id = id;
357
358         return peer_req;
359
360  fail:
361         mempool_free(peer_req, drbd_ee_mempool);
362         return NULL;
363 }
364
365 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
366                        int is_net)
367 {
368         if (peer_req->flags & EE_HAS_DIGEST)
369                 kfree(peer_req->digest);
370         drbd_pp_free(mdev, peer_req->pages, is_net);
371         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
372         D_ASSERT(drbd_interval_empty(&peer_req->i));
373         mempool_free(peer_req, drbd_ee_mempool);
374 }
375
376 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
377 {
378         LIST_HEAD(work_list);
379         struct drbd_peer_request *peer_req, *t;
380         int count = 0;
381         int is_net = list == &mdev->net_ee;
382
383         spin_lock_irq(&mdev->tconn->req_lock);
384         list_splice_init(list, &work_list);
385         spin_unlock_irq(&mdev->tconn->req_lock);
386
387         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
388                 drbd_free_some_ee(mdev, peer_req, is_net);
389                 count++;
390         }
391         return count;
392 }
393
394
395 /* See also comments in _req_mod(,BARRIER_ACKED)
396  * and receive_Barrier.
397  *
398  * Move entries from net_ee to done_ee, if ready.
399  * Grab done_ee, call all callbacks, free the entries.
400  * The callbacks typically send out ACKs.
401  */
402 static int drbd_process_done_ee(struct drbd_conf *mdev)
403 {
404         LIST_HEAD(work_list);
405         LIST_HEAD(reclaimed);
406         struct drbd_peer_request *peer_req, *t;
407         int err = 0;
408
409         spin_lock_irq(&mdev->tconn->req_lock);
410         reclaim_net_ee(mdev, &reclaimed);
411         list_splice_init(&mdev->done_ee, &work_list);
412         spin_unlock_irq(&mdev->tconn->req_lock);
413
414         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
415                 drbd_free_net_ee(mdev, peer_req);
416
417         /* possible callbacks here:
418          * e_end_block, e_end_resync_block, and e_send_discard_write.
419          * all ignore the last argument.
420          */
421         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
422                 int err2;
423
424                 /* list_del not necessary, next/prev members not touched */
425                 err2 = peer_req->w.cb(&peer_req->w, !!err);
426                 if (!err)
427                         err = err2;
428                 drbd_free_ee(mdev, peer_req);
429         }
430         wake_up(&mdev->ee_wait);
431
432         return err;
433 }
434
435 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436 {
437         DEFINE_WAIT(wait);
438
439         /* avoids spin_lock/unlock
440          * and calling prepare_to_wait in the fast path */
441         while (!list_empty(head)) {
442                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
443                 spin_unlock_irq(&mdev->tconn->req_lock);
444                 io_schedule();
445                 finish_wait(&mdev->ee_wait, &wait);
446                 spin_lock_irq(&mdev->tconn->req_lock);
447         }
448 }
449
450 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
451 {
452         spin_lock_irq(&mdev->tconn->req_lock);
453         _drbd_wait_ee_list_empty(mdev, head);
454         spin_unlock_irq(&mdev->tconn->req_lock);
455 }
456
457 /* See also kernel_accept(), which is only present since 2.6.18.
458  * We also want to log exactly which part of it failed. */
459 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
460 {
461         struct sock *sk = sock->sk;
462         int err = 0;
463
464         *what = "listen";
465         err = sock->ops->listen(sock, 5);
466         if (err < 0)
467                 goto out;
468
469         *what = "sock_create_lite";
470         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
471                                newsock);
472         if (err < 0)
473                 goto out;
474
475         *what = "accept";
476         err = sock->ops->accept(sock, *newsock, 0);
477         if (err < 0) {
478                 sock_release(*newsock);
479                 *newsock = NULL;
480                 goto out;
481         }
482         (*newsock)->ops  = sock->ops;
483
484 out:
485         return err;
486 }
487
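/* Do a single sock_recvmsg() on @sock into @buf (MSG_WAITALL | MSG_NOSIGNAL
 * unless other flags are given); returns the number of bytes received, 0 on
 * orderly shutdown, or a negative error. */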
488 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
489 {
490         mm_segment_t oldfs;
491         struct kvec iov = {
492                 .iov_base = buf,
493                 .iov_len = size,
494         };
495         struct msghdr msg = {
496                 .msg_iovlen = 1,
497                 .msg_iov = (struct iovec *)&iov,
498                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
499         };
500         int rv;
501
502         oldfs = get_fs();
503         set_fs(KERNEL_DS);
504         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
505         set_fs(oldfs);
506
507         return rv;
508 }
509
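/* Receive @size bytes on the data socket.  On a short read, error, signal or
 * peer shutdown the connection is forced into C_BROKEN_PIPE; returns the
 * number of bytes actually received, 0, or a negative error. */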
510 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
511 {
512         mm_segment_t oldfs;
513         struct kvec iov = {
514                 .iov_base = buf,
515                 .iov_len = size,
516         };
517         struct msghdr msg = {
518                 .msg_iovlen = 1,
519                 .msg_iov = (struct iovec *)&iov,
520                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
521         };
522         int rv;
523
524         oldfs = get_fs();
525         set_fs(KERNEL_DS);
526
527         for (;;) {
528                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
529                 if (rv == size)
530                         break;
531
532                 /* Note:
533                  * ECONNRESET   other side closed the connection
534                  * ERESTARTSYS  (on  sock) we got a signal
535                  */
536
537                 if (rv < 0) {
538                         if (rv == -ECONNRESET)
539                                 conn_info(tconn, "sock was reset by peer\n");
540                         else if (rv != -ERESTARTSYS)
541                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
542                         break;
543                 } else if (rv == 0) {
544                         conn_info(tconn, "sock was shut down by peer\n");
545                         break;
546                 } else  {
547                         /* signal came in, or peer/link went down,
548                          * after we read a partial message
549                          */
550                         /* D_ASSERT(signal_pending(current)); */
551                         break;
552                 }
553         }
554
555         set_fs(oldfs);
556
557         if (rv != size)
558                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
559
560         return rv;
561 }
562
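/* Like drbd_recv(), but returns 0 on a complete read, and -EIO (or the
 * negative error from the socket layer) on anything short of @size bytes. */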
563 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
564 {
565         int err;
566
567         err = drbd_recv(tconn, buf, size);
568         if (err != size) {
569                 if (err >= 0)
570                         err = -EIO;
571         } else
572                 err = 0;
573         return err;
574 }
575
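/* drbd_recv_all(), plus a warning about the short read unless it was caused
 * by a pending signal. */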
576 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
577 {
578         int err;
579
580         err = drbd_recv_all(tconn, buf, size);
581         if (err && !signal_pending(current))
582                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
583         return err;
584 }
585
586 /* quoting tcp(7):
587  *   On individual connections, the socket buffer size must be set prior to the
588  *   listen(2) or connect(2) calls in order to have it take effect.
589  * This is our wrapper to do so.
590  */
591 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
592                 unsigned int rcv)
593 {
594         /* open coded SO_SNDBUF, SO_RCVBUF */
595         if (snd) {
596                 sock->sk->sk_sndbuf = snd;
597                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
598         }
599         if (rcv) {
600                 sock->sk->sk_rcvbuf = rcv;
601                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
602         }
603 }
604
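/* Active side of connection setup: create a socket, bind it to the configured
 * source address and try to connect to the peer; returns the socket, or NULL
 * on failure. */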
605 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
606 {
607         const char *what;
608         struct socket *sock;
609         struct sockaddr_in6 src_in6;
610         int err;
611         int disconnect_on_error = 1;
612
613         if (!get_net_conf(tconn))
614                 return NULL;
615
616         what = "sock_create_kern";
617         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
618                 SOCK_STREAM, IPPROTO_TCP, &sock);
619         if (err < 0) {
620                 sock = NULL;
621                 goto out;
622         }
623
624         sock->sk->sk_rcvtimeo =
625         sock->sk->sk_sndtimeo =  tconn->net_conf->try_connect_int*HZ;
626         drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
627                         tconn->net_conf->rcvbuf_size);
628
629         /* explicitly bind to the configured IP as source IP
630          * for the outgoing connections.
631          * This is needed for multihomed hosts and to be
632          * able to use lo: interfaces for drbd.
633          * Make sure to use 0 as port number, so linux selects
634          * a free one dynamically.
635          */
636         memcpy(&src_in6, tconn->net_conf->my_addr,
637                min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
638         if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
639                 src_in6.sin6_port = 0;
640         else
641                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
642
643         what = "bind before connect";
644         err = sock->ops->bind(sock,
645                               (struct sockaddr *) &src_in6,
646                               tconn->net_conf->my_addr_len);
647         if (err < 0)
648                 goto out;
649
650         /* connect may fail, peer not yet available.
651          * stay C_WF_CONNECTION, don't go Disconnecting! */
652         disconnect_on_error = 0;
653         what = "connect";
654         err = sock->ops->connect(sock,
655                                  (struct sockaddr *)tconn->net_conf->peer_addr,
656                                  tconn->net_conf->peer_addr_len, 0);
657
658 out:
659         if (err < 0) {
660                 if (sock) {
661                         sock_release(sock);
662                         sock = NULL;
663                 }
664                 switch (-err) {
665                         /* timeout, busy, signal pending */
666                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
667                 case EINTR: case ERESTARTSYS:
668                         /* peer not (yet) available, network problem */
669                 case ECONNREFUSED: case ENETUNREACH:
670                 case EHOSTDOWN:    case EHOSTUNREACH:
671                         disconnect_on_error = 0;
672                         break;
673                 default:
674                         conn_err(tconn, "%s failed, err = %d\n", what, err);
675                 }
676                 if (disconnect_on_error)
677                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
678         }
679         put_net_conf(tconn);
680         return sock;
681 }
682
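/* Passive side of connection setup: create a listening socket on the
 * configured local address and accept one incoming connection; returns the
 * established socket, or NULL. */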
683 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
684 {
685         int timeo, err;
686         struct socket *s_estab = NULL, *s_listen;
687         const char *what;
688
689         if (!get_net_conf(tconn))
690                 return NULL;
691
692         what = "sock_create_kern";
693         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
694                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
695         if (err) {
696                 s_listen = NULL;
697                 goto out;
698         }
699
700         timeo = tconn->net_conf->try_connect_int * HZ;
701         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
702
703         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
704         s_listen->sk->sk_rcvtimeo = timeo;
705         s_listen->sk->sk_sndtimeo = timeo;
706         drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
707                         tconn->net_conf->rcvbuf_size);
708
709         what = "bind before listen";
710         err = s_listen->ops->bind(s_listen,
711                               (struct sockaddr *) tconn->net_conf->my_addr,
712                               tconn->net_conf->my_addr_len);
713         if (err < 0)
714                 goto out;
715
716         err = drbd_accept(&what, s_listen, &s_estab);
717
718 out:
719         if (s_listen)
720                 sock_release(s_listen);
721         if (err < 0) {
722                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
723                         conn_err(tconn, "%s failed, err = %d\n", what, err);
724                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
725                 }
726         }
727         put_net_conf(tconn);
728
729         return s_estab;
730 }
731
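/* Send / receive the "first packet" (P_INITIAL_DATA or P_INITIAL_META) that
 * tells the peer whether a freshly established socket is meant as the data
 * or the meta connection. */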
732 static int drbd_send_fp(struct drbd_tconn *tconn, struct drbd_socket *sock, enum drbd_packet cmd)
733 {
734         struct p_header *h = tconn->data.sbuf;
735
736         return !_conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);
737 }
738
739 static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
740 {
741         struct p_header80 h;
742         int rr;
743
744         rr = drbd_recv_short(sock, &h, sizeof(h), 0);
745
746         if (rr == sizeof(h) && h.magic == cpu_to_be32(DRBD_MAGIC))
747                 return be16_to_cpu(h.command);
748
749         return 0xffff;
750 }
751
752 /**
753  * drbd_socket_okay() - Free the socket if its connection is not okay
754  * @sock:       pointer to the pointer to the socket.
755  */
756 static int drbd_socket_okay(struct socket **sock)
757 {
758         int rr;
759         char tb[4];
760
761         if (!*sock)
762                 return false;
763
764         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
765
766         if (rr > 0 || rr == -EAGAIN) {
767                 return true;
768         } else {
769                 sock_release(*sock);
770                 *sock = NULL;
771                 return false;
772         }
773 }
774 /* Gets called when a connection is established, or when a new minor gets
775    created within an existing connection */
776 int drbd_connected(int vnr, void *p, void *data)
777 {
778         struct drbd_conf *mdev = (struct drbd_conf *)p;
779         int err;
780
781         atomic_set(&mdev->packet_seq, 0);
782         mdev->peer_seq = 0;
783
784         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
785                 &mdev->tconn->cstate_mutex :
786                 &mdev->own_state_mutex;
787
788         err = drbd_send_sync_param(mdev);
789         if (!err)
790                 err = drbd_send_sizes(mdev, 0, 0);
791         if (!err)
792                 err = drbd_send_uuids(mdev);
793         if (!err)
794                 err = drbd_send_state(mdev);
795         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
796         clear_bit(RESIZE_PENDING, &mdev->flags);
797         return err;
798 }
799
800 /*
801  * return values:
802  *   1 yes, we have a valid connection
803  *   0 oops, did not work out, please try again
804  *  -1 peer talks different language,
805  *     no point in trying again, please go standalone.
806  *  -2 We do not have a network config...
807  */
808 static int drbd_connect(struct drbd_tconn *tconn)
809 {
810         struct socket *sock, *msock;
811         int try, h, ok;
812
813         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
814                 return -2;
815
816         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
817
818         /* Assume that the peer only understands protocol 80 until we know better.  */
819         tconn->agreed_pro_version = 80;
820
821         do {
822                 struct socket *s;
823
824                 for (try = 0;;) {
825                         /* 3 tries, this should take less than a second! */
826                         s = drbd_try_connect(tconn);
827                         if (s || ++try >= 3)
828                                 break;
829                         /* give the other side time to call bind() & listen() */
830                         schedule_timeout_interruptible(HZ / 10);
831                 }
832
833                 if (s) {
834                         if (!tconn->data.socket) {
835                                 tconn->data.socket = s;
836                                 drbd_send_fp(tconn, &tconn->data, P_INITIAL_DATA);
837                         } else if (!tconn->meta.socket) {
838                                 tconn->meta.socket = s;
839                                 drbd_send_fp(tconn, &tconn->meta, P_INITIAL_META);
840                         } else {
841                                 conn_err(tconn, "Logic error in drbd_connect()\n");
842                                 goto out_release_sockets;
843                         }
844                 }
845
846                 if (tconn->data.socket && tconn->meta.socket) {
847                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
848                         ok = drbd_socket_okay(&tconn->data.socket);
849                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
850                         if (ok)
851                                 break;
852                 }
853
854 retry:
855                 s = drbd_wait_for_connect(tconn);
856                 if (s) {
857                         try = drbd_recv_fp(tconn, s);
858                         drbd_socket_okay(&tconn->data.socket);
859                         drbd_socket_okay(&tconn->meta.socket);
860                         switch (try) {
861                         case P_INITIAL_DATA:
862                                 if (tconn->data.socket) {
863                                         conn_warn(tconn, "initial packet S crossed\n");
864                                         sock_release(tconn->data.socket);
865                                 }
866                                 tconn->data.socket = s;
867                                 break;
868                         case P_INITIAL_META:
869                                 if (tconn->meta.socket) {
870                                         conn_warn(tconn, "initial packet M crossed\n");
871                                         sock_release(tconn->meta.socket);
872                                 }
873                                 tconn->meta.socket = s;
874                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
875                                 break;
876                         default:
877                                 conn_warn(tconn, "Error receiving initial packet\n");
878                                 sock_release(s);
879                                 if (random32() & 1)
880                                         goto retry;
881                         }
882                 }
883
884                 if (tconn->cstate <= C_DISCONNECTING)
885                         goto out_release_sockets;
886                 if (signal_pending(current)) {
887                         flush_signals(current);
888                         smp_rmb();
889                         if (get_t_state(&tconn->receiver) == EXITING)
890                                 goto out_release_sockets;
891                 }
892
893                 if (tconn->data.socket && tconn->meta.socket) {
894                         ok = drbd_socket_okay(&tconn->data.socket);
895                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
896                         if (ok)
897                                 break;
898                 }
899         } while (1);
900
901         sock  = tconn->data.socket;
902         msock = tconn->meta.socket;
903
904         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
905         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
906
907         sock->sk->sk_allocation = GFP_NOIO;
908         msock->sk->sk_allocation = GFP_NOIO;
909
910         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
911         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
912
913         /* NOT YET ...
914          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
915          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
916          * first set it to the P_CONNECTION_FEATURES timeout,
917          * which we set to 4x the configured ping_timeout. */
918         sock->sk->sk_sndtimeo =
919         sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
920
921         msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
922         msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
923
924         /* we don't want delays.
925          * we use TCP_CORK where appropriate, though */
926         drbd_tcp_nodelay(sock);
927         drbd_tcp_nodelay(msock);
928
929         tconn->last_received = jiffies;
930
931         h = drbd_do_features(tconn);
932         if (h <= 0)
933                 return h;
934
935         if (tconn->cram_hmac_tfm) {
936                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
937                 switch (drbd_do_auth(tconn)) {
938                 case -1:
939                         conn_err(tconn, "Authentication of peer failed\n");
940                         return -1;
941                 case 0:
942                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
943                         return 0;
944                 }
945         }
946
947         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
948                 return 0;
949
950         sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
951         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
952
953         drbd_thread_start(&tconn->asender);
954
955         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
956                 return -1;
957
958         return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
959
960 out_release_sockets:
961         if (tconn->data.socket) {
962                 sock_release(tconn->data.socket);
963                 tconn->data.socket = NULL;
964         }
965         if (tconn->meta.socket) {
966                 sock_release(tconn->meta.socket);
967                 tconn->meta.socket = NULL;
968         }
969         return -1;
970 }
971
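/* Decode a received header, either the old fixed-size h80 format or the
 * newer h95 format, into @pi; returns -EINVAL if the magic does not match. */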
972 static int decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
973 {
974         if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
975                 pi->cmd = be16_to_cpu(h->h80.command);
976                 pi->size = be16_to_cpu(h->h80.length);
977                 pi->vnr = 0;
978         } else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
979                 pi->cmd = be16_to_cpu(h->h95.command);
980                 pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
981                 pi->vnr = 0;
982         } else {
983                 conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
984                     be32_to_cpu(h->h80.magic),
985                     be16_to_cpu(h->h80.command),
986                     be16_to_cpu(h->h80.length));
987                 return -EINVAL;
988         }
989         return 0;
990 }
991
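/* Receive and decode one packet header from the data socket, and remember
 * the jiffies of the last receive. */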
992 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
993 {
994         struct p_header *h = tconn->data.rbuf;
995         int err;
996
997         err = drbd_recv_all_warn(tconn, h, sizeof(*h));
998         if (err)
999                 return err;
1000
1001         err = decode_header(tconn, h, pi);
1002         tconn->last_received = jiffies;
1003
1004         return err;
1005 }
1006
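/* Issue a flush to the local backing device if the current write ordering
 * policy asks for it; on failure fall back to WO_drain_io. */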
1007 static void drbd_flush(struct drbd_conf *mdev)
1008 {
1009         int rv;
1010
1011         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1012                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1013                                         NULL);
1014                 if (rv) {
1015                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1016                         /* would rather check for EOPNOTSUPP, but that is not reliable.
1017                          * don't try again for ANY return value != 0
1018                          * if (rv == -EOPNOTSUPP) */
1019                         drbd_bump_write_ordering(mdev, WO_drain_io);
1020                 }
1021                 put_ldev(mdev);
1022         }
1023 }
1024
1025 /**
1026  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1027  * @mdev:       DRBD device.
1028  * @epoch:      Epoch object.
1029  * @ev:         Epoch event.
1030  */
1031 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1032                                                struct drbd_epoch *epoch,
1033                                                enum epoch_event ev)
1034 {
1035         int epoch_size;
1036         struct drbd_epoch *next_epoch;
1037         enum finish_epoch rv = FE_STILL_LIVE;
1038
1039         spin_lock(&mdev->epoch_lock);
1040         do {
1041                 next_epoch = NULL;
1042
1043                 epoch_size = atomic_read(&epoch->epoch_size);
1044
1045                 switch (ev & ~EV_CLEANUP) {
1046                 case EV_PUT:
1047                         atomic_dec(&epoch->active);
1048                         break;
1049                 case EV_GOT_BARRIER_NR:
1050                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1051                         break;
1052                 case EV_BECAME_LAST:
1053                         /* nothing to do*/
1054                         break;
1055                 }
1056
1057                 if (epoch_size != 0 &&
1058                     atomic_read(&epoch->active) == 0 &&
1059                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1060                         if (!(ev & EV_CLEANUP)) {
1061                                 spin_unlock(&mdev->epoch_lock);
1062                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1063                                 spin_lock(&mdev->epoch_lock);
1064                         }
1065                         dec_unacked(mdev);
1066
1067                         if (mdev->current_epoch != epoch) {
1068                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1069                                 list_del(&epoch->list);
1070                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1071                                 mdev->epochs--;
1072                                 kfree(epoch);
1073
1074                                 if (rv == FE_STILL_LIVE)
1075                                         rv = FE_DESTROYED;
1076                         } else {
1077                                 epoch->flags = 0;
1078                                 atomic_set(&epoch->epoch_size, 0);
1079                                 /* atomic_set(&epoch->active, 0); is already zero */
1080                                 if (rv == FE_STILL_LIVE)
1081                                         rv = FE_RECYCLED;
1082                                 wake_up(&mdev->ee_wait);
1083                         }
1084                 }
1085
1086                 if (!next_epoch)
1087                         break;
1088
1089                 epoch = next_epoch;
1090         } while (1);
1091
1092         spin_unlock(&mdev->epoch_lock);
1093
1094         return rv;
1095 }
1096
1097 /**
1098  * drbd_bump_write_ordering() - Fall back to another write ordering method
1099  * @mdev:       DRBD device.
1100  * @wo:         Write ordering method to try.
1101  */
1102 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1103 {
1104         enum write_ordering_e pwo;
1105         static char *write_ordering_str[] = {
1106                 [WO_none] = "none",
1107                 [WO_drain_io] = "drain",
1108                 [WO_bdev_flush] = "flush",
1109         };
1110
1111         pwo = mdev->write_ordering;
1112         wo = min(pwo, wo);
1113         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1114                 wo = WO_drain_io;
1115         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1116                 wo = WO_none;
1117         mdev->write_ordering = wo;
1118         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1119                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1120 }
1121
1122 /**
1123  * drbd_submit_peer_request() - Submit one or more bios for a peer request
1124  * @mdev:       DRBD device.
1125  * @peer_req:   peer request
1126  * @rw:         flag field, see bio->bi_rw
1127  *
1128  * May spread the pages to multiple bios,
1129  * depending on bio_add_page restrictions.
1130  *
1131  * Returns 0 if all bios have been submitted,
1132  * -ENOMEM if we could not allocate enough bios,
1133  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1134  *  single page to an empty bio (which should never happen and likely indicates
1135  *  that the lower level IO stack is in some way broken). This has been observed
1136  *  on certain Xen deployments.
1137  */
1138 /* TODO allocate from our own bio_set. */
1139 int drbd_submit_peer_request(struct drbd_conf *mdev,
1140                              struct drbd_peer_request *peer_req,
1141                              const unsigned rw, const int fault_type)
1142 {
1143         struct bio *bios = NULL;
1144         struct bio *bio;
1145         struct page *page = peer_req->pages;
1146         sector_t sector = peer_req->i.sector;
1147         unsigned ds = peer_req->i.size;
1148         unsigned n_bios = 0;
1149         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1150         int err = -ENOMEM;
1151
1152         /* In most cases, we will only need one bio.  But in case the lower
1153          * level restrictions happen to be different at this offset on this
1154          * side than those of the sending peer, we may need to submit the
1155          * request in more than one bio.
1156          *
1157          * Plain bio_alloc is good enough here, this is no DRBD internally
1158          * generated bio, but a bio allocated on behalf of the peer.
1159          */
1160 next_bio:
1161         bio = bio_alloc(GFP_NOIO, nr_pages);
1162         if (!bio) {
1163                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1164                 goto fail;
1165         }
1166         /* > peer_req->i.sector, unless this is the first bio */
1167         bio->bi_sector = sector;
1168         bio->bi_bdev = mdev->ldev->backing_bdev;
1169         bio->bi_rw = rw;
1170         bio->bi_private = peer_req;
1171         bio->bi_end_io = drbd_peer_request_endio;
1172
1173         bio->bi_next = bios;
1174         bios = bio;
1175         ++n_bios;
1176
1177         page_chain_for_each(page) {
1178                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1179                 if (!bio_add_page(bio, page, len, 0)) {
1180                         /* A single page must always be possible!
1181                          * But in case it fails anyways,
1182                          * we deal with it, and complain (below). */
1183                         if (bio->bi_vcnt == 0) {
1184                                 dev_err(DEV,
1185                                         "bio_add_page failed for len=%u, "
1186                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1187                                         len, (unsigned long long)bio->bi_sector);
1188                                 err = -ENOSPC;
1189                                 goto fail;
1190                         }
1191                         goto next_bio;
1192                 }
1193                 ds -= len;
1194                 sector += len >> 9;
1195                 --nr_pages;
1196         }
1197         D_ASSERT(page == NULL);
1198         D_ASSERT(ds == 0);
1199
1200         atomic_set(&peer_req->pending_bios, n_bios);
1201         do {
1202                 bio = bios;
1203                 bios = bios->bi_next;
1204                 bio->bi_next = NULL;
1205
1206                 drbd_generic_make_request(mdev, fault_type, bio);
1207         } while (bios);
1208         return 0;
1209
1210 fail:
1211         while (bios) {
1212                 bio = bios;
1213                 bios = bios->bi_next;
1214                 bio_put(bio);
1215         }
1216         return err;
1217 }
1218
1219 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1220                                              struct drbd_peer_request *peer_req)
1221 {
1222         struct drbd_interval *i = &peer_req->i;
1223
1224         drbd_remove_interval(&mdev->write_requests, i);
1225         drbd_clear_interval(i);
1226
1227         /* Wake up any processes waiting for this peer request to complete.  */
1228         if (i->waiting)
1229                 wake_up(&mdev->misc_wait);
1230 }
1231
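/* Handle P_BARRIER: close the current write epoch, draining and/or flushing
 * local I/O as required by the write ordering policy, and start a new epoch. */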
1232 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1233 {
1234         struct drbd_conf *mdev;
1235         int rv;
1236         struct p_barrier *p = tconn->data.rbuf;
1237         struct drbd_epoch *epoch;
1238
1239         mdev = vnr_to_mdev(tconn, pi->vnr);
1240         if (!mdev)
1241                 return -EIO;
1242
1243         inc_unacked(mdev);
1244
1245         mdev->current_epoch->barrier_nr = p->barrier;
1246         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1247
1248         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1249          * the activity log, which means it would not be resynced in case the
1250          * R_PRIMARY crashes now.
1251          * Therefore we must send the barrier_ack after the barrier request was
1252          * completed. */
1253         switch (mdev->write_ordering) {
1254         case WO_none:
1255                 if (rv == FE_RECYCLED)
1256                         return 0;
1257
1258                 /* receiver context, in the writeout path of the other node.
1259                  * avoid potential distributed deadlock */
1260                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1261                 if (epoch)
1262                         break;
1263                 else
1264                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1265                         /* Fall through */
1266
1267         case WO_bdev_flush:
1268         case WO_drain_io:
1269                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1270                 drbd_flush(mdev);
1271
1272                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1273                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1274                         if (epoch)
1275                                 break;
1276                 }
1277
1278                 epoch = mdev->current_epoch;
1279                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1280
1281                 D_ASSERT(atomic_read(&epoch->active) == 0);
1282                 D_ASSERT(epoch->flags == 0);
1283
1284                 return 0;
1285         default:
1286                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1287                 return -EIO;
1288         }
1289
1290         epoch->flags = 0;
1291         atomic_set(&epoch->epoch_size, 0);
1292         atomic_set(&epoch->active, 0);
1293
1294         spin_lock(&mdev->epoch_lock);
1295         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1296                 list_add(&epoch->list, &mdev->current_epoch->list);
1297                 mdev->current_epoch = epoch;
1298                 mdev->epochs++;
1299         } else {
1300                 /* The current_epoch got recycled while we allocated this one... */
1301                 kfree(epoch);
1302         }
1303         spin_unlock(&mdev->epoch_lock);
1304
1305         return 0;
1306 }
1307
1308 /* used from receive_RSDataReply (recv_resync_read)
1309  * and from receive_Data */
1310 static struct drbd_peer_request *
1311 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1312               int data_size) __must_hold(local)
1313 {
1314         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1315         struct drbd_peer_request *peer_req;
1316         struct page *page;
1317         int dgs, ds, err;
1318         void *dig_in = mdev->tconn->int_dig_in;
1319         void *dig_vv = mdev->tconn->int_dig_vv;
1320         unsigned long *data;
1321
1322         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1323                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1324
1325         if (dgs) {
1326                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1327                 if (err)
1328                         return NULL;
1329         }
1330
1331         data_size -= dgs;
1332
1333         if (!expect(data_size != 0))
1334                 return NULL;
1335         if (!expect(IS_ALIGNED(data_size, 512)))
1336                 return NULL;
1337         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1338                 return NULL;
1339
1340         /* even though we trust our peer,
1341          * we sometimes have to double check. */
1342         if (sector + (data_size>>9) > capacity) {
1343                 dev_err(DEV, "request from peer beyond end of local disk: "
1344                         "capacity: %llus < sector: %llus + size: %u\n",
1345                         (unsigned long long)capacity,
1346                         (unsigned long long)sector, data_size);
1347                 return NULL;
1348         }
1349
1350         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1351          * "criss-cross" setup, that might cause write-out on some other DRBD,
1352          * which in turn might block on the other node at this very place.  */
1353         peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1354         if (!peer_req)
1355                 return NULL;
1356
1357         ds = data_size;
1358         page = peer_req->pages;
1359         page_chain_for_each(page) {
1360                 unsigned len = min_t(int, ds, PAGE_SIZE);
1361                 data = kmap(page);
1362                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1363                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1364                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1365                         data[0] = data[0] ^ (unsigned long)-1;
1366                 }
1367                 kunmap(page);
1368                 if (err) {
1369                         drbd_free_ee(mdev, peer_req);
1370                         return NULL;
1371                 }
1372                 ds -= len;
1373         }
1374
1375         if (dgs) {
1376                 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1377                 if (memcmp(dig_in, dig_vv, dgs)) {
1378                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1379                                 (unsigned long long)sector, data_size);
1380                         drbd_free_ee(mdev, peer_req);
1381                         return NULL;
1382                 }
1383         }
1384         mdev->recv_cnt += data_size>>9;
1385         return peer_req;
1386 }
1387
1388 /* drbd_drain_block() just takes a data block
1389  * out of the socket input buffer, and discards it.
1390  */
1391 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1392 {
1393         struct page *page;
1394         int err = 0;
1395         void *data;
1396
1397         if (!data_size)
1398                 return 0;
1399
1400         page = drbd_pp_alloc(mdev, 1, 1);
1401
1402         data = kmap(page);
1403         while (data_size) {
1404                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1405
1406                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1407                 if (err)
1408                         break;
1409                 data_size -= len;
1410         }
1411         kunmap(page);
1412         drbd_pp_free(mdev, page, 0);
1413         return err;
1414 }
1415
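/* Receive a data block directly into the bio of the original read request
 * (a read answered by the peer, hence "diskless"), verifying the integrity
 * digest if one is in use. */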
1416 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1417                            sector_t sector, int data_size)
1418 {
1419         struct bio_vec *bvec;
1420         struct bio *bio;
1421         int dgs, err, i, expect;
1422         void *dig_in = mdev->tconn->int_dig_in;
1423         void *dig_vv = mdev->tconn->int_dig_vv;
1424
1425         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1426                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1427
1428         if (dgs) {
1429                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1430                 if (err)
1431                         return err;
1432         }
1433
1434         data_size -= dgs;
1435
1436         /* optimistically update recv_cnt.  if receiving fails below,
1437          * we disconnect anyways, and counters will be reset. */
1438         mdev->recv_cnt += data_size>>9;
1439
1440         bio = req->master_bio;
1441         D_ASSERT(sector == bio->bi_sector);
1442
1443         bio_for_each_segment(bvec, bio, i) {
1444                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1445                 expect = min_t(int, data_size, bvec->bv_len);
1446                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1447                 kunmap(bvec->bv_page);
1448                 if (err)
1449                         return err;
1450                 data_size -= expect;
1451         }
1452
1453         if (dgs) {
1454                 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1455                 if (memcmp(dig_in, dig_vv, dgs)) {
1456                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1457                         return -EINVAL;
1458                 }
1459         }
1460
1461         D_ASSERT(data_size == 0);
1462         return 0;
1463 }
1464
1465 /* e_end_resync_block() is called via
1466  * drbd_process_done_ee() by asender only */
1467 static int e_end_resync_block(struct drbd_work *w, int unused)
1468 {
1469         struct drbd_peer_request *peer_req =
1470                 container_of(w, struct drbd_peer_request, w);
1471         struct drbd_conf *mdev = w->mdev;
1472         sector_t sector = peer_req->i.sector;
1473         int err;
1474
1475         D_ASSERT(drbd_interval_empty(&peer_req->i));
1476
1477         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1478                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1479                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1480         } else {
1481                 /* Record failure to sync */
1482                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1483
1484                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1485         }
1486         dec_unacked(mdev);
1487
1488         return err;
1489 }
1490
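/* Read a resync data block into a new peer request, queue it on sync_ee and
 * submit the local write; the ACK is sent from e_end_resync_block(). */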
1491 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1492 {
1493         struct drbd_peer_request *peer_req;
1494
1495         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1496         if (!peer_req)
1497                 goto fail;
1498
1499         dec_rs_pending(mdev);
1500
1501         inc_unacked(mdev);
1502         /* corresponding dec_unacked() in e_end_resync_block()
1503          * or in _drbd_clear_done_ee */
1504
1505         peer_req->w.cb = e_end_resync_block;
1506
1507         spin_lock_irq(&mdev->tconn->req_lock);
1508         list_add(&peer_req->w.list, &mdev->sync_ee);
1509         spin_unlock_irq(&mdev->tconn->req_lock);
1510
1511         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1512         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1513                 return 0;
1514
1515         /* don't care for the reason here */
1516         dev_err(DEV, "submit failed, triggering re-connect\n");
1517         spin_lock_irq(&mdev->tconn->req_lock);
1518         list_del(&peer_req->w.list);
1519         spin_unlock_irq(&mdev->tconn->req_lock);
1520
1521         drbd_free_ee(mdev, peer_req);
1522 fail:
1523         put_ldev(mdev);
1524         return -EIO;
1525 }
1526
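/* The peer echoes back the block_id we originally sent, which is simply the
 * kernel address of our struct drbd_request.  Do not trust it blindly: only
 * return the request if an interval with exactly that address is found in
 * the given tree, covers the reported sector, and belongs to a local request. */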
1527 static struct drbd_request *
1528 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1529              sector_t sector, bool missing_ok, const char *func)
1530 {
1531         struct drbd_request *req;
1532
1533         /* Request object according to our peer */
1534         req = (struct drbd_request *)(unsigned long)id;
1535         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1536                 return req;
1537         if (!missing_ok) {
1538                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1539                         (unsigned long)id, (unsigned long long)sector);
1540         }
1541         return NULL;
1542 }
1543
1544 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1545 {
1546         struct drbd_conf *mdev;
1547         struct drbd_request *req;
1548         sector_t sector;
1549         int err;
1550         struct p_data *p = tconn->data.rbuf;
1551
1552         mdev = vnr_to_mdev(tconn, pi->vnr);
1553         if (!mdev)
1554                 return -EIO;
1555
1556         sector = be64_to_cpu(p->sector);
1557
1558         spin_lock_irq(&mdev->tconn->req_lock);
1559         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1560         spin_unlock_irq(&mdev->tconn->req_lock);
1561         if (unlikely(!req))
1562                 return -EIO;
1563
1564         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1565          * special casing it there for the various failure cases.
1566          * still no race with drbd_fail_pending_reads */
1567         err = recv_dless_read(mdev, req, sector, pi->size);
1568         if (!err)
1569                 req_mod(req, DATA_RECEIVED);
1570         /* else: nothing. handled from drbd_disconnect...
1571          * I don't think we may complete this just yet
1572          * in case we are "on-disconnect: freeze" */
1573
1574         return err;
1575 }
1576
1577 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1578 {
1579         struct drbd_conf *mdev;
1580         sector_t sector;
1581         int err;
1582         struct p_data *p = tconn->data.rbuf;
1583
1584         mdev = vnr_to_mdev(tconn, pi->vnr);
1585         if (!mdev)
1586                 return -EIO;
1587
1588         sector = be64_to_cpu(p->sector);
1589         D_ASSERT(p->block_id == ID_SYNCER);
1590
1591         if (get_ldev(mdev)) {
1592                 /* data is submitted to disk within recv_resync_read.
1593                  * corresponding put_ldev done below on error,
1594                  * or in drbd_peer_request_endio. */
1595                 err = recv_resync_read(mdev, sector, pi->size);
1596         } else {
1597                 if (__ratelimit(&drbd_ratelimit_state))
1598                         dev_err(DEV, "Can not write resync data to local disk.\n");
1599
1600                 err = drbd_drain_block(mdev, pi->size);
1601
1602                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1603         }
1604
1605         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1606
1607         return err;
1608 }
1609
1610 static int w_restart_write(struct drbd_work *w, int cancel)
1611 {
1612         struct drbd_request *req = container_of(w, struct drbd_request, w);
1613         struct drbd_conf *mdev = w->mdev;
1614         struct bio *bio;
1615         unsigned long start_time;
1616         unsigned long flags;
1617
1618         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1619         if (!expect(req->rq_state & RQ_POSTPONED)) {
1620                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1621                 return -EIO;
1622         }
1623         bio = req->master_bio;
1624         start_time = req->start_time;
1625         /* Postponed requests will not have their master_bio completed!  */
1626         __req_mod(req, DISCARD_WRITE, NULL);
1627         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1628
1629         while (__drbd_make_request(mdev, bio, start_time))
1630                 /* retry */ ;
1631         return 0;
1632 }
1633
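/* restart_conflicting_writes(): called from e_end_block() once a conflicting
 * peer write has completed.  Queue every postponed local write overlapping
 * the same range for resubmission via w_restart_write() above. */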
1634 static void restart_conflicting_writes(struct drbd_conf *mdev,
1635                                        sector_t sector, int size)
1636 {
1637         struct drbd_interval *i;
1638         struct drbd_request *req;
1639
1640         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1641                 if (!i->local)
1642                         continue;
1643                 req = container_of(i, struct drbd_request, i);
1644                 if (req->rq_state & RQ_LOCAL_PENDING ||
1645                     !(req->rq_state & RQ_POSTPONED))
1646                         continue;
1647                 if (expect(list_empty(&req->w.list))) {
1648                         req->w.mdev = mdev;
1649                         req->w.cb = w_restart_write;
1650                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1651                 }
1652         }
1653 }
1654
1655 /* e_end_block() is called via drbd_process_done_ee().
1656  * this means this function only runs in the asender thread
1657  */
1658 static int e_end_block(struct drbd_work *w, int cancel)
1659 {
1660         struct drbd_peer_request *peer_req =
1661                 container_of(w, struct drbd_peer_request, w);
1662         struct drbd_conf *mdev = w->mdev;
1663         sector_t sector = peer_req->i.sector;
1664         int err = 0, pcmd;
1665
1666         if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
1667                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1668                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1669                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1670                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1671                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1672                         err = drbd_send_ack(mdev, pcmd, peer_req);
1673                         if (pcmd == P_RS_WRITE_ACK)
1674                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1675                 } else {
1676                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1677                         /* we expect it to be marked out of sync anyways...
1678                          * maybe assert this?  */
1679                 }
1680                 dec_unacked(mdev);
1681         }
1682         /* we delete from the conflict detection hash _after_ we sent out the
1683          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1684         if (mdev->tconn->net_conf->two_primaries) {
1685                 spin_lock_irq(&mdev->tconn->req_lock);
1686                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1687                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1688                 if (peer_req->flags & EE_RESTART_REQUESTS)
1689                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1690                 spin_unlock_irq(&mdev->tconn->req_lock);
1691         } else
1692                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1693
1694         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1695
1696         return err;
1697 }
1698
1699 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1700 {
1701         struct drbd_conf *mdev = w->mdev;
1702         struct drbd_peer_request *peer_req =
1703                 container_of(w, struct drbd_peer_request, w);
1704         int err;
1705
1706         err = drbd_send_ack(mdev, ack, peer_req);
1707         dec_unacked(mdev);
1708
1709         return err;
1710 }
1711
1712 static int e_send_discard_write(struct drbd_work *w, int unused)
1713 {
1714         return e_send_ack(w, P_DISCARD_WRITE);
1715 }
1716
1717 static int e_send_retry_write(struct drbd_work *w, int unused)
1718 {
1719         struct drbd_tconn *tconn = w->mdev->tconn;
1720
1721         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1722                              P_RETRY_WRITE : P_DISCARD_WRITE);
1723 }
1724
1725 static bool seq_greater(u32 a, u32 b)
1726 {
1727         /*
1728          * We assume 32-bit wrap-around here.
1729          * For 24-bit wrap-around, we would have to shift:
1730          *  a <<= 8; b <<= 8;
1731          */
1732         return (s32)a - (s32)b > 0;
1733 }
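/* Example of the signed-difference trick above: with a == 5 and
 * b == 0xfffffffe (the counter just wrapped), (s32)a - (s32)b == 7 > 0,
 * so seq_greater(5, 0xfffffffe) correctly treats a as the newer value. */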
1734
1735 static u32 seq_max(u32 a, u32 b)
1736 {
1737         return seq_greater(a, b) ? a : b;
1738 }
1739
1740 static bool need_peer_seq(struct drbd_conf *mdev)
1741 {
1742         struct drbd_tconn *tconn = mdev->tconn;
1743
1744         /*
1745          * We only need to keep track of the last packet_seq number of our peer
1746          * if we are in dual-primary mode and we have the discard flag set; see
1747          * handle_write_conflicts().
1748          */
1749         return tconn->net_conf->two_primaries &&
1750                test_bit(DISCARD_CONCURRENT, &tconn->flags);
1751 }
1752
1753 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1754 {
1755         unsigned int newest_peer_seq;
1756
1757         if (need_peer_seq(mdev)) {
1758                 spin_lock(&mdev->peer_seq_lock);
1759                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1760                 mdev->peer_seq = newest_peer_seq;
1761                 spin_unlock(&mdev->peer_seq_lock);
1762                 /* wake up only if we actually changed mdev->peer_seq */
1763                 if (peer_seq == newest_peer_seq)
1764                         wake_up(&mdev->seq_wait);
1765         }
1766 }
1767
1768 /* Called from receive_Data.
1769  * Synchronize packets on sock with packets on msock.
1770  *
1771  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1772  * packet traveling on msock, they are still processed in the order they have
1773  * been sent.
1774  *
1775  * Note: we don't care for Ack packets overtaking P_DATA packets.
1776  *
1777  * In case the packet's sequence number is ahead of mdev->peer_seq, there are
1778  * outstanding packets on the msock. We wait for them to arrive.
1779  * In case we are the logically next packet, we update mdev->peer_seq
1780  * ourselves. Correctly handles 32bit wrap around.
1781  *
1782  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1783  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1784  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1785  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1786  *
1787  * returns 0 if we may process the packet,
1788  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1789 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1790 {
1791         DEFINE_WAIT(wait);
1792         long timeout;
1793         int ret;
1794
1795         if (!need_peer_seq(mdev))
1796                 return 0;
1797
1798         spin_lock(&mdev->peer_seq_lock);
1799         for (;;) {
1800                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1801                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1802                         ret = 0;
1803                         break;
1804                 }
1805                 if (signal_pending(current)) {
1806                         ret = -ERESTARTSYS;
1807                         break;
1808                 }
1809                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1810                 spin_unlock(&mdev->peer_seq_lock);
1811                 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1812                 timeout = schedule_timeout(timeout);
1813                 spin_lock(&mdev->peer_seq_lock);
1814                 if (!timeout) {
1815                         ret = -ETIMEDOUT;
1816                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1817                         break;
1818                 }
1819         }
1820         spin_unlock(&mdev->peer_seq_lock);
1821         finish_wait(&mdev->seq_wait, &wait);
1822         return ret;
1823 }
1824
1825 /* see also bio_flags_to_wire():
1826  * we map data packet flags (DP_*) to bio flags (REQ_*) by meaning, not by
1827  * value, since the peer may run a kernel version with different REQ_* bits. */
1828 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1829 {
1830         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1831                 (dpf & DP_FUA ? REQ_FUA : 0) |
1832                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1833                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1834 }
1835
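/* fail_postponed_requests(): called with req_lock held when conflict
 * resolution is given up (see handle_write_conflicts() below).  Every
 * postponed local request overlapping [sector, sector + size) is completed
 * as NEG_ACKED; the lock is dropped while completing the master bio, which
 * is why the overlap search is restarted on each iteration. */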
1836 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1837                                     unsigned int size)
1838 {
1839         struct drbd_interval *i;
1840
1841     repeat:
1842         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1843                 struct drbd_request *req;
1844                 struct bio_and_error m;
1845
1846                 if (!i->local)
1847                         continue;
1848                 req = container_of(i, struct drbd_request, i);
1849                 if (!(req->rq_state & RQ_POSTPONED))
1850                         continue;
1851                 req->rq_state &= ~RQ_POSTPONED;
1852                 __req_mod(req, NEG_ACKED, &m);
1853                 spin_unlock_irq(&mdev->tconn->req_lock);
1854                 if (m.bio)
1855                         complete_master_bio(mdev, &m);
1856                 spin_lock_irq(&mdev->tconn->req_lock);
1857                 goto repeat;
1858         }
1859 }
1860
1861 static int handle_write_conflicts(struct drbd_conf *mdev,
1862                                   struct drbd_peer_request *peer_req)
1863 {
1864         struct drbd_tconn *tconn = mdev->tconn;
1865         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1866         sector_t sector = peer_req->i.sector;
1867         const unsigned int size = peer_req->i.size;
1868         struct drbd_interval *i;
1869         bool equal;
1870         int err;
1871
1872         /*
1873          * Inserting the peer request into the write_requests tree will prevent
1874          * new conflicting local requests from being added.
1875          */
1876         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1877
1878     repeat:
1879         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1880                 if (i == &peer_req->i)
1881                         continue;
1882
1883                 if (!i->local) {
1884                         /*
1885                          * Our peer has sent a conflicting remote request; this
1886                          * should not happen in a two-node setup.  Wait for the
1887                          * earlier peer request to complete.
1888                          */
1889                         err = drbd_wait_misc(mdev, i);
1890                         if (err)
1891                                 goto out;
1892                         goto repeat;
1893                 }
1894
1895                 equal = i->sector == sector && i->size == size;
1896                 if (resolve_conflicts) {
1897                         /*
1898                          * If the peer request is fully contained within the
1899                          * overlapping request, it can be discarded; otherwise,
1900                          * it will be retried once all overlapping requests
1901                          * have completed.
1902                          */
1903                         bool discard = i->sector <= sector && i->sector +
1904                                        (i->size >> 9) >= sector + (size >> 9);
1905
1906                         if (!equal)
1907                                 dev_alert(DEV, "Concurrent writes detected: "
1908                                                "local=%llus +%u, remote=%llus +%u, "
1909                                                "assuming %s came first\n",
1910                                           (unsigned long long)i->sector, i->size,
1911                                           (unsigned long long)sector, size,
1912                                           discard ? "local" : "remote");
1913
1914                         inc_unacked(mdev);
1915                         peer_req->w.cb = discard ? e_send_discard_write :
1916                                                    e_send_retry_write;
1917                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1918                         wake_asender(mdev->tconn);
1919
1920                         err = -ENOENT;
1921                         goto out;
1922                 } else {
1923                         struct drbd_request *req =
1924                                 container_of(i, struct drbd_request, i);
1925
1926                         if (!equal)
1927                                 dev_alert(DEV, "Concurrent writes detected: "
1928                                                "local=%llus +%u, remote=%llus +%u\n",
1929                                           (unsigned long long)i->sector, i->size,
1930                                           (unsigned long long)sector, size);
1931
1932                         if (req->rq_state & RQ_LOCAL_PENDING ||
1933                             !(req->rq_state & RQ_POSTPONED)) {
1934                                 /*
1935                                  * Wait for the node with the discard flag to
1936                                  * decide if this request will be discarded or
1937                                  * retried.  Requests that are discarded will
1938                                  * disappear from the write_requests tree.
1939                                  *
1940                                  * In addition, wait for the conflicting
1941                                  * request to finish locally before submitting
1942                                  * the conflicting peer request.
1943                                  */
1944                                 err = drbd_wait_misc(mdev, &req->i);
1945                                 if (err) {
1946                                         _conn_request_state(mdev->tconn,
1947                                                             NS(conn, C_TIMEOUT),
1948                                                             CS_HARD);
1949                                         fail_postponed_requests(mdev, sector, size);
1950                                         goto out;
1951                                 }
1952                                 goto repeat;
1953                         }
1954                         /*
1955                          * Remember to restart the conflicting requests after
1956                          * the new peer request has completed.
1957                          */
1958                         peer_req->flags |= EE_RESTART_REQUESTS;
1959                 }
1960         }
1961         err = 0;
1962
1963     out:
1964         if (err)
1965                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1966         return err;
1967 }
1968
1969 /* mirrored write */
1970 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
1971 {
1972         struct drbd_conf *mdev;
1973         sector_t sector;
1974         struct drbd_peer_request *peer_req;
1975         struct p_data *p = tconn->data.rbuf;
1976         u32 peer_seq = be32_to_cpu(p->seq_num);
1977         int rw = WRITE;
1978         u32 dp_flags;
1979         int err;
1980
1981         mdev = vnr_to_mdev(tconn, pi->vnr);
1982         if (!mdev)
1983                 return -EIO;
1984
1985         if (!get_ldev(mdev)) {
1986                 int err2;
1987
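                /* We cannot store the data locally: still drain the payload to
                 * keep the stream in sync, account for it in the current epoch,
                 * and answer the peer with a P_NEG_ACK. */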
1988                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
1989                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1990                 atomic_inc(&mdev->current_epoch->epoch_size);
1991                 err2 = drbd_drain_block(mdev, pi->size);
1992                 if (!err)
1993                         err = err2;
1994                 return err;
1995         }
1996
1997         /*
1998          * Corresponding put_ldev done either below (on various errors), or in
1999          * drbd_peer_request_endio, if we successfully submit the data at the
2000          * end of this function.
2001          */
2002
2003         sector = be64_to_cpu(p->sector);
2004         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2005         if (!peer_req) {
2006                 put_ldev(mdev);
2007                 return -EIO;
2008         }
2009
2010         peer_req->w.cb = e_end_block;
2011
2012         dp_flags = be32_to_cpu(p->dp_flags);
2013         rw |= wire_flags_to_bio(mdev, dp_flags);
2014
2015         if (dp_flags & DP_MAY_SET_IN_SYNC)
2016                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2017
2018         spin_lock(&mdev->epoch_lock);
2019         peer_req->epoch = mdev->current_epoch;
2020         atomic_inc(&peer_req->epoch->epoch_size);
2021         atomic_inc(&peer_req->epoch->active);
2022         spin_unlock(&mdev->epoch_lock);
2023
2024         if (mdev->tconn->net_conf->two_primaries) {
2025                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2026                 if (err)
2027                         goto out_interrupted;
2028                 spin_lock_irq(&mdev->tconn->req_lock);
2029                 err = handle_write_conflicts(mdev, peer_req);
2030                 if (err) {
2031                         spin_unlock_irq(&mdev->tconn->req_lock);
2032                         if (err == -ENOENT) {
2033                                 put_ldev(mdev);
2034                                 return 0;
2035                         }
2036                         goto out_interrupted;
2037                 }
2038         } else
2039                 spin_lock_irq(&mdev->tconn->req_lock);
2040         list_add(&peer_req->w.list, &mdev->active_ee);
2041         spin_unlock_irq(&mdev->tconn->req_lock);
2042
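        /* Acknowledgement depends on the wire protocol:
         *   C: the peer expects P_WRITE_ACK only after the data hit our disk,
         *      so just account for the pending ack here (sent in e_end_block()).
         *   B: the peer only expects P_RECV_ACK ("received, not necessarily
         *      written"), so send it right away.
         *   A: the peer expects no ack for data packets at all. */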
2043         switch (mdev->tconn->net_conf->wire_protocol) {
2044         case DRBD_PROT_C:
2045                 inc_unacked(mdev);
2046                 /* corresponding dec_unacked() in e_end_block()
2047                  * or in _drbd_clear_done_ee */
2048                 break;
2049         case DRBD_PROT_B:
2050                 /* I really don't like it that the receiver thread
2051                  * sends on the msock, but anyways */
2052                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2053                 break;
2054         case DRBD_PROT_A:
2055                 /* nothing to do */
2056                 break;
2057         }
2058
2059         if (mdev->state.pdsk < D_INCONSISTENT) {
2060                 /* We have the only disk of the cluster: mark the block out of sync for the peer. */
2061                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2062                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2063                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2064                 drbd_al_begin_io(mdev, &peer_req->i);
2065         }
2066
2067         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2068         if (!err)
2069                 return 0;
2070
2071         /* don't care for the reason here */
2072         dev_err(DEV, "submit failed, triggering re-connect\n");
2073         spin_lock_irq(&mdev->tconn->req_lock);
2074         list_del(&peer_req->w.list);
2075         drbd_remove_epoch_entry_interval(mdev, peer_req);
2076         spin_unlock_irq(&mdev->tconn->req_lock);
2077         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2078                 drbd_al_complete_io(mdev, &peer_req->i);
2079
2080 out_interrupted:
2081         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2082         put_ldev(mdev);
2083         drbd_free_ee(mdev, peer_req);
2084         return err;
2085 }
2086
2087 /* We may throttle resync, if the lower device seems to be busy,
2088  * and current sync rate is above c_min_rate.
2089  *
2090  * To decide whether or not the lower device is busy, we use a scheme similar
2091  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2092  * (more than 64 sectors) of activity we cannot account for with our own resync
2093  * activity, it obviously is "busy".
2094  *
2095  * The current sync rate used here uses only the most recent two step marks,
2096  * to have a short time average so we can react faster.
2097  */
2098 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2099 {
2100         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2101         unsigned long db, dt, dbdt;
2102         struct lc_element *tmp;
2103         int curr_events;
2104         int throttle = 0;
2105
2106         /* feature disabled? */
2107         if (mdev->ldev->dc.c_min_rate == 0)
2108                 return 0;
2109
2110         spin_lock_irq(&mdev->al_lock);
2111         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2112         if (tmp) {
2113                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2114                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2115                         spin_unlock_irq(&mdev->al_lock);
2116                         return 0;
2117                 }
2118                 /* Do not slow down if app IO is already waiting for this extent */
2119         }
2120         spin_unlock_irq(&mdev->al_lock);
2121
2122         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2123                       (int)part_stat_read(&disk->part0, sectors[1]) -
2124                         atomic_read(&mdev->rs_sect_ev);
2125
2126         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2127                 unsigned long rs_left;
2128                 int i;
2129
2130                 mdev->rs_last_events = curr_events;
2131
2132                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2133                  * approx. */
2134                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2135
2136                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2137                         rs_left = mdev->ov_left;
2138                 else
2139                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2140
2141                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2142                 if (!dt)
2143                         dt++;
2144                 db = mdev->rs_mark_left[i] - rs_left;
2145                 dbdt = Bit2KB(db/dt);
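                /* db is in bitmap bits resynced since that mark and dt in
                 * seconds, so dbdt is the recent resync rate in KiB/s, the
                 * same unit as the configured c_min_rate below. */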
2146
2147                 if (dbdt > mdev->ldev->dc.c_min_rate)
2148                         throttle = 1;
2149         }
2150         return throttle;
2151 }
2152
2153
2154 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2155 {
2156         struct drbd_conf *mdev;
2157         sector_t sector;
2158         sector_t capacity;
2159         struct drbd_peer_request *peer_req;
2160         struct digest_info *di = NULL;
2161         int size, verb;
2162         unsigned int fault_type;
2163         struct p_block_req *p = tconn->data.rbuf;
2164
2165         mdev = vnr_to_mdev(tconn, pi->vnr);
2166         if (!mdev)
2167                 return -EIO;
2168         capacity = drbd_get_capacity(mdev->this_bdev);
2169
2170         sector = be64_to_cpu(p->sector);
2171         size   = be32_to_cpu(p->blksize);
2172
2173         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2174                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2175                                 (unsigned long long)sector, size);
2176                 return -EINVAL;
2177         }
2178         if (sector + (size>>9) > capacity) {
2179                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2180                                 (unsigned long long)sector, size);
2181                 return -EINVAL;
2182         }
2183
2184         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2185                 verb = 1;
2186                 switch (pi->cmd) {
2187                 case P_DATA_REQUEST:
2188                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2189                         break;
2190                 case P_RS_DATA_REQUEST:
2191                 case P_CSUM_RS_REQUEST:
2192                 case P_OV_REQUEST:
2193                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2194                         break;
2195                 case P_OV_REPLY:
2196                         verb = 0;
2197                         dec_rs_pending(mdev);
2198                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2199                         break;
2200                 default:
2201                         BUG();
2202                 }
2203                 if (verb && __ratelimit(&drbd_ratelimit_state))
2204                         dev_err(DEV, "Can not satisfy peer's read request, "
2205                             "no local data.\n");
2206
2207                 /* drain the payload, if any */
2208                 return drbd_drain_block(mdev, pi->size);
2209         }
2210
2211         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2212          * "criss-cross" setup, that might cause write-out on some other DRBD,
2213          * which in turn might block on the other node at this very place.  */
2214         peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2215         if (!peer_req) {
2216                 put_ldev(mdev);
2217                 return -ENOMEM;
2218         }
2219
2220         switch (pi->cmd) {
2221         case P_DATA_REQUEST:
2222                 peer_req->w.cb = w_e_end_data_req;
2223                 fault_type = DRBD_FAULT_DT_RD;
2224                 /* application IO, don't drbd_rs_begin_io */
2225                 goto submit;
2226
2227         case P_RS_DATA_REQUEST:
2228                 peer_req->w.cb = w_e_end_rsdata_req;
2229                 fault_type = DRBD_FAULT_RS_RD;
2230                 /* used in the sector offset progress display */
2231                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2232                 break;
2233
2234         case P_OV_REPLY:
2235         case P_CSUM_RS_REQUEST:
2236                 fault_type = DRBD_FAULT_RS_RD;
2237                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2238                 if (!di)
2239                         goto out_free_e;
2240
2241                 di->digest_size = pi->size;
2242                 di->digest = (((char *)di)+sizeof(struct digest_info));
2243
2244                 peer_req->digest = di;
2245                 peer_req->flags |= EE_HAS_DIGEST;
2246
2247                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2248                         goto out_free_e;
2249
2250                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2251                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2252                         peer_req->w.cb = w_e_end_csum_rs_req;
2253                         /* used in the sector offset progress display */
2254                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2255                 } else if (pi->cmd == P_OV_REPLY) {
2256                         /* track progress, we may need to throttle */
2257                         atomic_add(size >> 9, &mdev->rs_sect_in);
2258                         peer_req->w.cb = w_e_end_ov_reply;
2259                         dec_rs_pending(mdev);
2260                         /* drbd_rs_begin_io done when we sent this request,
2261                          * but accounting still needs to be done. */
2262                         goto submit_for_resync;
2263                 }
2264                 break;
2265
2266         case P_OV_REQUEST:
2267                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2268                     mdev->tconn->agreed_pro_version >= 90) {
2269                         unsigned long now = jiffies;
2270                         int i;
2271                         mdev->ov_start_sector = sector;
2272                         mdev->ov_position = sector;
2273                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2274                         mdev->rs_total = mdev->ov_left;
2275                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2276                                 mdev->rs_mark_left[i] = mdev->ov_left;
2277                                 mdev->rs_mark_time[i] = now;
2278                         }
2279                         dev_info(DEV, "Online Verify start sector: %llu\n",
2280                                         (unsigned long long)sector);
2281                 }
2282                 peer_req->w.cb = w_e_end_ov_req;
2283                 fault_type = DRBD_FAULT_RS_RD;
2284                 break;
2285
2286         default:
2287                 BUG();
2288         }
2289
2290         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2291          * wrt the receiver, but it is not as straightforward as it may seem.
2292          * Various places in the resync start and stop logic assume resync
2293          * requests are processed in order, requeuing this on the worker thread
2294          * introduces a bunch of new code for synchronization between threads.
2295          *
2296          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2297          * "forever", throttling after drbd_rs_begin_io will lock that extent
2298          * for application writes for the same time.  For now, just throttle
2299          * here, where the rest of the code expects the receiver to sleep for
2300          * a while, anyways.
2301          */
2302
2303         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2304          * this defers syncer requests for some time, before letting at least
2305          * one request through.  The resync controller on the receiving side
2306          * will adapt to the incoming rate accordingly.
2307          *
2308          * We cannot throttle here if remote is Primary/SyncTarget:
2309          * we would also throttle its application reads.
2310          * In that case, throttling is done on the SyncTarget only.
2311          */
2312         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2313                 schedule_timeout_uninterruptible(HZ/10);
2314         if (drbd_rs_begin_io(mdev, sector))
2315                 goto out_free_e;
2316
2317 submit_for_resync:
2318         atomic_add(size >> 9, &mdev->rs_sect_ev);
2319
2320 submit:
2321         inc_unacked(mdev);
2322         spin_lock_irq(&mdev->tconn->req_lock);
2323         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2324         spin_unlock_irq(&mdev->tconn->req_lock);
2325
2326         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2327                 return 0;
2328
2329         /* don't care for the reason here */
2330         dev_err(DEV, "submit failed, triggering re-connect\n");
2331         spin_lock_irq(&mdev->tconn->req_lock);
2332         list_del(&peer_req->w.list);
2333         spin_unlock_irq(&mdev->tconn->req_lock);
2334         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2335
2336 out_free_e:
2337         put_ldev(mdev);
2338         drbd_free_ee(mdev, peer_req);
2339         return -EIO;
2340 }
2341
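/* The drbd_asb_recover_{0,1,2}p() helpers implement the after-sb-0pri,
 * after-sb-1pri and after-sb-2pri policies.  They use the same sign
 * convention as drbd_uuid_compare(): > 0 means this node becomes sync
 * source, < 0 sync target, and -100 means no automatic resolution. */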
2342 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2343 {
2344         int self, peer, rv = -100;
2345         unsigned long ch_self, ch_peer;
2346
2347         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2348         peer = mdev->p_uuid[UI_BITMAP] & 1;
2349
2350         ch_peer = mdev->p_uuid[UI_SIZE];
2351         ch_self = mdev->comm_bm_set;
2352
2353         switch (mdev->tconn->net_conf->after_sb_0p) {
2354         case ASB_CONSENSUS:
2355         case ASB_DISCARD_SECONDARY:
2356         case ASB_CALL_HELPER:
2357                 dev_err(DEV, "Configuration error.\n");
2358                 break;
2359         case ASB_DISCONNECT:
2360                 break;
2361         case ASB_DISCARD_YOUNGER_PRI:
2362                 if (self == 0 && peer == 1) {
2363                         rv = -1;
2364                         break;
2365                 }
2366                 if (self == 1 && peer == 0) {
2367                         rv =  1;
2368                         break;
2369                 }
2370                 /* Else fall through to one of the other strategies... */
2371         case ASB_DISCARD_OLDER_PRI:
2372                 if (self == 0 && peer == 1) {
2373                         rv = 1;
2374                         break;
2375                 }
2376                 if (self == 1 && peer == 0) {
2377                         rv = -1;
2378                         break;
2379                 }
2380                 /* Else fall through to one of the other strategies... */
2381                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2382                      "Using discard-least-changes instead\n");
2383         case ASB_DISCARD_ZERO_CHG:
2384                 if (ch_peer == 0 && ch_self == 0) {
2385                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2386                                 ? -1 : 1;
2387                         break;
2388                 } else {
2389                         if (ch_peer == 0) { rv =  1; break; }
2390                         if (ch_self == 0) { rv = -1; break; }
2391                 }
2392                 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2393                         break;
2394         case ASB_DISCARD_LEAST_CHG:
2395                 if      (ch_self < ch_peer)
2396                         rv = -1;
2397                 else if (ch_self > ch_peer)
2398                         rv =  1;
2399                 else /* ( ch_self == ch_peer ) */
2400                      /* Well, then use something else. */
2401                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2402                                 ? -1 : 1;
2403                 break;
2404         case ASB_DISCARD_LOCAL:
2405                 rv = -1;
2406                 break;
2407         case ASB_DISCARD_REMOTE:
2408                 rv =  1;
2409         }
2410
2411         return rv;
2412 }
2413
2414 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2415 {
2416         int hg, rv = -100;
2417
2418         switch (mdev->tconn->net_conf->after_sb_1p) {
2419         case ASB_DISCARD_YOUNGER_PRI:
2420         case ASB_DISCARD_OLDER_PRI:
2421         case ASB_DISCARD_LEAST_CHG:
2422         case ASB_DISCARD_LOCAL:
2423         case ASB_DISCARD_REMOTE:
2424                 dev_err(DEV, "Configuration error.\n");
2425                 break;
2426         case ASB_DISCONNECT:
2427                 break;
2428         case ASB_CONSENSUS:
2429                 hg = drbd_asb_recover_0p(mdev);
2430                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2431                         rv = hg;
2432                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2433                         rv = hg;
2434                 break;
2435         case ASB_VIOLENTLY:
2436                 rv = drbd_asb_recover_0p(mdev);
2437                 break;
2438         case ASB_DISCARD_SECONDARY:
2439                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2440         case ASB_CALL_HELPER:
2441                 hg = drbd_asb_recover_0p(mdev);
2442                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2443                         enum drbd_state_rv rv2;
2444
2445                         drbd_set_role(mdev, R_SECONDARY, 0);
2446                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2447                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2448                           * we do not need to wait for the after state change work either. */
2449                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2450                         if (rv2 != SS_SUCCESS) {
2451                                 drbd_khelper(mdev, "pri-lost-after-sb");
2452                         } else {
2453                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2454                                 rv = hg;
2455                         }
2456                 } else
2457                         rv = hg;
2458         }
2459
2460         return rv;
2461 }
2462
2463 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2464 {
2465         int hg, rv = -100;
2466
2467         switch (mdev->tconn->net_conf->after_sb_2p) {
2468         case ASB_DISCARD_YOUNGER_PRI:
2469         case ASB_DISCARD_OLDER_PRI:
2470         case ASB_DISCARD_LEAST_CHG:
2471         case ASB_DISCARD_LOCAL:
2472         case ASB_DISCARD_REMOTE:
2473         case ASB_CONSENSUS:
2474         case ASB_DISCARD_SECONDARY:
2475                 dev_err(DEV, "Configuration error.\n");
2476                 break;
2477         case ASB_VIOLENTLY:
2478                 rv = drbd_asb_recover_0p(mdev);
2479                 break;
2480         case ASB_DISCONNECT:
2481                 break;
2482         case ASB_CALL_HELPER:
2483                 hg = drbd_asb_recover_0p(mdev);
2484                 if (hg == -1) {
2485                         enum drbd_state_rv rv2;
2486
2487                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2488                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2489                           * we do not need to wait for the after state change work either. */
2490                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2491                         if (rv2 != SS_SUCCESS) {
2492                                 drbd_khelper(mdev, "pri-lost-after-sb");
2493                         } else {
2494                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2495                                 rv = hg;
2496                         }
2497                 } else
2498                         rv = hg;
2499         }
2500
2501         return rv;
2502 }
2503
2504 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2505                            u64 bits, u64 flags)
2506 {
2507         if (!uuid) {
2508                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2509                 return;
2510         }
2511         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2512              text,
2513              (unsigned long long)uuid[UI_CURRENT],
2514              (unsigned long long)uuid[UI_BITMAP],
2515              (unsigned long long)uuid[UI_HISTORY_START],
2516              (unsigned long long)uuid[UI_HISTORY_END],
2517              (unsigned long long)bits,
2518              (unsigned long long)flags);
2519 }
2520
2521 /*
2522   100   after split brain try auto recover
2523     2   C_SYNC_SOURCE set BitMap
2524     1   C_SYNC_SOURCE use BitMap
2525     0   no Sync
2526    -1   C_SYNC_TARGET use BitMap
2527    -2   C_SYNC_TARGET set BitMap
2528  -100   after split brain, disconnect
2529 -1000   unrelated data
2530 -1091   requires proto 91
2531 -1096   requires proto 96
2532  */
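/* The rule number is reported by drbd_sync_handshake() in its
 * "uuid_compare()=%d by rule %d" log line, identifying which of the cases
 * below made the decision. */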
2533 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2534 {
2535         u64 self, peer;
2536         int i, j;
2537
2538         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2539         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2540
2541         *rule_nr = 10;
2542         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2543                 return 0;
2544
2545         *rule_nr = 20;
2546         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2547              peer != UUID_JUST_CREATED)
2548                 return -2;
2549
2550         *rule_nr = 30;
2551         if (self != UUID_JUST_CREATED &&
2552             (peer == UUID_JUST_CREATED || peer == (u64)0))
2553                 return 2;
2554
2555         if (self == peer) {
2556                 int rct, dc; /* roles at crash time */
2557
2558                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2559
2560                         if (mdev->tconn->agreed_pro_version < 91)
2561                                 return -1091;
2562
2563                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2564                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2565                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2566                                 drbd_uuid_set_bm(mdev, 0UL);
2567
2568                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2569                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2570                                 *rule_nr = 34;
2571                         } else {
2572                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2573                                 *rule_nr = 36;
2574                         }
2575
2576                         return 1;
2577                 }
2578
2579                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2580
2581                         if (mdev->tconn->agreed_pro_version < 91)
2582                                 return -1091;
2583
2584                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2585                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2586                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2587
2588                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2589                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2590                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2591
2592                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2593                                 *rule_nr = 35;
2594                         } else {
2595                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2596                                 *rule_nr = 37;
2597                         }
2598
2599                         return -1;
2600                 }
2601
2602                 /* Common power [off|failure] */
2603                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2604                         (mdev->p_uuid[UI_FLAGS] & 2);
2605                 /* lowest bit is set when we were primary,
2606                  * next bit (weight 2) is set when peer was primary */
2607                 *rule_nr = 40;
2608
2609                 switch (rct) {
2610                 case 0: /* !self_pri && !peer_pri */ return 0;
2611                 case 1: /*  self_pri && !peer_pri */ return 1;
2612                 case 2: /* !self_pri &&  peer_pri */ return -1;
2613                 case 3: /*  self_pri &&  peer_pri */
2614                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2615                         return dc ? -1 : 1;
2616                 }
2617         }
2618
2619         *rule_nr = 50;
2620         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2621         if (self == peer)
2622                 return -1;
2623
2624         *rule_nr = 51;
2625         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2626         if (self == peer) {
2627                 if (mdev->tconn->agreed_pro_version < 96 ?
2628                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2629                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2630                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2631                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2632                            the peer made to its UUIDs at its last start of resync as sync source. */
2633
2634                         if (mdev->tconn->agreed_pro_version < 91)
2635                                 return -1091;
2636
2637                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2638                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2639
2640                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2641                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2642
2643                         return -1;
2644                 }
2645         }
2646
2647         *rule_nr = 60;
2648         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2649         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2650                 peer = mdev->p_uuid[i] & ~((u64)1);
2651                 if (self == peer)
2652                         return -2;
2653         }
2654
2655         *rule_nr = 70;
2656         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2657         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2658         if (self == peer)
2659                 return 1;
2660
2661         *rule_nr = 71;
2662         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2663         if (self == peer) {
2664                 if (mdev->tconn->agreed_pro_version < 96 ?
2665                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2666                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2667                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2668                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2669                            we made to our UUIDs at our last start of resync as sync source. */
2670
2671                         if (mdev->tconn->agreed_pro_version < 91)
2672                                 return -1091;
2673
2674                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2675                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2676
2677                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2678                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2679                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2680
2681                         return 1;
2682                 }
2683         }
2684
2685
2686         *rule_nr = 80;
2687         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2688         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2689                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2690                 if (self == peer)
2691                         return 2;
2692         }
2693
2694         *rule_nr = 90;
2695         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2696         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2697         if (self == peer && self != ((u64)0))
2698                 return 100;
2699
2700         *rule_nr = 100;
2701         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2702                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2703                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2704                         peer = mdev->p_uuid[j] & ~((u64)1);
2705                         if (self == peer)
2706                                 return -100;
2707                 }
2708         }
2709
2710         return -1000;
2711 }
2712
2713 /* drbd_sync_handshake() returns the new conn state on success, or
2714    CONN_MASK (-1) on failure.
2715  */
2716 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2717                                            enum drbd_disk_state peer_disk) __must_hold(local)
2718 {
2719         int hg, rule_nr;
2720         enum drbd_conns rv = C_MASK;
2721         enum drbd_disk_state mydisk;
2722
2723         mydisk = mdev->state.disk;
2724         if (mydisk == D_NEGOTIATING)
2725                 mydisk = mdev->new_state_tmp.disk;
2726
2727         dev_info(DEV, "drbd_sync_handshake:\n");
2728         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2729         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2730                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2731
2732         hg = drbd_uuid_compare(mdev, &rule_nr);
2733
2734         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2735
2736         if (hg == -1000) {
2737                 dev_alert(DEV, "Unrelated data, aborting!\n");
2738                 return C_MASK;
2739         }
2740         if (hg < -1000) {
2741                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2742                 return C_MASK;
2743         }
2744
2745         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2746             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2747                 int f = (hg == -100) || abs(hg) == 2;
2748                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2749                 if (f)
2750                         hg = hg*2;
2751                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2752                      hg > 0 ? "source" : "target");
2753         }
2754
2755         if (abs(hg) == 100)
2756                 drbd_khelper(mdev, "initial-split-brain");
2757
2758         if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2759                 int pcount = (mdev->state.role == R_PRIMARY)
2760                            + (peer_role == R_PRIMARY);
2761                 int forced = (hg == -100);
2762
2763                 switch (pcount) {
2764                 case 0:
2765                         hg = drbd_asb_recover_0p(mdev);
2766                         break;
2767                 case 1:
2768                         hg = drbd_asb_recover_1p(mdev);
2769                         break;
2770                 case 2:
2771                         hg = drbd_asb_recover_2p(mdev);
2772                         break;
2773                 }
2774                 if (abs(hg) < 100) {
2775                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2776                              "automatically solved. Sync from %s node\n",
2777                              pcount, (hg < 0) ? "peer" : "this");
2778                         if (forced) {
2779                                 dev_warn(DEV, "Doing a full sync, since"
2780                                      " UUIDs were ambiguous.\n");
2781                                 hg = hg*2;
2782                         }
2783                 }
2784         }
2785
2786         if (hg == -100) {
2787                 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2788                         hg = -1;
2789                 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2790                         hg = 1;
2791
2792                 if (abs(hg) < 100)
2793                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2794                              "Sync from %s node\n",
2795                              (hg < 0) ? "peer" : "this");
2796         }
2797
2798         if (hg == -100) {
2799                 /* FIXME this log message is not correct if we end up here
2800                  * after an attempted attach on a diskless node.
2801                  * We just refuse to attach -- well, we drop the "connection"
2802                  * to that disk, in a way... */
2803                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2804                 drbd_khelper(mdev, "split-brain");
2805                 return C_MASK;
2806         }
2807
2808         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2809                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2810                 return C_MASK;
2811         }
2812
2813         if (hg < 0 && /* by intention we do not use mydisk here. */
2814             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2815                 switch (mdev->tconn->net_conf->rr_conflict) {
2816                 case ASB_CALL_HELPER:
2817                         drbd_khelper(mdev, "pri-lost");
2818                         /* fall through */
2819                 case ASB_DISCONNECT:
2820                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2821                         return C_MASK;
2822                 case ASB_VIOLENTLY:
2823                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2824                              " assumption\n");
2825                 }
2826         }
2827
2828         if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2829                 if (hg == 0)
2830                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2831                 else
2832                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2833                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2834                                  abs(hg) >= 2 ? "full" : "bit-map based");
2835                 return C_MASK;
2836         }
2837
2838         if (abs(hg) >= 2) {
2839                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2840                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2841                                         BM_LOCKED_SET_ALLOWED))
2842                         return C_MASK;
2843         }
2844
2845         if (hg > 0) { /* become sync source. */
2846                 rv = C_WF_BITMAP_S;
2847         } else if (hg < 0) { /* become sync target */
2848                 rv = C_WF_BITMAP_T;
2849         } else {
2850                 rv = C_CONNECTED;
2851                 if (drbd_bm_total_weight(mdev)) {
2852                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2853                              drbd_bm_total_weight(mdev));
2854                 }
2855         }
2856
2857         return rv;
2858 }
2859
2860 /* returns 1 if invalid */
2861 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2862 {
2863         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2864         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2865             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2866                 return 0;
2867
2868         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2869         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2870             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2871                 return 1;
2872
2873         /* everything else is valid if they are equal on both sides. */
2874         if (peer == self)
2875                 return 0;
2876
2877         /* everything else is invalid. */
2878         return 1;
2879 }
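
/*
 * Example: if our after-sb policy is ASB_DISCARD_LOCAL and the peer's is
 * ASB_DISCARD_REMOTE, both sides agree that our data is the one to be
 * discarded ("remote" from the peer's point of view is us), so that pairing
 * is accepted above.  If both sides were configured with ASB_DISCARD_REMOTE,
 * each node would insist on discarding the other's data, which is why such
 * combinations are rejected as invalid.
 */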
2880
2881 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2882 {
2883         struct p_protocol *p = tconn->data.rbuf;
2884         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2885         int p_want_lose, p_two_primaries, cf;
2886         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2887
2888         p_proto         = be32_to_cpu(p->protocol);
2889         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2890         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2891         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2892         p_two_primaries = be32_to_cpu(p->two_primaries);
2893         cf              = be32_to_cpu(p->conn_flags);
2894         p_want_lose = cf & CF_WANT_LOSE;
2895
2896         clear_bit(CONN_DRY_RUN, &tconn->flags);
2897
2898         if (cf & CF_DRY_RUN)
2899                 set_bit(CONN_DRY_RUN, &tconn->flags);
2900
2901         if (p_proto != tconn->net_conf->wire_protocol) {
2902                 conn_err(tconn, "incompatible communication protocols\n");
2903                 goto disconnect;
2904         }
2905
2906         if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2907                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2908                 goto disconnect;
2909         }
2910
2911         if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2912                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2913                 goto disconnect;
2914         }
2915
2916         if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2917                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2918                 goto disconnect;
2919         }
2920
2921         if (p_want_lose && tconn->net_conf->want_lose) {
2922                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2923                 goto disconnect;
2924         }
2925
2926         if (p_two_primaries != tconn->net_conf->two_primaries) {
2927                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2928                 goto disconnect;
2929         }
2930
2931         if (tconn->agreed_pro_version >= 87) {
2932                 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2933                 int err;
2934
2935                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2936                 if (err)
2937                         return err;
2938
2939                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2940                 if (strcmp(p_integrity_alg, my_alg)) {
2941                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2942                         goto disconnect;
2943                 }
2944                 conn_info(tconn, "data-integrity-alg: %s\n",
2945                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2946         }
2947
2948         return 0;
2949
2950 disconnect:
2951         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2952         return -EIO;
2953 }
2954
2955 /* helper function
2956  * input: alg name, feature name
2957  * return: NULL (alg name was "")
2958  *         ERR_PTR(error) if something goes wrong
2959  *         or the crypto hash ptr, if it worked out ok. */
2960 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2961                 const char *alg, const char *name)
2962 {
2963         struct crypto_hash *tfm;
2964
2965         if (!alg[0])
2966                 return NULL;
2967
2968         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2969         if (IS_ERR(tfm)) {
2970                 dev_err(DEV, "Cannot allocate \"%s\" as %s (reason: %ld)\n",
2971                         alg, name, PTR_ERR(tfm));
2972                 return tfm;
2973         }
2974         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2975                 crypto_free_hash(tfm);
2976                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2977                 return ERR_PTR(-EINVAL);
2978         }
2979         return tfm;
2980 }
2981
2982 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
2983 {
2984         void *buffer = tconn->data.rbuf;
2985         int size = pi->size;
2986
2987         while (size) {
2988                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
2989                 s = drbd_recv(tconn, buffer, s);
2990                 if (s <= 0) {
2991                         if (s < 0)
2992                                 return s;
2993                         break;
2994                 }
2995                 size -= s;
2996         }
2997         if (size)
2998                 return -EIO;
2999         return 0;
3000 }
3001
3002 /*
3003  * config_unknown_volume  -  device configuration command for unknown volume
3004  *
3005  * When a device is added to an existing connection, the node on which the
3006  * device is added first will send configuration commands to its peer but the
3007  * peer will not know about the device yet.  It will warn and ignore these
3008  * commands.  Once the device is added on the second node, the second node will
3009  * send the same device configuration commands, but in the other direction.
3010  *
3011  * (We can also end up here if drbd is misconfigured.)
3012  */
3013 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3014 {
3015         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3016                   pi->vnr, cmdname(pi->cmd));
3017         return ignore_remaining_packet(tconn, pi);
3018 }
3019
3020 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3021 {
3022         struct drbd_conf *mdev;
3023         struct p_rs_param_95 *p = tconn->data.rbuf;
3024         unsigned int header_size, data_size, exp_max_sz;
3025         struct crypto_hash *verify_tfm = NULL;
3026         struct crypto_hash *csums_tfm = NULL;
3027         const int apv = tconn->agreed_pro_version;
3028         int *rs_plan_s = NULL;
3029         int fifo_size = 0;
3030         int err;
3031
3032         mdev = vnr_to_mdev(tconn, pi->vnr);
3033         if (!mdev)
3034                 return config_unknown_volume(tconn, pi);
3035
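        /*
         * The expected packet layout grows with the agreed protocol version
         * (a summary of the size checks and field accesses below, not a
         * formal protocol description):
         *   apv <= 87: p_rs_param          - resync rate only
         *   apv == 88: p_rs_param + string - rate plus a variable length verify-alg
         *   apv 89-94: p_rs_param_89       - rate plus fixed size verify-alg/csums-alg
         *   apv >= 95: p_rs_param_95       - additionally the dynamic resync
         *                                    controller settings (c_plan_ahead, ...)
         */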
3036         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3037                     : apv == 88 ? sizeof(struct p_rs_param)
3038                                         + SHARED_SECRET_MAX
3039                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3040                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3041
3042         if (pi->size > exp_max_sz) {
3043                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3044                     pi->size, exp_max_sz);
3045                 return -EIO;
3046         }
3047
3048         if (apv <= 88) {
3049                 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
3050                 data_size = pi->size - header_size;
3051         } else if (apv <= 94) {
3052                 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
3053                 data_size = pi->size - header_size;
3054                 D_ASSERT(data_size == 0);
3055         } else {
3056                 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
3057                 data_size = pi->size - header_size;
3058                 D_ASSERT(data_size == 0);
3059         }
3060
3061         /* initialize verify_alg and csums_alg */
3062         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3063
3064         err = drbd_recv_all(mdev->tconn, &p->head.payload, header_size);
3065         if (err)
3066                 return err;
3067
3068         if (get_ldev(mdev)) {
3069                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3070                 put_ldev(mdev);
3071         }
3072
3073         if (apv >= 88) {
3074                 if (apv == 88) {
3075                         if (data_size > SHARED_SECRET_MAX) {
3076                                 dev_err(DEV, "verify-alg too long, "
3077                                     "peer wants %u, accepting only %u bytes\n",
3078                                                 data_size, SHARED_SECRET_MAX);
3079                                 return -EIO;
3080                         }
3081
3082                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3083                         if (err)
3084                                 return err;
3085
3086                         /* we expect NUL terminated string */
3087                         /* but just in case someone tries to be evil */
3088                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3089                         p->verify_alg[data_size-1] = 0;
3090
3091                 } else /* apv >= 89 */ {
3092                         /* we still expect NUL terminated strings */
3093                         /* but just in case someone tries to be evil */
3094                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3095                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3096                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3097                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3098                 }
3099
3100                 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3101                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3102                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3103                                     mdev->tconn->net_conf->verify_alg, p->verify_alg);
3104                                 goto disconnect;
3105                         }
3106                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3107                                         p->verify_alg, "verify-alg");
3108                         if (IS_ERR(verify_tfm)) {
3109                                 verify_tfm = NULL;
3110                                 goto disconnect;
3111                         }
3112                 }
3113
3114                 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3115                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3116                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3117                                     mdev->tconn->net_conf->csums_alg, p->csums_alg);
3118                                 goto disconnect;
3119                         }
3120                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3121                                         p->csums_alg, "csums-alg");
3122                         if (IS_ERR(csums_tfm)) {
3123                                 csums_tfm = NULL;
3124                                 goto disconnect;
3125                         }
3126                 }
3127
3128                 if (apv > 94 && get_ldev(mdev)) {
3129                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3130                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3131                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3132                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3133                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3134
3135                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3136                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3137                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3138                                 if (!rs_plan_s) {
3139                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3140                                         put_ldev(mdev);
3141                                         goto disconnect;
3142                                 }
3143                         }
3144                         put_ldev(mdev);
3145                 }
3146
3147                 spin_lock(&mdev->peer_seq_lock);
3148                 /* lock against drbd_nl_syncer_conf() */
3149                 if (verify_tfm) {
3150                         strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3151                         mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3152                         crypto_free_hash(mdev->tconn->verify_tfm);
3153                         mdev->tconn->verify_tfm = verify_tfm;
3154                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3155                 }
3156                 if (csums_tfm) {
3157                         strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3158                         mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3159                         crypto_free_hash(mdev->tconn->csums_tfm);
3160                         mdev->tconn->csums_tfm = csums_tfm;
3161                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3162                 }
3163                 if (fifo_size != mdev->rs_plan_s.size) {
3164                         kfree(mdev->rs_plan_s.values);
3165                         mdev->rs_plan_s.values = rs_plan_s;
3166                         mdev->rs_plan_s.size   = fifo_size;
3167                         mdev->rs_planed = 0;
3168                 }
3169                 spin_unlock(&mdev->peer_seq_lock);
3170         }
3171         return 0;
3172
3173 disconnect:
3174         /* just for completeness: actually not needed,
3175          * as this is not reached if csums_tfm was ok. */
3176         crypto_free_hash(csums_tfm);
3177         /* but free the verify_tfm again, if csums_tfm did not work out */
3178         crypto_free_hash(verify_tfm);
3179         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3180         return -EIO;
3181 }
3182
3183 /* warn if the arguments differ by more than 12.5% */
3184 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3185         const char *s, sector_t a, sector_t b)
3186 {
3187         sector_t d;
3188         if (a == 0 || b == 0)
3189                 return;
3190         d = (a > b) ? (a - b) : (b - a);
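        /* a>>3 is a/8, i.e. 12.5% of a; a difference above that (or above b/8) warns */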
3191         if (d > (a>>3) || d > (b>>3))
3192                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3193                      (unsigned long long)a, (unsigned long long)b);
3194 }
3195
3196 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3197 {
3198         struct drbd_conf *mdev;
3199         struct p_sizes *p = tconn->data.rbuf;
3200         enum determine_dev_size dd = unchanged;
3201         sector_t p_size, p_usize, my_usize;
3202         int ldsc = 0; /* local disk size changed */
3203         enum dds_flags ddsf;
3204
3205         mdev = vnr_to_mdev(tconn, pi->vnr);
3206         if (!mdev)
3207                 return config_unknown_volume(tconn, pi);
3208
3209         p_size = be64_to_cpu(p->d_size);
3210         p_usize = be64_to_cpu(p->u_size);
3211
3212         /* just store the peer's disk size for now.
3213          * we still need to figure out whether we accept that. */
3214         mdev->p_size = p_size;
3215
3216         if (get_ldev(mdev)) {
3217                 warn_if_differ_considerably(mdev, "lower level device sizes",
3218                            p_size, drbd_get_max_capacity(mdev->ldev));
3219                 warn_if_differ_considerably(mdev, "user requested size",
3220                                             p_usize, mdev->ldev->dc.disk_size);
3221
3222                 /* if this is the first connect, or an otherwise expected
3223                  * param exchange, choose the minimum */
3224                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3225                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3226                                              p_usize);
3227
3228                 my_usize = mdev->ldev->dc.disk_size;
3229
3230                 if (mdev->ldev->dc.disk_size != p_usize) {
3231                         mdev->ldev->dc.disk_size = p_usize;
3232                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3233                              (unsigned long)mdev->ldev->dc.disk_size);
3234                 }
3235
3236                 /* Never shrink a device with usable data during connect.
3237                    But allow online shrinking if we are connected. */
3238                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3239                    drbd_get_capacity(mdev->this_bdev) &&
3240                    mdev->state.disk >= D_OUTDATED &&
3241                    mdev->state.conn < C_CONNECTED) {
3242                         dev_err(DEV, "The peer's disk size is too small!\n");
3243                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3244                         mdev->ldev->dc.disk_size = my_usize;
3245                         put_ldev(mdev);
3246                         return -EIO;
3247                 }
3248                 put_ldev(mdev);
3249         }
3250
3251         ddsf = be16_to_cpu(p->dds_flags);
3252         if (get_ldev(mdev)) {
3253                 dd = drbd_determine_dev_size(mdev, ddsf);
3254                 put_ldev(mdev);
3255                 if (dd == dev_size_error)
3256                         return -EIO;
3257                 drbd_md_sync(mdev);
3258         } else {
3259                 /* I am diskless, need to accept the peer's size. */
3260                 drbd_set_my_capacity(mdev, p_size);
3261         }
3262
3263         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3264         drbd_reconsider_max_bio_size(mdev);
3265
3266         if (get_ldev(mdev)) {
3267                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3268                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3269                         ldsc = 1;
3270                 }
3271
3272                 put_ldev(mdev);
3273         }
3274
3275         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3276                 if (be64_to_cpu(p->c_size) !=
3277                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3278                         /* we have different sizes, probably the peer
3279                          * needs to know my new size... */
3280                         drbd_send_sizes(mdev, 0, ddsf);
3281                 }
3282                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3283                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3284                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3285                             mdev->state.disk >= D_INCONSISTENT) {
3286                                 if (ddsf & DDSF_NO_RESYNC)
3287                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3288                                 else
3289                                         resync_after_online_grow(mdev);
3290                         } else
3291                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3292                 }
3293         }
3294
3295         return 0;
3296 }
3297
3298 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3299 {
3300         struct drbd_conf *mdev;
3301         struct p_uuids *p = tconn->data.rbuf;
3302         u64 *p_uuid;
3303         int i, updated_uuids = 0;
3304
3305         mdev = vnr_to_mdev(tconn, pi->vnr);
3306         if (!mdev)
3307                 return config_unknown_volume(tconn, pi);
3308
3309         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                /* no memory for the peer's UUIDs; give up on this packet */
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                return -ENOMEM;
        }
3310
3311         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3312                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3313
3314         kfree(mdev->p_uuid);
3315         mdev->p_uuid = p_uuid;
3316
3317         if (mdev->state.conn < C_CONNECTED &&
3318             mdev->state.disk < D_INCONSISTENT &&
3319             mdev->state.role == R_PRIMARY &&
3320             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3321                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3322                     (unsigned long long)mdev->ed_uuid);
3323                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3324                 return -EIO;
3325         }
3326
3327         if (get_ldev(mdev)) {
3328                 int skip_initial_sync =
3329                         mdev->state.conn == C_CONNECTED &&
3330                         mdev->tconn->agreed_pro_version >= 90 &&
3331                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3332                         (p_uuid[UI_FLAGS] & 8);
3333                 if (skip_initial_sync) {
3334                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3335                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3336                                         "clear_n_write from receive_uuids",
3337                                         BM_LOCKED_TEST_ALLOWED);
3338                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3339                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3340                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3341                                         CS_VERBOSE, NULL);
3342                         drbd_md_sync(mdev);
3343                         updated_uuids = 1;
3344                 }
3345                 put_ldev(mdev);
3346         } else if (mdev->state.disk < D_INCONSISTENT &&
3347                    mdev->state.role == R_PRIMARY) {
3348                 /* I am a diskless primary, the peer just created a new current UUID
3349                    for me. */
3350                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3351         }
3352
3353         /* Before we test the disk state, we should wait until a possibly
3354            ongoing cluster-wide state change has finished. That is important if
3355            we are primary and are detaching from our disk. We need to see the
3356            new disk state... */
3357         mutex_lock(mdev->state_mutex);
3358         mutex_unlock(mdev->state_mutex);
3359         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3360                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3361
3362         if (updated_uuids)
3363                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3364
3365         return 0;
3366 }
3367
3368 /**
3369  * convert_state() - Converts the peer's view of the cluster state to our point of view
3370  * @ps:         The state as seen by the peer.
3371  */
3372 static union drbd_state convert_state(union drbd_state ps)
3373 {
3374         union drbd_state ms;
3375
3376         static enum drbd_conns c_tab[] = {
3377                 [C_CONNECTED] = C_CONNECTED,
3378
3379                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3380                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3381                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3382                 [C_VERIFY_S]       = C_VERIFY_T,
3383                 [C_MASK]   = C_MASK,
3384         };
3385
3386         ms.i = ps.i;
3387
3388         ms.conn = c_tab[ps.conn];
3389         ms.peer = ps.role;
3390         ms.role = ps.peer;
3391         ms.pdsk = ps.disk;
3392         ms.disk = ps.pdsk;
3393         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3394
3395         return ms;
3396 }
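
/*
 * Worked example (hypothetical values, just to illustrate the swap above):
 * if the peer reports { role=Primary, peer=Secondary, disk=UpToDate,
 * pdsk=Inconsistent, conn=StartingSyncS }, then from our point of view this
 * becomes { role=Secondary, peer=Primary, disk=Inconsistent, pdsk=UpToDate,
 * conn=StartingSyncT }: roles and disk states trade places, and the
 * direction-dependent connection states are mirrored via c_tab[].
 */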
3397
3398 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3399 {
3400         struct drbd_conf *mdev;
3401         struct p_req_state *p = tconn->data.rbuf;
3402         union drbd_state mask, val;
3403         enum drbd_state_rv rv;
3404
3405         mdev = vnr_to_mdev(tconn, pi->vnr);
3406         if (!mdev)
3407                 return -EIO;
3408
3409         mask.i = be32_to_cpu(p->mask);
3410         val.i = be32_to_cpu(p->val);
3411
3412         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3413             mutex_is_locked(mdev->state_mutex)) {
3414                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3415                 return 0;
3416         }
3417
3418         mask = convert_state(mask);
3419         val = convert_state(val);
3420
3421         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3422         drbd_send_sr_reply(mdev, rv);
3423
3424         drbd_md_sync(mdev);
3425
3426         return 0;
3427 }
3428
3429 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3430 {
3431         struct p_req_state *p = tconn->data.rbuf;
3432         union drbd_state mask, val;
3433         enum drbd_state_rv rv;
3434
3435         mask.i = be32_to_cpu(p->mask);
3436         val.i = be32_to_cpu(p->val);
3437
3438         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3439             mutex_is_locked(&tconn->cstate_mutex)) {
3440                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3441                 return 0;
3442         }
3443
3444         mask = convert_state(mask);
3445         val = convert_state(val);
3446
3447         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3448         conn_send_sr_reply(tconn, rv);
3449
3450         return 0;
3451 }
3452
3453 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3454 {
3455         struct drbd_conf *mdev;
3456         struct p_state *p = tconn->data.rbuf;
3457         union drbd_state os, ns, peer_state;
3458         enum drbd_disk_state real_peer_disk;
3459         enum chg_state_flags cs_flags;
3460         int rv;
3461
3462         mdev = vnr_to_mdev(tconn, pi->vnr);
3463         if (!mdev)
3464                 return config_unknown_volume(tconn, pi);
3465
3466         peer_state.i = be32_to_cpu(p->state);
3467
3468         real_peer_disk = peer_state.disk;
3469         if (peer_state.disk == D_NEGOTIATING) {
3470                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3471                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3472         }
3473
3474         spin_lock_irq(&mdev->tconn->req_lock);
3475  retry:
3476         os = ns = mdev->state;
3477         spin_unlock_irq(&mdev->tconn->req_lock);
3478
3479         /* peer says his disk is uptodate, while we think it is inconsistent,
3480          * and this happens while we think we have a sync going on. */
3481         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3482             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3483                 /* If we are (becoming) SyncSource, but peer is still in sync
3484                  * preparation, ignore its uptodate-ness to avoid flapping, it
3485                  * will change to inconsistent once the peer reaches active
3486                  * syncing states.
3487                  * It may have changed syncer-paused flags, however, so we
3488                  * cannot ignore this completely. */
3489                 if (peer_state.conn > C_CONNECTED &&
3490                     peer_state.conn < C_SYNC_SOURCE)
3491                         real_peer_disk = D_INCONSISTENT;
3492
3493                 /* if peer_state changes to connected at the same time,
3494                  * it explicitly notifies us that it finished resync.
3495                  * Maybe we should finish it up, too? */
3496                 else if (os.conn >= C_SYNC_SOURCE &&
3497                          peer_state.conn == C_CONNECTED) {
3498                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3499                                 drbd_resync_finished(mdev);
3500                         return 0;
3501                 }
3502         }
3503
3504         /* peer says his disk is inconsistent, while we think it is uptodate,
3505          * and this happens while the peer still thinks we have a sync going on,
3506          * but we think we are already done with the sync.
3507          * We ignore this to avoid flapping pdsk.
3508          * This should not happen, if the peer is a recent version of drbd. */
3509         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3510             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3511                 real_peer_disk = D_UP_TO_DATE;
3512
3513         if (ns.conn == C_WF_REPORT_PARAMS)
3514                 ns.conn = C_CONNECTED;
3515
3516         if (peer_state.conn == C_AHEAD)
3517                 ns.conn = C_BEHIND;
3518
3519         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3520             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3521                 int cr; /* consider resync */
3522
3523                 /* if we established a new connection */
3524                 cr  = (os.conn < C_CONNECTED);
3525                 /* if we had an established connection
3526                  * and one of the nodes newly attaches a disk */
3527                 cr |= (os.conn == C_CONNECTED &&
3528                        (peer_state.disk == D_NEGOTIATING ||
3529                         os.disk == D_NEGOTIATING));
3530                 /* if we have both been inconsistent, and the peer has been
3531                  * forced to be UpToDate with --overwrite-data */
3532                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3533                 /* if we had been plain connected, and the admin requested to
3534                  * start a sync by "invalidate" or "invalidate-remote" */
3535                 cr |= (os.conn == C_CONNECTED &&
3536                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3537                                  peer_state.conn <= C_WF_BITMAP_T));
3538
3539                 if (cr)
3540                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3541
3542                 put_ldev(mdev);
3543                 if (ns.conn == C_MASK) {
3544                         ns.conn = C_CONNECTED;
3545                         if (mdev->state.disk == D_NEGOTIATING) {
3546                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3547                         } else if (peer_state.disk == D_NEGOTIATING) {
3548                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3549                                 peer_state.disk = D_DISKLESS;
3550                                 real_peer_disk = D_DISKLESS;
3551                         } else {
3552                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3553                                         return -EIO;
3554                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3555                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3556                                 return -EIO;
3557                         }
3558                 }
3559         }
3560
3561         spin_lock_irq(&mdev->tconn->req_lock);
3562         if (mdev->state.i != os.i)
3563                 goto retry;
3564         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3565         ns.peer = peer_state.role;
3566         ns.pdsk = real_peer_disk;
3567         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3568         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3569                 ns.disk = mdev->new_state_tmp.disk;
3570         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3571         if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3572             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3573                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3574                    for temporary network outages! */
3575                 spin_unlock_irq(&mdev->tconn->req_lock);
3576                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3577                 tl_clear(mdev->tconn);
3578                 drbd_uuid_new_current(mdev);
3579                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3580                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3581                 return -EIO;
3582         }
3583         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3584         ns = mdev->state;
3585         spin_unlock_irq(&mdev->tconn->req_lock);
3586
3587         if (rv < SS_SUCCESS) {
3588                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3589                 return -EIO;
3590         }
3591
3592         if (os.conn > C_WF_REPORT_PARAMS) {
3593                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3594                     peer_state.disk != D_NEGOTIATING ) {
3595                         /* we want resync, peer has not yet decided to sync... */
3596                         /* Nowadays only used when forcing a node into primary role and
3597                            setting its disk to UpToDate with that */
3598                         drbd_send_uuids(mdev);
3599                         drbd_send_state(mdev);
3600                 }
3601         }
3602
3603         mdev->tconn->net_conf->want_lose = 0;
3604
3605         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3606
3607         return 0;
3608 }
3609
3610 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3611 {
3612         struct drbd_conf *mdev;
3613         struct p_rs_uuid *p = tconn->data.rbuf;
3614
3615         mdev = vnr_to_mdev(tconn, pi->vnr);
3616         if (!mdev)
3617                 return -EIO;
3618
3619         wait_event(mdev->misc_wait,
3620                    mdev->state.conn == C_WF_SYNC_UUID ||
3621                    mdev->state.conn == C_BEHIND ||
3622                    mdev->state.conn < C_CONNECTED ||
3623                    mdev->state.disk < D_NEGOTIATING);
3624
3625         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3626
3627         /* Here the _drbd_uuid_ functions are right, current should
3628            _not_ be rotated into the history */
3629         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3630                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3631                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3632
3633                 drbd_print_uuids(mdev, "updated sync uuid");
3634                 drbd_start_resync(mdev, C_SYNC_TARGET);
3635
3636                 put_ldev(mdev);
3637         } else
3638                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3639
3640         return 0;
3641 }
3642
3643 /**
3644  * receive_bitmap_plain
3645  *
3646  * Return 0 when done, 1 when another iteration is needed, and a negative error
3647  * code upon failure.
3648  */
3649 static int
3650 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3651                      struct p_header *h, struct bm_xfer_ctx *c)
3652 {
3653         unsigned long *buffer = (unsigned long *)h->payload;
3654         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3655         unsigned want = num_words * sizeof(long);
3656         int err;
3657
3658         if (want != data_size) {
3659                 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3660                 return -EIO;
3661         }
3662         if (want == 0)
3663                 return 0;
3664         err = drbd_recv_all(mdev->tconn, buffer, want);
3665         if (err)
3666                 return err;
3667
3668         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3669
3670         c->word_offset += num_words;
3671         c->bit_offset = c->word_offset * BITS_PER_LONG;
3672         if (c->bit_offset > c->bm_bits)
3673                 c->bit_offset = c->bm_bits;
3674
3675         return 1;
3676 }
3677
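/*
 * Layout of the "encoding" byte of a compressed bitmap packet, as implied
 * by the accessors below:
 *   bits 0-3: the drbd_bitmap_code (e.g. RLE_VLI_Bits)
 *   bits 4-6: number of padding bits at the end of the bit stream
 *   bit    7: value of the first (toggle) run in the RLE stream
 */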
3678 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3679 {
3680         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3681 }
3682
3683 static int dcbp_get_start(struct p_compressed_bm *p)
3684 {
3685         return (p->encoding & 0x80) != 0;
3686 }
3687
3688 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3689 {
3690         return (p->encoding >> 4) & 0x7;
3691 }
3692
3693 /**
3694  * recv_bm_rle_bits
3695  *
3696  * Return 0 when done, 1 when another iteration is needed, and a negative error
3697  * code upon failure.
3698  */
3699 static int
3700 recv_bm_rle_bits(struct drbd_conf *mdev,
3701                 struct p_compressed_bm *p,
3702                  struct bm_xfer_ctx *c,
3703                  unsigned int len)
3704 {
3705         struct bitstream bs;
3706         u64 look_ahead;
3707         u64 rl;
3708         u64 tmp;
3709         unsigned long s = c->bit_offset;
3710         unsigned long e;
3711         int toggle = dcbp_get_start(p);
3712         int have;
3713         int bits;
3714
3715         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3716
3717         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3718         if (bits < 0)
3719                 return -EIO;
3720
3721         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3722                 bits = vli_decode_bits(&rl, look_ahead);
3723                 if (bits <= 0)
3724                         return -EIO;
3725
3726                 if (toggle) {
3727                         e = s + rl -1;
3728                         if (e >= c->bm_bits) {
3729                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3730                                 return -EIO;
3731                         }
3732                         _drbd_bm_set_bits(mdev, s, e);
3733                 }
3734
3735                 if (have < bits) {
3736                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3737                                 have, bits, look_ahead,
3738                                 (unsigned int)(bs.cur.b - p->code),
3739                                 (unsigned int)bs.buf_len);
3740                         return -EIO;
3741                 }
3742                 look_ahead >>= bits;
3743                 have -= bits;
3744
3745                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3746                 if (bits < 0)
3747                         return -EIO;
3748                 look_ahead |= tmp << have;
3749                 have += bits;
3750         }
3751
3752         c->bit_offset = s;
3753         bm_xfer_ctx_bit_to_word_offset(c);
3754
3755         return (s != c->bm_bits);
3756 }
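
/*
 * Example of the run length encoding decoded above (illustrative numbers
 * only, starting at bit offset 0): with a start value of 0 and decoded run
 * lengths 5, 3, 7 the stream describes bits 0-4 as clear, bits 5-7 as set
 * and bits 8-14 as clear again; only the "set" runs are passed to
 * _drbd_bm_set_bits().
 */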
3757
3758 /**
3759  * decode_bitmap_c
3760  *
3761  * Return 0 when done, 1 when another iteration is needed, and a negative error
3762  * code upon failure.
3763  */
3764 static int
3765 decode_bitmap_c(struct drbd_conf *mdev,
3766                 struct p_compressed_bm *p,
3767                 struct bm_xfer_ctx *c,
3768                 unsigned int len)
3769 {
3770         if (dcbp_get_code(p) == RLE_VLI_Bits)
3771                 return recv_bm_rle_bits(mdev, p, c, len);
3772
3773         /* other variants had been implemented for evaluation,
3774          * but have been dropped as this one turned out to be "best"
3775          * during all our tests. */
3776
3777         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3778         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3779         return -EIO;
3780 }
3781
3782 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3783                 const char *direction, struct bm_xfer_ctx *c)
3784 {
3785         /* what would it take to transfer it "plaintext" */
3786         unsigned plain = sizeof(struct p_header) *
3787                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3788                 + c->bm_words * sizeof(long);
3789         unsigned total = c->bytes[0] + c->bytes[1];
3790         unsigned r;
3791
3792         /* total cannot be zero, but just in case: */
3793         if (total == 0)
3794                 return;
3795
3796         /* don't report if not compressed */
3797         if (total >= plain)
3798                 return;
3799
3800         /* total < plain. check for overflow, still */
3801         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3802                                     : (1000 * total / plain);
3803
3804         if (r > 1000)
3805                 r = 1000;
3806
3807         r = 1000 - r;
3808         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3809              "total %u; compression: %u.%u%%\n",
3810                         direction,
3811                         c->bytes[1], c->packets[1],
3812                         c->bytes[0], c->packets[0],
3813                         total, r/10, r % 10);
3814 }
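
/*
 * Numeric example for the ratio computed above (made-up numbers): with
 * plain = 4096 bytes and total = 512 bytes, r = 1000 * 512 / 4096 = 125,
 * so 1000 - r = 875 and the line reports "compression: 87.5%".
 */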
3815
3816 /* Since we are processing the bitfield from lower addresses to higher,
3817    it does not matter whether we process it in 32 bit chunks or 64 bit
3818    chunks, as long as it is little endian. (Understand it as a byte stream,
3819    beginning with the lowest byte...) If we used big endian,
3820    we would need to process it from the highest address to the lowest,
3821    in order to be agnostic to the 32 vs 64 bit issue.
3822
3823    Returns 0 on success, a negative error code otherwise. */
3824 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3825 {
3826         struct drbd_conf *mdev;
3827         struct bm_xfer_ctx c;
3828         int err;
3829         struct p_header *h = tconn->data.rbuf;
3830
3831         mdev = vnr_to_mdev(tconn, pi->vnr);
3832         if (!mdev)
3833                 return -EIO;
3834
3835         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3836         /* you are supposed to send additional out-of-sync information
3837          * if you actually set bits during this phase */
3838
3839         c = (struct bm_xfer_ctx) {
3840                 .bm_bits = drbd_bm_bits(mdev),
3841                 .bm_words = drbd_bm_words(mdev),
3842         };
3843
3844         for(;;) {
3845                 if (pi->cmd == P_BITMAP) {
3846                         err = receive_bitmap_plain(mdev, pi->size, h, &c);
3847                 } else if (pi->cmd == P_COMPRESSED_BITMAP) {
3848                         /* MAYBE: sanity check that we speak proto >= 90,
3849                          * and the feature is enabled! */
3850                         struct p_compressed_bm *p;
3851
3852                         if (pi->size > BM_PACKET_PAYLOAD_BYTES) {
3853                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3854                                 err = -EIO;
3855                                 goto out;
3856                         }
3857
3858                         p = mdev->tconn->data.rbuf;
3859                         err = drbd_recv_all(mdev->tconn, p->head.payload, pi->size);
3860                         if (err)
3861                                goto out;
3862                         if (pi->size <= (sizeof(*p) - sizeof(p->head))) {
3863                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3864                                 err = -EIO;
3865                                 goto out;
3866                         }
3867                         err = decode_bitmap_c(mdev, p, &c, pi->size);
3868                 } else {
3869                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3870                         err = -EIO;
3871                         goto out;
3872                 }
3873
3874                 c.packets[pi->cmd == P_BITMAP]++;
3875                 c.bytes[pi->cmd == P_BITMAP] += sizeof(struct p_header) + pi->size;
3876
3877                 if (err <= 0) {
3878                         if (err < 0)
3879                                 goto out;
3880                         break;
3881                 }
3882                 err = drbd_recv_header(mdev->tconn, pi);
3883                 if (err)
3884                         goto out;
3885         }
3886
3887         INFO_bm_xfer_stats(mdev, "receive", &c);
3888
3889         if (mdev->state.conn == C_WF_BITMAP_T) {
3890                 enum drbd_state_rv rv;
3891
3892                 err = drbd_send_bitmap(mdev);
3893                 if (err)
3894                         goto out;
3895                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3896                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3897                 D_ASSERT(rv == SS_SUCCESS);
3898         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3899                 /* admin may have requested C_DISCONNECTING,
3900                  * other threads may have noticed network errors */
3901                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3902                     drbd_conn_str(mdev->state.conn));
3903         }
3904         err = 0;
3905
3906  out:
3907         drbd_bm_unlock(mdev);
3908         if (!err && mdev->state.conn == C_WF_BITMAP_S)
3909                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3910         return err;
3911 }
3912
3913 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3914 {
3915         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3916                  pi->cmd, pi->size);
3917
3918         return ignore_remaining_packet(tconn, pi);
3919 }
3920
3921 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3922 {
3923         /* Make sure we've acked all the TCP data associated
3924          * with the data requests being unplugged */
3925         drbd_tcp_quickack(tconn->data.socket);
3926
3927         return 0;
3928 }
3929
3930 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3931 {
3932         struct drbd_conf *mdev;
3933         struct p_block_desc *p = tconn->data.rbuf;
3934
3935         mdev = vnr_to_mdev(tconn, pi->vnr);
3936         if (!mdev)
3937                 return -EIO;
3938
3939         switch (mdev->state.conn) {
3940         case C_WF_SYNC_UUID:
3941         case C_WF_BITMAP_T:
3942         case C_BEHIND:
3943                         break;
3944         default:
3945                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3946                                 drbd_conn_str(mdev->state.conn));
3947         }
3948
3949         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3950
3951         return 0;
3952 }
3953
3954 struct data_cmd {
3955         int expect_payload;
3956         size_t pkt_size;
3957         int (*fn)(struct drbd_tconn *, struct packet_info *);
3958 };
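
/*
 * In the table below, pkt_size is the size of the fixed part of each packet
 * including struct p_header; drbdd() reads (pkt_size - sizeof(struct p_header))
 * additional header bytes before calling fn.  expect_payload says whether the
 * packet may carry further payload beyond that fixed part; if it does not,
 * any extra bytes are treated as a protocol error.
 */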
3959
3960 static struct data_cmd drbd_cmd_handler[] = {
3961         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3962         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3963         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3964         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3965         [P_BITMAP]          = { 1, sizeof(struct p_header), receive_bitmap } ,
3966         [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3967         [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header), receive_UnplugRemote },
3968         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3969         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3970         [P_SYNC_PARAM]      = { 1, sizeof(struct p_header), receive_SyncParam },
3971         [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header), receive_SyncParam },
3972         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3973         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3974         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3975         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3976         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3977         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3978         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3979         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3980         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3981         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3982         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3983         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
3984 };
3985
3986 static void drbdd(struct drbd_tconn *tconn)
3987 {
3988         struct p_header *header = tconn->data.rbuf;
3989         struct packet_info pi;
3990         size_t shs; /* sub header size */
3991         int err;
3992
3993         while (get_t_state(&tconn->receiver) == RUNNING) {
3994                 struct data_cmd *cmd;
3995
3996                 drbd_thread_current_set_cpu(&tconn->receiver);
3997                 if (drbd_recv_header(tconn, &pi))
3998                         goto err_out;
3999
4000                 cmd = &drbd_cmd_handler[pi.cmd];
4001                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4002                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4003                         goto err_out;
4004                 }
4005
4006                 shs = cmd->pkt_size - sizeof(struct p_header);
4007                 if (pi.size - shs > 0 && !cmd->expect_payload) {
4008                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4009                         goto err_out;
4010                 }
4011
4012                 if (shs) {
4013                         err = drbd_recv_all_warn(tconn, &header->payload, shs);
4014                         if (err)
4015                                 goto err_out;
4016                         pi.size -= shs;
4017                 }
4018
4019                 err = cmd->fn(tconn, &pi);
4020                 if (err) {
4021                         conn_err(tconn, "error receiving %s, l: %d!\n",
4022                             cmdname(pi.cmd), pi.size);
4023                         goto err_out;
4024                 }
4025         }
4026         return;
4027
4028     err_out:
4029         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4030 }
4031
4032 void conn_flush_workqueue(struct drbd_tconn *tconn)
4033 {
4034         struct drbd_wq_barrier barr;
4035
4036         barr.w.cb = w_prev_work_done;
4037         barr.w.tconn = tconn;
4038         init_completion(&barr.done);
4039         drbd_queue_work(&tconn->data.work, &barr.w);
4040         wait_for_completion(&barr.done);
4041 }
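
/*
 * The intent here (as suggested by the w_prev_work_done callback name):
 * queue a no-op barrier work item and wait for its completion.  Since the
 * worker processes items in queue order, everything that was queued before
 * the barrier has finished by the time the barrier completes.
 */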
4042
4043 static void drbd_disconnect(struct drbd_tconn *tconn)
4044 {
4045         enum drbd_conns oc;
4046         int rv = SS_UNKNOWN_ERROR;
4047
4048         if (tconn->cstate == C_STANDALONE)
4049                 return;
4050
4051         /* asender does not clean up anything. it must not interfere, either */
4052         drbd_thread_stop(&tconn->asender);
4053         drbd_free_sock(tconn);
4054
4055         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4056         conn_info(tconn, "Connection closed\n");
4057
4058         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4059                 conn_try_outdate_peer_async(tconn);
4060
4061         spin_lock_irq(&tconn->req_lock);
4062         oc = tconn->cstate;
4063         if (oc >= C_UNCONNECTED)
4064                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4065
4066         spin_unlock_irq(&tconn->req_lock);
4067
4068         if (oc == C_DISCONNECTING) {
4069                 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4070
4071                 crypto_free_hash(tconn->cram_hmac_tfm);
4072                 tconn->cram_hmac_tfm = NULL;
4073
4074                 kfree(tconn->net_conf);
4075                 tconn->net_conf = NULL;
4076                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4077         }
4078 }
4079
4080 static int drbd_disconnected(int vnr, void *p, void *data)
4081 {
4082         struct drbd_conf *mdev = (struct drbd_conf *)p;
4083         enum drbd_fencing_p fp;
4084         unsigned int i;
4085
4086         /* wait for current activity to cease. */
4087         spin_lock_irq(&mdev->tconn->req_lock);
4088         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4089         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4090         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4091         spin_unlock_irq(&mdev->tconn->req_lock);
4092
4093         /* We do not have data structures that would allow us to
4094          * get the rs_pending_cnt down to 0 again.
4095          *  * On C_SYNC_TARGET we do not have any data structures describing
4096          *    the pending RSDataRequest's we have sent.
4097          *  * On C_SYNC_SOURCE there is no data structure that tracks
4098          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4099          *  And no, it is not the sum of the reference counts in the
4100          *  resync_LRU. The resync_LRU tracks the whole operation including
4101          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4102          *  on the fly. */
4103         drbd_rs_cancel_all(mdev);
4104         mdev->rs_total = 0;
4105         mdev->rs_failed = 0;
4106         atomic_set(&mdev->rs_pending_cnt, 0);
4107         wake_up(&mdev->misc_wait);
4108
4109         del_timer(&mdev->request_timer);
4110
4111         del_timer_sync(&mdev->resync_timer);
4112         resync_timer_fn((unsigned long)mdev);
4113
4114         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4115          * w_make_resync_request etc. which may still be on the worker queue
4116          * to be "canceled" */
4117         drbd_flush_workqueue(mdev);
4118
4119         /* This also does reclaim_net_ee().  If we do this too early, we might
4120          * miss some resync ee and pages.*/
4121         drbd_process_done_ee(mdev);
4122
4123         kfree(mdev->p_uuid);
4124         mdev->p_uuid = NULL;
4125
4126         if (!is_susp(mdev->state))
4127                 tl_clear(mdev->tconn);
4128
4129         drbd_md_sync(mdev);
4130
4131         fp = FP_DONT_CARE;
4132         if (get_ldev(mdev)) {
4133                 fp = mdev->ldev->dc.fencing;
4134                 put_ldev(mdev);
4135         }
4136
4137         /* serialize with bitmap writeout triggered by the state change,
4138          * if any. */
4139         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4140
4141         /* tcp_close and release of sendpage pages can be deferred.  I don't
4142          * want to use SO_LINGER, because apparently it can be deferred for
4143          * more than 20 seconds (longest time I checked).
4144          *
4145          * Actually we don't care for exactly when the network stack does its
4146          * put_page(), but release our reference on these pages right here.
4147          */
4148         i = drbd_release_ee(mdev, &mdev->net_ee);
4149         if (i)
4150                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4151         i = atomic_read(&mdev->pp_in_use_by_net);
4152         if (i)
4153                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4154         i = atomic_read(&mdev->pp_in_use);
4155         if (i)
4156                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4157
4158         D_ASSERT(list_empty(&mdev->read_ee));
4159         D_ASSERT(list_empty(&mdev->active_ee));
4160         D_ASSERT(list_empty(&mdev->sync_ee));
4161         D_ASSERT(list_empty(&mdev->done_ee));
4162
4163         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4164         atomic_set(&mdev->current_epoch->epoch_size, 0);
4165         D_ASSERT(list_empty(&mdev->current_epoch->list));
4166
4167         return 0;
4168 }
4169
4170 /*
4171  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4172  * we can agree on is stored in agreed_pro_version.
4173  *
4174  * feature flags and the reserved array should be enough room for future
4175  * enhancements of the handshake protocol, and possible plugins...
4176  *
4177  * for now, they are expected to be zero, but ignored.
4178  */
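/*
 * Worked example (illustrative numbers only, not the real PRO_VERSION_*
 * values): if this side supports protocol versions 90..96 and the peer
 * advertises protocol_min=94, protocol_max=100, the ranges overlap and both
 * ends agree on min(96, 100) = 96.  A peer advertising 97..100 would not
 * overlap with 90..96, and drbd_do_features() below returns -1
 * (incompatible dialects, go standalone).
 */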
4179 static int drbd_send_features(struct drbd_tconn *tconn)
4180 {
4181         /* ASSERT current == tconn->receiver ... */
4182         struct p_connection_features *p = tconn->data.sbuf;
4183         int err;
4184
4185         if (mutex_lock_interruptible(&tconn->data.mutex)) {
4186                 conn_err(tconn, "interrupted during initial handshake\n");
4187                 return -EINTR;
4188         }
4189
4190         if (tconn->data.socket == NULL) {
4191                 mutex_unlock(&tconn->data.mutex);
4192                 return -EIO;
4193         }
4194
4195         memset(p, 0, sizeof(*p));
4196         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4197         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4198         err = _conn_send_cmd(tconn, 0, &tconn->data, P_CONNECTION_FEATURES,
4199                              &p->head, sizeof(*p), 0);
4200         mutex_unlock(&tconn->data.mutex);
4201         return err;
4202 }
4203
4204 /*
4205  * return values:
4206  *   1 yes, we have a valid connection
4207  *   0 oops, did not work out, please try again
4208  *  -1 peer talks different language,
4209  *     no point in trying again, please go standalone.
4210  */
4211 static int drbd_do_features(struct drbd_tconn *tconn)
4212 {
4213         /* ASSERT current == tconn->receiver ... */
4214         struct p_connection_features *p = tconn->data.rbuf;
4215         const int expect = sizeof(struct p_connection_features) - sizeof(struct p_header80);
4216         struct packet_info pi;
4217         int err;
4218
4219         err = drbd_send_features(tconn);
4220         if (err)
4221                 return 0;
4222
4223         err = drbd_recv_header(tconn, &pi);
4224         if (err)
4225                 return 0;
4226
4227         if (pi.cmd != P_CONNECTION_FEATURES) {
4228                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4229                      cmdname(pi.cmd), pi.cmd);
4230                 return -1;
4231         }
4232
4233         if (pi.size != expect) {
4234                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4235                      expect, pi.size);
4236                 return -1;
4237         }
4238
4239         err = drbd_recv_all_warn(tconn, &p->head.payload, expect);
4240         if (err)
4241                 return 0;
4242
4243         p->protocol_min = be32_to_cpu(p->protocol_min);
4244         p->protocol_max = be32_to_cpu(p->protocol_max);
4245         if (p->protocol_max == 0)
4246                 p->protocol_max = p->protocol_min;
4247
4248         if (PRO_VERSION_MAX < p->protocol_min ||
4249             PRO_VERSION_MIN > p->protocol_max)
4250                 goto incompat;
4251
4252         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4253
4254         conn_info(tconn, "Handshake successful: "
4255              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4256
4257         return 1;
4258
4259  incompat:
4260         conn_err(tconn, "incompatible DRBD dialects: "
4261             "I support %d-%d, peer supports %d-%d\n",
4262             PRO_VERSION_MIN, PRO_VERSION_MAX,
4263             p->protocol_min, p->protocol_max);
4264         return -1;
4265 }
4266
4267 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4268 static int drbd_do_auth(struct drbd_tconn *tconn)
4269 {
4270         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4271         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4272         return -1;
4273 }
4274 #else
4275 #define CHALLENGE_LEN 64
4276
4277 /* Return value:
4278         1 - auth succeeded,
4279         0 - failed, try again (network error),
4280         -1 - auth failed, don't try again.
4281 */
4282
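/*
 * Shape of the exchange implemented below (both peers run the same
 * sequence, so the challenges cross on the wire):
 *   1. send P_AUTH_CHALLENGE carrying CHALLENGE_LEN random bytes
 *   2. receive the peer's P_AUTH_CHALLENGE
 *   3. send P_AUTH_RESPONSE = HMAC(shared_secret, peer's challenge)
 *   4. receive the peer's P_AUTH_RESPONSE and compare it against
 *      HMAC(shared_secret, my_challenge); any mismatch fails with -1.
 */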
4283 static int drbd_do_auth(struct drbd_tconn *tconn)
4284 {
4285         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4286         struct scatterlist sg;
4287         char *response = NULL;
4288         char *right_response = NULL;
4289         char *peers_ch = NULL;
4290         unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4291         unsigned int resp_size;
4292         struct hash_desc desc;
4293         struct packet_info pi;
4294         int err, rv;
4295
4296         desc.tfm = tconn->cram_hmac_tfm;
4297         desc.flags = 0;
4298
4299         rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4300                                 (u8 *)tconn->net_conf->shared_secret, key_len);
4301         if (rv) {
4302                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4303                 rv = -1;
4304                 goto fail;
4305         }
4306
4307         get_random_bytes(my_challenge, CHALLENGE_LEN);
4308
4309         rv = !conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4310         if (!rv)
4311                 goto fail;
4312
4313         err = drbd_recv_header(tconn, &pi);
4314         if (err) {
4315                 rv = 0;
4316                 goto fail;
4317         }
4318
4319         if (pi.cmd != P_AUTH_CHALLENGE) {
4320                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4321                     cmdname(pi.cmd), pi.cmd);
4322                 rv = 0;
4323                 goto fail;
4324         }
4325
4326         if (pi.size > CHALLENGE_LEN * 2) {
4327                 conn_err(tconn, "AuthChallenge payload too big\n");
4328                 rv = -1;
4329                 goto fail;
4330         }
4331
4332         peers_ch = kmalloc(pi.size, GFP_NOIO);
4333         if (peers_ch == NULL) {
4334                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4335                 rv = -1;
4336                 goto fail;
4337         }
4338
4339         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4340         if (err) {
4341                 rv = 0;
4342                 goto fail;
4343         }
4344
4345         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4346         response = kmalloc(resp_size, GFP_NOIO);
4347         if (response == NULL) {
4348                 conn_err(tconn, "kmalloc of response failed\n");
4349                 rv = -1;
4350                 goto fail;
4351         }
4352
4353         sg_init_table(&sg, 1);
4354         sg_set_buf(&sg, peers_ch, pi.size);
4355
4356         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4357         if (rv) {
4358                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4359                 rv = -1;
4360                 goto fail;
4361         }
4362
4363         rv = !conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
4364         if (!rv)
4365                 goto fail;
4366
4367         err = drbd_recv_header(tconn, &pi);
4368         if (err) {
4369                 rv = 0;
4370                 goto fail;
4371         }
4372
4373         if (pi.cmd != P_AUTH_RESPONSE) {
4374                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4375                         cmdname(pi.cmd), pi.cmd);
4376                 rv = 0;
4377                 goto fail;
4378         }
4379
4380         if (pi.size != resp_size) {
4381                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4382                 rv = 0;
4383                 goto fail;
4384         }
4385
4386         err = drbd_recv_all_warn(tconn, response, resp_size);
4387         if (err) {
4388                 rv = 0;
4389                 goto fail;
4390         }
4391
4392         right_response = kmalloc(resp_size, GFP_NOIO);
4393         if (right_response == NULL) {
4394                 conn_err(tconn, "kmalloc of right_response failed\n");
4395                 rv = -1;
4396                 goto fail;
4397         }
4398
4399         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4400
4401         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4402         if (rv) {
4403                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4404                 rv = -1;
4405                 goto fail;
4406         }
4407
4408         rv = !memcmp(response, right_response, resp_size);
4409
4410         if (rv)
4411                 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4412                      resp_size, tconn->net_conf->cram_hmac_alg);
4413         else
4414                 rv = -1;
4415
4416  fail:
4417         kfree(peers_ch);
4418         kfree(response);
4419         kfree(right_response);
4420
4421         return rv;
4422 }
4423 #endif
4424
4425 int drbdd_init(struct drbd_thread *thi)
4426 {
4427         struct drbd_tconn *tconn = thi->tconn;
4428         int h;
4429
4430         conn_info(tconn, "receiver (re)started\n");
4431
4432         do {
4433                 h = drbd_connect(tconn);
4434                 if (h == 0) {
4435                         drbd_disconnect(tconn);
4436                         schedule_timeout_interruptible(HZ);
4437                 }
4438                 if (h == -1) {
4439                         conn_warn(tconn, "Discarding network configuration.\n");
4440                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4441                 }
4442         } while (h == 0);
4443
4444         if (h > 0) {
4445                 if (get_net_conf(tconn)) {
4446                         drbdd(tconn);
4447                         put_net_conf(tconn);
4448                 }
4449         }
4450
4451         drbd_disconnect(tconn);
4452
4453         conn_info(tconn, "receiver terminated\n");
4454         return 0;
4455 }
4456
4457 /* ********* acknowledge sender ******** */
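/* The asender thread runs on the meta socket (tconn->meta): it sends and
 * answers pings, processes done_ee so that write acks go out promptly, and
 * handles the acknowledgement packets listed in asender_tbl[] below. */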
4458
4459 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4460 {
4461         struct p_req_state_reply *p = tconn->meta.rbuf;
4462         int retcode = be32_to_cpu(p->retcode);
4463
4464         if (retcode >= SS_SUCCESS) {
4465                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4466         } else {
4467                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4468                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4469                          drbd_set_st_err_str(retcode), retcode);
4470         }
4471         wake_up(&tconn->ping_wait);
4472
4473         return true;
4474 }
4475
4476 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4477 {
4478         struct drbd_conf *mdev;
4479         struct p_req_state_reply *p = tconn->meta.rbuf;
4480         int retcode = be32_to_cpu(p->retcode);
4481
4482         mdev = vnr_to_mdev(tconn, pi->vnr);
4483         if (!mdev)
4484                 return false;
4485
4486         if (retcode >= SS_SUCCESS) {
4487                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4488         } else {
4489                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4490                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4491                         drbd_set_st_err_str(retcode), retcode);
4492         }
4493         wake_up(&mdev->state_wait);
4494
4495         return true;
4496 }
4497
4498 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4499 {
4500         return drbd_send_ping_ack(tconn);
4502 }
4503
4504 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4505 {
4506         /* restore idle timeout */
4507         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4508         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4509                 wake_up(&tconn->ping_wait);
4510
4511         return true;
4512 }
4513
4514 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4515 {
4516         struct drbd_conf *mdev;
4517         struct p_block_ack *p = tconn->meta.rbuf;
4518         sector_t sector = be64_to_cpu(p->sector);
4519         int blksize = be32_to_cpu(p->blksize);
4520
4521         mdev = vnr_to_mdev(tconn, pi->vnr);
4522         if (!mdev)
4523                 return false;
4524
4525         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4526
4527         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4528
4529         if (get_ldev(mdev)) {
4530                 drbd_rs_complete_io(mdev, sector);
4531                 drbd_set_in_sync(mdev, sector, blksize);
4532                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4533                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4534                 put_ldev(mdev);
4535         }
4536         dec_rs_pending(mdev);
4537         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4538
4539         return true;
4540 }
4541
4542 static int
4543 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4544                               struct rb_root *root, const char *func,
4545                               enum drbd_req_event what, bool missing_ok)
4546 {
4547         struct drbd_request *req;
4548         struct bio_and_error m;
4549
4550         spin_lock_irq(&mdev->tconn->req_lock);
4551         req = find_request(mdev, root, id, sector, missing_ok, func);
4552         if (unlikely(!req)) {
4553                 spin_unlock_irq(&mdev->tconn->req_lock);
4554                 return false;
4555         }
4556         __req_mod(req, what, &m);
4557         spin_unlock_irq(&mdev->tconn->req_lock);
4558
4559         if (m.bio)
4560                 complete_master_bio(mdev, &m);
4561         return true;
4562 }
4563
4564 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4565 {
4566         struct drbd_conf *mdev;
4567         struct p_block_ack *p = tconn->meta.rbuf;
4568         sector_t sector = be64_to_cpu(p->sector);
4569         int blksize = be32_to_cpu(p->blksize);
4570         enum drbd_req_event what;
4571
4572         mdev = vnr_to_mdev(tconn, pi->vnr);
4573         if (!mdev)
4574                 return false;
4575
4576         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4577
4578         if (p->block_id == ID_SYNCER) {
4579                 drbd_set_in_sync(mdev, sector, blksize);
4580                 dec_rs_pending(mdev);
4581                 return true;
4582         }
4583         switch (pi->cmd) {
4584         case P_RS_WRITE_ACK:
4585                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4586                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4587                 break;
4588         case P_WRITE_ACK:
4589                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4590                 what = WRITE_ACKED_BY_PEER;
4591                 break;
4592         case P_RECV_ACK:
4593                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4594                 what = RECV_ACKED_BY_PEER;
4595                 break;
4596         case P_DISCARD_WRITE:
4597                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4598                 what = DISCARD_WRITE;
4599                 break;
4600         case P_RETRY_WRITE:
4601                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4602                 what = POSTPONE_WRITE;
4603                 break;
4604         default:
4605                 D_ASSERT(0);
4606                 return false;
4607         }
4608
4609         return validate_req_change_req_state(mdev, p->block_id, sector,
4610                                              &mdev->write_requests, __func__,
4611                                              what, false);
4612 }
4613
4614 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4615 {
4616         struct drbd_conf *mdev;
4617         struct p_block_ack *p = tconn->meta.rbuf;
4618         sector_t sector = be64_to_cpu(p->sector);
4619         int size = be32_to_cpu(p->blksize);
4620         bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4621                           tconn->net_conf->wire_protocol == DRBD_PROT_B;
4622         bool found;
4623
4624         mdev = vnr_to_mdev(tconn, pi->vnr);
4625         if (!mdev)
4626                 return false;
4627
4628         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4629
4630         if (p->block_id == ID_SYNCER) {
4631                 dec_rs_pending(mdev);
4632                 drbd_rs_failed_io(mdev, sector, size);
4633                 return true;
4634         }
4635
4636         found = validate_req_change_req_state(mdev, p->block_id, sector,
4637                                               &mdev->write_requests, __func__,
4638                                               NEG_ACKED, missing_ok);
4639         if (!found) {
4640                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4641                    The master bio might already be completed, therefore the
4642                    request is no longer in the collision hash. */
4643                 /* In Protocol B we might already have got a P_RECV_ACK
4644                    but then get a P_NEG_ACK afterwards. */
4645                 if (!missing_ok)
4646                         return false;
4647                 drbd_set_out_of_sync(mdev, sector, size);
4648         }
4649         return true;
4650 }
4651
4652 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4653 {
4654         struct drbd_conf *mdev;
4655         struct p_block_ack *p = tconn->meta.rbuf;
4656         sector_t sector = be64_to_cpu(p->sector);
4657
4658         mdev = vnr_to_mdev(tconn, pi->vnr);
4659         if (!mdev)
4660                 return false;
4661
4662         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4663
4664         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4665             (unsigned long long)sector, be32_to_cpu(p->blksize));
4666
4667         return validate_req_change_req_state(mdev, p->block_id, sector,
4668                                              &mdev->read_requests, __func__,
4669                                              NEG_ACKED, false);
4670 }
4671
4672 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4673 {
4674         struct drbd_conf *mdev;
4675         sector_t sector;
4676         int size;
4677         struct p_block_ack *p = tconn->meta.rbuf;
4678
4679         mdev = vnr_to_mdev(tconn, pi->vnr);
4680         if (!mdev)
4681                 return false;
4682
4683         sector = be64_to_cpu(p->sector);
4684         size = be32_to_cpu(p->blksize);
4685
4686         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4687
4688         dec_rs_pending(mdev);
4689
4690         if (get_ldev_if_state(mdev, D_FAILED)) {
4691                 drbd_rs_complete_io(mdev, sector);
4692                 switch (pi->cmd) {
4693                 case P_NEG_RS_DREPLY:
4694                         drbd_rs_failed_io(mdev, sector, size);
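                        /* fall through */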
4695                 case P_RS_CANCEL:
4696                         break;
4697                 default:
4698                         D_ASSERT(0);
4699                         put_ldev(mdev);
4700                         return false;
4701                 }
4702                 put_ldev(mdev);
4703         }
4704
4705         return true;
4706 }
4707
4708 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4709 {
4710         struct drbd_conf *mdev;
4711         struct p_barrier_ack *p = tconn->meta.rbuf;
4712
4713         mdev = vnr_to_mdev(tconn, pi->vnr);
4714         if (!mdev)
4715                 return false;
4716
4717         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4718
4719         if (mdev->state.conn == C_AHEAD &&
4720             atomic_read(&mdev->ap_in_flight) == 0 &&
4721             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4722                 mdev->start_resync_timer.expires = jiffies + HZ;
4723                 add_timer(&mdev->start_resync_timer);
4724         }
4725
4726         return true;
4727 }
4728
4729 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4730 {
4731         struct drbd_conf *mdev;
4732         struct p_block_ack *p = tconn->meta.rbuf;
4733         struct drbd_work *w;
4734         sector_t sector;
4735         int size;
4736
4737         mdev = vnr_to_mdev(tconn, pi->vnr);
4738         if (!mdev)
4739                 return false;
4740
4741         sector = be64_to_cpu(p->sector);
4742         size = be32_to_cpu(p->blksize);
4743
4744         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4745
4746         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4747                 drbd_ov_out_of_sync_found(mdev, sector, size);
4748         else
4749                 ov_out_of_sync_print(mdev);
4750
4751         if (!get_ldev(mdev))
4752                 return true;
4753
4754         drbd_rs_complete_io(mdev, sector);
4755         dec_rs_pending(mdev);
4756
4757         --mdev->ov_left;
4758
4759         /* let's advance progress step marks only for every other megabyte */
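        /* (0x200 blocks of BM_BLOCK_SIZE (4 KiB) is 2 MiB, assuming ov_left
         * counts bitmap blocks, so this fires roughly every 2 MiB) */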
4760         if ((mdev->ov_left & 0x200) == 0x200)
4761                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4762
4763         if (mdev->ov_left == 0) {
4764                 w = kmalloc(sizeof(*w), GFP_NOIO);
4765                 if (w) {
4766                         w->cb = w_ov_finished;
4767                         w->mdev = mdev;
4768                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4769                 } else {
4770                         dev_err(DEV, "kmalloc(w) failed.\n");
4771                         ov_out_of_sync_print(mdev);
4772                         drbd_resync_finished(mdev);
4773                 }
4774         }
4775         put_ldev(mdev);
4776         return true;
4777 }
4778
4779 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4780 {
4781         return true;
4782 }
4783
4784 static int tconn_process_done_ee(struct drbd_tconn *tconn)
4785 {
4786         struct drbd_conf *mdev;
4787         int i, not_empty = 0;
4788
4789         do {
4790                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4791                 flush_signals(current);
4792                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4793                         if (drbd_process_done_ee(mdev))
4794                                 return 1; /* error */
4795                 }
4796                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4797
4798                 spin_lock_irq(&tconn->req_lock);
4799                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4800                         not_empty = !list_empty(&mdev->done_ee);
4801                         if (not_empty)
4802                                 break;
4803                 }
4804                 spin_unlock_irq(&tconn->req_lock);
4805         } while (not_empty);
4806
4807         return 0;
4808 }
4809
4810 struct asender_cmd {
4811         size_t pkt_size;
4812         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4813 };
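
/* pkt_size is the full on-the-wire packet size including struct p_header;
 * the receive loop below insists that pi.size equals
 * pkt_size - sizeof(struct p_header) before calling fn(). */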
4814
4815 static struct asender_cmd asender_tbl[] = {
4816         [P_PING]            = { sizeof(struct p_header), got_Ping },
4817         [P_PING_ACK]        = { sizeof(struct p_header), got_PingAck },
4818         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4819         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4820         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4821         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4822         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4823         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4824         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4825         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4826         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4827         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4828         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4829         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4830         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4831         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
4832         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4833 };
4834
4835 int drbd_asender(struct drbd_thread *thi)
4836 {
4837         struct drbd_tconn *tconn = thi->tconn;
4838         struct p_header *h = tconn->meta.rbuf;
4839         struct asender_cmd *cmd = NULL;
4840         struct packet_info pi;
4841         int rv;
4842         void *buf    = h;
4843         int received = 0;
4844         int expect   = sizeof(struct p_header);
4845         int ping_timeout_active = 0;
4846
4847         current->policy = SCHED_RR;  /* Make this a realtime task! */
4848         current->rt_priority = 2;    /* more important than all other tasks */
4849
4850         while (get_t_state(thi) == RUNNING) {
4851                 drbd_thread_current_set_cpu(thi);
4852                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4853                         if (!drbd_send_ping(tconn)) {
4854                                 conn_err(tconn, "drbd_send_ping has failed\n");
4855                                 goto reconnect;
4856                         }
4857                         tconn->meta.socket->sk->sk_rcvtimeo =
4858                                 tconn->net_conf->ping_timeo*HZ/10;
4859                         ping_timeout_active = 1;
4860                 }
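                /* While a ping is outstanding, sk_rcvtimeo is the (shorter)
                 * ping_timeo; got_PingAck() restores the idle ping_int
                 * timeout and ping_timeout_active is cleared further down. */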
4861
4862                 /* TODO: conditionally cork; it may hurt latency if we cork without
4863                    much to send */
4864                 if (!tconn->net_conf->no_cork)
4865                         drbd_tcp_cork(tconn->meta.socket);
4866                 if (tconn_process_done_ee(tconn)) {
4867                         conn_err(tconn, "tconn_process_done_ee() failed\n");
4868                         goto reconnect;
4869                 }
4870                 /* but unconditionally uncork unless disabled */
4871                 if (!tconn->net_conf->no_cork)
4872                         drbd_tcp_uncork(tconn->meta.socket);
4873
4874                 /* short circuit, recv_msg would return EINTR anyways. */
4875                 if (signal_pending(current))
4876                         continue;
4877
4878                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4879                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4880
4881                 flush_signals(current);
4882
4883                 /* Note:
4884                  * -EINTR        (on meta) we got a signal
4885                  * -EAGAIN       (on meta) rcvtimeo expired
4886                  * -ECONNRESET   other side closed the connection
4887                  * -ERESTARTSYS  (on data) we got a signal
4888                  * rv <  0       other than above: unexpected error!
4889                  * rv == expected: full header or command
4890                  * rv <  expected: "woken" by signal during receive
4891                  * rv == 0       : "connection shut down by peer"
4892                  */
4893                 if (likely(rv > 0)) {
4894                         received += rv;
4895                         buf      += rv;
4896                 } else if (rv == 0) {
4897                         conn_err(tconn, "meta connection shut down by peer.\n");
4898                         goto reconnect;
4899                 } else if (rv == -EAGAIN) {
4900                         /* If the data socket received something meanwhile,
4901                          * that is good enough: peer is still alive. */
4902                         if (time_after(tconn->last_received,
4903                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
4904                                 continue;
4905                         if (ping_timeout_active) {
4906                                 conn_err(tconn, "PingAck did not arrive in time.\n");
4907                                 goto reconnect;
4908                         }
4909                         set_bit(SEND_PING, &tconn->flags);
4910                         continue;
4911                 } else if (rv == -EINTR) {
4912                         continue;
4913                 } else {
4914                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
4915                         goto reconnect;
4916                 }
4917
4918                 if (received == expect && cmd == NULL) {
4919                         if (decode_header(tconn, h, &pi))
4920                                 goto reconnect;
4921                         cmd = &asender_tbl[pi.cmd];
4922                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
4923                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
4924                                         pi.cmd, pi.size);
4925                                 goto disconnect;
4926                         }
4927                         expect = cmd->pkt_size;
4928                         if (pi.size != expect - sizeof(struct p_header)) {
4929                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
4930                                         pi.cmd, pi.size);
4931                                 goto reconnect;
4932                         }
4933                 }
4934                 if (received == expect) {
4935                         bool rv;
4936
4937                         rv = cmd->fn(tconn, &pi);
4938                         if (!rv) {
4939                                 conn_err(tconn, "%pf failed\n", cmd->fn);
4940                                 goto reconnect;
4941                         }
4942
4943                         tconn->last_received = jiffies;
4944
4945                         /* the idle_timeout (ping-int)
4946                          * has been restored in got_PingAck() */
4947                         if (cmd == &asender_tbl[P_PING_ACK])
4948                                 ping_timeout_active = 0;
4949
4950                         buf      = h;
4951                         received = 0;
4952                         expect   = sizeof(struct p_header);
4953                         cmd      = NULL;
4954                 }
4955         }
4956
4957         if (0) {
4958 reconnect:
4959                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4960         }
4961         if (0) {
4962 disconnect:
4963                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4964         }
4965         clear_bit(SIGNAL_ASENDER, &tconn->flags);
4966
4967         conn_info(tconn, "asender terminated\n");
4968
4969         return 0;
4970 }