drbd: Rename drbd_{ ee -> peer_req }_has_active_page
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
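/*
 * For reference: the chain accessors used below are defined in drbd_int.h.
 * A minimal sketch of what they are assumed to do:
 *
 *   static inline struct page *page_chain_next(struct page *page)
 *   {
 *           return (struct page *)page_private(page);
 *   }
 *
 *   #define page_chain_for_each(page) \
 *           for (; page; page = page_chain_next(page))
 */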
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
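/* Put every page in the chain and return the number of pages walked. */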
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
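/* Prepend the chain [chain_first .. chain_last] to the list at *head.
 * Locking is the responsibility of the caller. */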
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
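/* Take 'number' pages off the global drbd_pp_pool if enough are vacant,
 * otherwise try to allocate fresh pages with GFP_TRY.  On a partial
 * allocation the pages obtained so far are given back to the pool and
 * NULL is returned. */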
153 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
154 {
155         struct page *page = NULL;
156         struct page *tmp = NULL;
157         int i = 0;
158
159         /* Yes, testing drbd_pp_vacant outside the lock is racy.
160          * So what. It saves a spin_lock. */
161         if (drbd_pp_vacant >= number) {
162                 spin_lock(&drbd_pp_lock);
163                 page = page_chain_del(&drbd_pp_pool, number);
164                 if (page)
165                         drbd_pp_vacant -= number;
166                 spin_unlock(&drbd_pp_lock);
167                 if (page)
168                         return page;
169         }
170
171         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
172          * "criss-cross" setup, that might cause write-out on some other DRBD,
173          * which in turn might block on the other node at this very place.  */
174         for (i = 0; i < number; i++) {
175                 tmp = alloc_page(GFP_TRY);
176                 if (!tmp)
177                         break;
178                 set_page_private(tmp, (unsigned long)page);
179                 page = tmp;
180         }
181
182         if (i == number)
183                 return page;
184
185         /* Not enough pages immediately available this time.
186          * No need to jump around here, drbd_pp_alloc will retry this
187          * function "soon". */
188         if (page) {
189                 tmp = page_chain_tail(page, NULL);
190                 spin_lock(&drbd_pp_lock);
191                 page_chain_add(&drbd_pp_pool, page, tmp);
192                 drbd_pp_vacant += i;
193                 spin_unlock(&drbd_pp_lock);
194         }
195         return NULL;
196 }
197
198 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
199                                            struct list_head *to_be_freed)
200 {
201         struct drbd_peer_request *peer_req;
202         struct list_head *le, *tle;
203
204         /* The EEs are always appended to the end of the list. Since
205            they are sent in order over the wire, they have to finish
206            in order. As soon as we see the first one that has not finished,
207            we can stop examining the list... */
208
209         list_for_each_safe(le, tle, &mdev->net_ee) {
210                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
211                 if (drbd_peer_req_has_active_page(peer_req))
212                         break;
213                 list_move(le, to_be_freed);
214         }
215 }
216
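/* Move finished peer requests off mdev->net_ee under the req_lock and
 * free them outside of it. */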
217 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
218 {
219         LIST_HEAD(reclaimed);
220         struct drbd_peer_request *peer_req, *t;
221
222         spin_lock_irq(&mdev->tconn->req_lock);
223         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
224         spin_unlock_irq(&mdev->tconn->req_lock);
225
226         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227                 drbd_free_net_peer_req(mdev, peer_req);
228 }
229
230 /**
231  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
232  * @mdev:       DRBD device.
233  * @number:     number of pages requested
234  * @retry:      whether to retry, if not enough pages are available right now
235  *
236  * Tries to allocate number pages, first from our own page pool, then from
237  * the kernel, unless this allocation would exceed the max_buffers setting.
238  * Possibly retry until DRBD frees sufficient pages somewhere else.
239  *
240  * Returns a page chain linked via page->private.
241  */
242 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
243 {
244         struct page *page = NULL;
245         DEFINE_WAIT(wait);
246
247         /* Yes, we may run up to @number over max_buffers. If we
248          * follow it strictly, the admin will get it wrong anyways. */
249         if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
250                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
251
252         while (page == NULL) {
253                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
254
255                 drbd_kick_lo_and_reclaim_net(mdev);
256
257                 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
258                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
259                         if (page)
260                                 break;
261                 }
262
263                 if (!retry)
264                         break;
265
266                 if (signal_pending(current)) {
267                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
268                         break;
269                 }
270
271                 schedule();
272         }
273         finish_wait(&drbd_pp_wait, &wait);
274
275         if (page)
276                 atomic_add(number, &mdev->pp_in_use);
277         return page;
278 }
279
280 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
281  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
282  * Either links the page chain back to the global pool,
283  * or returns all pages to the system. */
284 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
285 {
286         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
287         int i;
288
289         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
290                 i = page_chain_free(page);
291         else {
292                 struct page *tmp;
293                 tmp = page_chain_tail(page, &i);
294                 spin_lock(&drbd_pp_lock);
295                 page_chain_add(&drbd_pp_pool, page, tmp);
296                 drbd_pp_vacant += i;
297                 spin_unlock(&drbd_pp_lock);
298         }
299         i = atomic_sub_return(i, a);
300         if (i < 0)
301                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
302                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
303         wake_up(&drbd_pp_wait);
304 }
305
306 /*
307 You need to hold the req_lock:
308  _drbd_wait_ee_list_empty()
309
310 You must not have the req_lock:
311  drbd_free_peer_req()
312  drbd_alloc_peer_req()
313  drbd_free_peer_reqs()
314  drbd_ee_fix_bhs()
315  drbd_finish_peer_reqs()
316  drbd_clear_done_ee()
317  drbd_wait_ee_list_empty()
318 */
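/*
 * Typical pattern implied by the rules above (illustrative only; this is
 * exactly what drbd_wait_ee_list_empty() below does):
 *
 *      spin_lock_irq(&mdev->tconn->req_lock);
 *      _drbd_wait_ee_list_empty(mdev, head);
 *      spin_unlock_irq(&mdev->tconn->req_lock);
 */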
319
320 struct drbd_peer_request *
321 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
322                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
323 {
324         struct drbd_peer_request *peer_req;
325         struct page *page;
326         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
327
328         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
329                 return NULL;
330
331         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
332         if (!peer_req) {
333                 if (!(gfp_mask & __GFP_NOWARN))
334                         dev_err(DEV, "%s: allocation failed\n", __func__);
335                 return NULL;
336         }
337
338         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
339         if (!page)
340                 goto fail;
341
342         drbd_clear_interval(&peer_req->i);
343         peer_req->i.size = data_size;
344         peer_req->i.sector = sector;
345         peer_req->i.local = false;
346         peer_req->i.waiting = false;
347
348         peer_req->epoch = NULL;
349         peer_req->w.mdev = mdev;
350         peer_req->pages = page;
351         atomic_set(&peer_req->pending_bios, 0);
352         peer_req->flags = 0;
353         /*
354          * The block_id is opaque to the receiver.  It is not endianness
355          * converted, and sent back to the sender unchanged.
356          */
357         peer_req->block_id = id;
358
359         return peer_req;
360
361  fail:
362         mempool_free(peer_req, drbd_ee_mempool);
363         return NULL;
364 }
365
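/* Free the digest (if any) and the page chain, then give the peer request
 * itself back to the mempool. */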
366 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
367                        int is_net)
368 {
369         if (peer_req->flags & EE_HAS_DIGEST)
370                 kfree(peer_req->digest);
371         drbd_pp_free(mdev, peer_req->pages, is_net);
372         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
373         D_ASSERT(drbd_interval_empty(&peer_req->i));
374         mempool_free(peer_req, drbd_ee_mempool);
375 }
376
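/* Splice the given list off under the req_lock, free every peer request on
 * it, and return how many were freed. */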
377 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
378 {
379         LIST_HEAD(work_list);
380         struct drbd_peer_request *peer_req, *t;
381         int count = 0;
382         int is_net = list == &mdev->net_ee;
383
384         spin_lock_irq(&mdev->tconn->req_lock);
385         list_splice_init(list, &work_list);
386         spin_unlock_irq(&mdev->tconn->req_lock);
387
388         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
389                 __drbd_free_peer_req(mdev, peer_req, is_net);
390                 count++;
391         }
392         return count;
393 }
394
395 /*
396  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
397  */
398 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
399 {
400         LIST_HEAD(work_list);
401         LIST_HEAD(reclaimed);
402         struct drbd_peer_request *peer_req, *t;
403         int err = 0;
404
405         spin_lock_irq(&mdev->tconn->req_lock);
406         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
407         list_splice_init(&mdev->done_ee, &work_list);
408         spin_unlock_irq(&mdev->tconn->req_lock);
409
410         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
411                 drbd_free_net_peer_req(mdev, peer_req);
412
413         /* possible callbacks here:
414          * e_end_block, and e_end_resync_block, e_send_discard_write.
415          * all ignore the last argument.
416          */
417         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
418                 int err2;
419
420                 /* list_del not necessary, next/prev members not touched */
421                 err2 = peer_req->w.cb(&peer_req->w, !!err);
422                 if (!err)
423                         err = err2;
424                 drbd_free_peer_req(mdev, peer_req);
425         }
426         wake_up(&mdev->ee_wait);
427
428         return err;
429 }
430
431 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
432 {
433         DEFINE_WAIT(wait);
434
435         /* avoids spin_lock/unlock
436          * and calling prepare_to_wait in the fast path */
437         while (!list_empty(head)) {
438                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
439                 spin_unlock_irq(&mdev->tconn->req_lock);
440                 io_schedule();
441                 finish_wait(&mdev->ee_wait, &wait);
442                 spin_lock_irq(&mdev->tconn->req_lock);
443         }
444 }
445
446 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
447 {
448         spin_lock_irq(&mdev->tconn->req_lock);
449         _drbd_wait_ee_list_empty(mdev, head);
450         spin_unlock_irq(&mdev->tconn->req_lock);
451 }
452
453 /* see also kernel_accept(), which is only present since 2.6.18.
454  * We also want to log exactly which part of it failed. */
455 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
456 {
457         struct sock *sk = sock->sk;
458         int err = 0;
459
460         *what = "listen";
461         err = sock->ops->listen(sock, 5);
462         if (err < 0)
463                 goto out;
464
465         *what = "sock_create_lite";
466         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
467                                newsock);
468         if (err < 0)
469                 goto out;
470
471         *what = "accept";
472         err = sock->ops->accept(sock, *newsock, 0);
473         if (err < 0) {
474                 sock_release(*newsock);
475                 *newsock = NULL;
476                 goto out;
477         }
478         (*newsock)->ops  = sock->ops;
479
480 out:
481         return err;
482 }
483
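/* Receive up to 'size' bytes in a single sock_recvmsg() call; a zero
 * 'flags' argument defaults to MSG_WAITALL | MSG_NOSIGNAL. */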
484 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
485 {
486         mm_segment_t oldfs;
487         struct kvec iov = {
488                 .iov_base = buf,
489                 .iov_len = size,
490         };
491         struct msghdr msg = {
492                 .msg_iovlen = 1,
493                 .msg_iov = (struct iovec *)&iov,
494                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
495         };
496         int rv;
497
498         oldfs = get_fs();
499         set_fs(KERNEL_DS);
500         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
501         set_fs(oldfs);
502
503         return rv;
504 }
505
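/* Receive 'size' bytes from the data socket.  Anything short of a full
 * read (peer reset or shutdown, error, or signal) forces the connection
 * into C_BROKEN_PIPE. */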
506 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
507 {
508         mm_segment_t oldfs;
509         struct kvec iov = {
510                 .iov_base = buf,
511                 .iov_len = size,
512         };
513         struct msghdr msg = {
514                 .msg_iovlen = 1,
515                 .msg_iov = (struct iovec *)&iov,
516                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
517         };
518         int rv;
519
520         oldfs = get_fs();
521         set_fs(KERNEL_DS);
522
523         for (;;) {
524                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
525                 if (rv == size)
526                         break;
527
528                 /* Note:
529                  * ECONNRESET   other side closed the connection
530                  * ERESTARTSYS  (on  sock) we got a signal
531                  */
532
533                 if (rv < 0) {
534                         if (rv == -ECONNRESET)
535                                 conn_info(tconn, "sock was reset by peer\n");
536                         else if (rv != -ERESTARTSYS)
537                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
538                         break;
539                 } else if (rv == 0) {
540                         conn_info(tconn, "sock was shut down by peer\n");
541                         break;
542                 } else  {
543                         /* signal came in, or peer/link went down,
544                          * after we read a partial message
545                          */
546                         /* D_ASSERT(signal_pending(current)); */
547                         break;
548                 }
549         }
550
551         set_fs(oldfs);
552
553         if (rv != size)
554                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
555
556         return rv;
557 }
558
559 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
560 {
561         int err;
562
563         err = drbd_recv(tconn, buf, size);
564         if (err != size) {
565                 if (err >= 0)
566                         err = -EIO;
567         } else
568                 err = 0;
569         return err;
570 }
571
572 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
573 {
574         int err;
575
576         err = drbd_recv_all(tconn, buf, size);
577         if (err && !signal_pending(current))
578                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
579         return err;
580 }
581
582 /* quoting tcp(7):
583  *   On individual connections, the socket buffer size must be set prior to the
584  *   listen(2) or connect(2) calls in order to have it take effect.
585  * This is our wrapper to do so.
586  */
587 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
588                 unsigned int rcv)
589 {
590         /* open coded SO_SNDBUF, SO_RCVBUF */
591         if (snd) {
592                 sock->sk->sk_sndbuf = snd;
593                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
594         }
595         if (rcv) {
596                 sock->sk->sk_rcvbuf = rcv;
597                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
598         }
599 }
600
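/* Active connection attempt: bind to the configured local address (port 0)
 * and connect() to the peer.  Returns the new socket, or NULL on failure. */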
601 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
602 {
603         const char *what;
604         struct socket *sock;
605         struct sockaddr_in6 src_in6;
606         int err;
607         int disconnect_on_error = 1;
608
609         if (!get_net_conf(tconn))
610                 return NULL;
611
612         what = "sock_create_kern";
613         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
614                 SOCK_STREAM, IPPROTO_TCP, &sock);
615         if (err < 0) {
616                 sock = NULL;
617                 goto out;
618         }
619
620         sock->sk->sk_rcvtimeo =
621         sock->sk->sk_sndtimeo =  tconn->net_conf->try_connect_int*HZ;
622         drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
623                         tconn->net_conf->rcvbuf_size);
624
625        /* explicitly bind to the configured IP as source IP
626         *  for the outgoing connections.
627         *  This is needed for multihomed hosts and to be
628         *  able to use lo: interfaces for drbd.
629         * Make sure to use 0 as port number, so linux selects
630         *  a free one dynamically.
631         */
632         memcpy(&src_in6, tconn->net_conf->my_addr,
633                min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
634         if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
635                 src_in6.sin6_port = 0;
636         else
637                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
638
639         what = "bind before connect";
640         err = sock->ops->bind(sock,
641                               (struct sockaddr *) &src_in6,
642                               tconn->net_conf->my_addr_len);
643         if (err < 0)
644                 goto out;
645
646         /* connect may fail, peer not yet available.
647          * stay C_WF_CONNECTION, don't go Disconnecting! */
648         disconnect_on_error = 0;
649         what = "connect";
650         err = sock->ops->connect(sock,
651                                  (struct sockaddr *)tconn->net_conf->peer_addr,
652                                  tconn->net_conf->peer_addr_len, 0);
653
654 out:
655         if (err < 0) {
656                 if (sock) {
657                         sock_release(sock);
658                         sock = NULL;
659                 }
660                 switch (-err) {
661                         /* timeout, busy, signal pending */
662                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
663                 case EINTR: case ERESTARTSYS:
664                         /* peer not (yet) available, network problem */
665                 case ECONNREFUSED: case ENETUNREACH:
666                 case EHOSTDOWN:    case EHOSTUNREACH:
667                         disconnect_on_error = 0;
668                         break;
669                 default:
670                         conn_err(tconn, "%s failed, err = %d\n", what, err);
671                 }
672                 if (disconnect_on_error)
673                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
674         }
675         put_net_conf(tconn);
676         return sock;
677 }
678
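/* Passive side of connection setup: listen on the configured local address
 * and accept a single incoming connection, with a randomly jittered
 * timeout.  Returns the accepted socket, or NULL. */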
679 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
680 {
681         int timeo, err;
682         struct socket *s_estab = NULL, *s_listen;
683         const char *what;
684
685         if (!get_net_conf(tconn))
686                 return NULL;
687
688         what = "sock_create_kern";
689         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
690                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
691         if (err) {
692                 s_listen = NULL;
693                 goto out;
694         }
695
696         timeo = tconn->net_conf->try_connect_int * HZ;
697         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
698
699         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
700         s_listen->sk->sk_rcvtimeo = timeo;
701         s_listen->sk->sk_sndtimeo = timeo;
702         drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
703                         tconn->net_conf->rcvbuf_size);
704
705         what = "bind before listen";
706         err = s_listen->ops->bind(s_listen,
707                               (struct sockaddr *) tconn->net_conf->my_addr,
708                               tconn->net_conf->my_addr_len);
709         if (err < 0)
710                 goto out;
711
712         err = drbd_accept(&what, s_listen, &s_estab);
713
714 out:
715         if (s_listen)
716                 sock_release(s_listen);
717         if (err < 0) {
718                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
719                         conn_err(tconn, "%s failed, err = %d\n", what, err);
720                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
721                 }
722         }
723         put_net_conf(tconn);
724
725         return s_estab;
726 }
727
728 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
729
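/* The first packet on a freshly established socket carries only a command
 * (P_INITIAL_DATA or P_INITIAL_META) telling the peer which role this
 * socket is going to play. */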
730 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
731                              enum drbd_packet cmd)
732 {
733         if (!conn_prepare_command(tconn, sock))
734                 return -EIO;
735         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
736 }
737
738 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
739 {
740         unsigned int header_size = drbd_header_size(tconn);
741         struct packet_info pi;
742         int err;
743
744         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
745         if (err != header_size) {
746                 if (err >= 0)
747                         err = -EIO;
748                 return err;
749         }
750         err = decode_header(tconn, tconn->data.rbuf, &pi);
751         if (err)
752                 return err;
753         return pi.cmd;
754 }
755
756 /**
757  * drbd_socket_okay() - Free the socket if its connection is not okay
758  * @sock:       pointer to the pointer to the socket.
759  */
760 static int drbd_socket_okay(struct socket **sock)
761 {
762         int rr;
763         char tb[4];
764
765         if (!*sock)
766                 return false;
767
768         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
769
770         if (rr > 0 || rr == -EAGAIN) {
771                 return true;
772         } else {
773                 sock_release(*sock);
774                 *sock = NULL;
775                 return false;
776         }
777 }
778 /* Gets called if a connection is established, or if a new minor gets created
779    in a connection */
780 int drbd_connected(int vnr, void *p, void *data)
781 {
782         struct drbd_conf *mdev = (struct drbd_conf *)p;
783         int err;
784
785         atomic_set(&mdev->packet_seq, 0);
786         mdev->peer_seq = 0;
787
788         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
789                 &mdev->tconn->cstate_mutex :
790                 &mdev->own_state_mutex;
791
792         err = drbd_send_sync_param(mdev);
793         if (!err)
794                 err = drbd_send_sizes(mdev, 0, 0);
795         if (!err)
796                 err = drbd_send_uuids(mdev);
797         if (!err)
798                 err = drbd_send_state(mdev);
799         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
800         clear_bit(RESIZE_PENDING, &mdev->flags);
801         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
802         return err;
803 }
804
805 /*
806  * return values:
807  *   1 yes, we have a valid connection
808  *   0 oops, did not work out, please try again
809  *  -1 peer talks different language,
810  *     no point in trying again, please go standalone.
811  *  -2 We do not have a network config...
812  */
813 static int drbd_connect(struct drbd_tconn *tconn)
814 {
815         struct socket *sock, *msock;
816         int try, h, ok;
817
818         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
819                 return -2;
820
821         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
822
823         /* Assume that the peer only understands protocol 80 until we know better.  */
824         tconn->agreed_pro_version = 80;
825
826         do {
827                 struct socket *s;
828
829                 for (try = 0;;) {
830                         /* 3 tries, this should take less than a second! */
831                         s = drbd_try_connect(tconn);
832                         if (s || ++try >= 3)
833                                 break;
834                         /* give the other side time to call bind() & listen() */
835                         schedule_timeout_interruptible(HZ / 10);
836                 }
837
838                 if (s) {
839                         if (!tconn->data.socket) {
840                                 tconn->data.socket = s;
841                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
842                         } else if (!tconn->meta.socket) {
843                                 tconn->meta.socket = s;
844                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
845                         } else {
846                                 conn_err(tconn, "Logic error in drbd_connect()\n");
847                                 goto out_release_sockets;
848                         }
849                 }
850
851                 if (tconn->data.socket && tconn->meta.socket) {
852                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
853                         ok = drbd_socket_okay(&tconn->data.socket);
854                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
855                         if (ok)
856                                 break;
857                 }
858
859 retry:
860                 s = drbd_wait_for_connect(tconn);
861                 if (s) {
862                         try = receive_first_packet(tconn, s);
863                         drbd_socket_okay(&tconn->data.socket);
864                         drbd_socket_okay(&tconn->meta.socket);
865                         switch (try) {
866                         case P_INITIAL_DATA:
867                                 if (tconn->data.socket) {
868                                         conn_warn(tconn, "initial packet S crossed\n");
869                                         sock_release(tconn->data.socket);
870                                 }
871                                 tconn->data.socket = s;
872                                 break;
873                         case P_INITIAL_META:
874                                 if (tconn->meta.socket) {
875                                         conn_warn(tconn, "initial packet M crossed\n");
876                                         sock_release(tconn->meta.socket);
877                                 }
878                                 tconn->meta.socket = s;
879                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
880                                 break;
881                         default:
882                                 conn_warn(tconn, "Error receiving initial packet\n");
883                                 sock_release(s);
884                                 if (random32() & 1)
885                                         goto retry;
886                         }
887                 }
888
889                 if (tconn->cstate <= C_DISCONNECTING)
890                         goto out_release_sockets;
891                 if (signal_pending(current)) {
892                         flush_signals(current);
893                         smp_rmb();
894                         if (get_t_state(&tconn->receiver) == EXITING)
895                                 goto out_release_sockets;
896                 }
897
898                 if (tconn->data.socket && tconn->meta.socket) {
899                         ok = drbd_socket_okay(&tconn->data.socket);
900                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
901                         if (ok)
902                                 break;
903                 }
904         } while (1);
905
906         sock  = tconn->data.socket;
907         msock = tconn->meta.socket;
908
909         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
910         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
911
912         sock->sk->sk_allocation = GFP_NOIO;
913         msock->sk->sk_allocation = GFP_NOIO;
914
915         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
916         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
917
918         /* NOT YET ...
919          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
920          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
921          * first set it to the P_CONNECTION_FEATURES timeout,
922          * which we set to 4x the configured ping_timeout. */
923         sock->sk->sk_sndtimeo =
924         sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
925
926         msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
927         msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
928
929         /* we don't want delays.
930          * we use TCP_CORK where appropriate, though */
931         drbd_tcp_nodelay(sock);
932         drbd_tcp_nodelay(msock);
933
934         tconn->last_received = jiffies;
935
936         h = drbd_do_features(tconn);
937         if (h <= 0)
938                 return h;
939
940         if (tconn->cram_hmac_tfm) {
941                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
942                 switch (drbd_do_auth(tconn)) {
943                 case -1:
944                         conn_err(tconn, "Authentication of peer failed\n");
945                         return -1;
946                 case 0:
947                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
948                         return 0;
949                 }
950         }
951
952         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
953                 return 0;
954
955         sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
956         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
957
958         drbd_thread_start(&tconn->asender);
959
960         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
961                 return -1;
962
963         return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
964
965 out_release_sockets:
966         if (tconn->data.socket) {
967                 sock_release(tconn->data.socket);
968                 tconn->data.socket = NULL;
969         }
970         if (tconn->meta.socket) {
971                 sock_release(tconn->meta.socket);
972                 tconn->meta.socket = NULL;
973         }
974         return -1;
975 }
976
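/* Decode a received header into struct packet_info, accepting the
 * protocol 100, 95 ("big") and 80 header layouts. */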
977 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
978 {
979         unsigned int header_size = drbd_header_size(tconn);
980
981         if (header_size == sizeof(struct p_header100) &&
982             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
983                 struct p_header100 *h = header;
984                 if (h->pad != 0) {
985                         conn_err(tconn, "Header padding is not zero\n");
986                         return -EINVAL;
987                 }
988                 pi->vnr = be16_to_cpu(h->volume);
989                 pi->cmd = be16_to_cpu(h->command);
990                 pi->size = be32_to_cpu(h->length);
991         } else if (header_size == sizeof(struct p_header95) &&
992                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
993                 struct p_header95 *h = header;
994                 pi->cmd = be16_to_cpu(h->command);
995                 pi->size = be32_to_cpu(h->length);
996                 pi->vnr = 0;
997         } else if (header_size == sizeof(struct p_header80) &&
998                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
999                 struct p_header80 *h = header;
1000                 pi->cmd = be16_to_cpu(h->command);
1001                 pi->size = be16_to_cpu(h->length);
1002                 pi->vnr = 0;
1003         } else {
1004                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1005                          be32_to_cpu(*(__be32 *)header),
1006                          tconn->agreed_pro_version);
1007                 return -EINVAL;
1008         }
1009         pi->data = header + header_size;
1010         return 0;
1011 }
1012
1013 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1014 {
1015         void *buffer = tconn->data.rbuf;
1016         int err;
1017
1018         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1019         if (err)
1020                 return err;
1021
1022         err = decode_header(tconn, buffer, pi);
1023         tconn->last_received = jiffies;
1024
1025         return err;
1026 }
1027
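/* Issue a flush to the backing device if the current write ordering policy
 * requires it; on failure fall back to WO_drain_io. */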
1028 static void drbd_flush(struct drbd_conf *mdev)
1029 {
1030         int rv;
1031
1032         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1033                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1034                                         NULL);
1035                 if (rv) {
1036                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1037                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1038                          * don't try again for ANY return value != 0
1039                          * if (rv == -EOPNOTSUPP) */
1040                         drbd_bump_write_ordering(mdev, WO_drain_io);
1041                 }
1042                 put_ldev(mdev);
1043         }
1044 }
1045
1046 /**
1047  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1048  * @mdev:       DRBD device.
1049  * @epoch:      Epoch object.
1050  * @ev:         Epoch event.
1051  */
1052 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1053                                                struct drbd_epoch *epoch,
1054                                                enum epoch_event ev)
1055 {
1056         int epoch_size;
1057         struct drbd_epoch *next_epoch;
1058         enum finish_epoch rv = FE_STILL_LIVE;
1059
1060         spin_lock(&mdev->epoch_lock);
1061         do {
1062                 next_epoch = NULL;
1063
1064                 epoch_size = atomic_read(&epoch->epoch_size);
1065
1066                 switch (ev & ~EV_CLEANUP) {
1067                 case EV_PUT:
1068                         atomic_dec(&epoch->active);
1069                         break;
1070                 case EV_GOT_BARRIER_NR:
1071                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1072                         break;
1073                 case EV_BECAME_LAST:
1074                         /* nothing to do */
1075                         break;
1076                 }
1077
1078                 if (epoch_size != 0 &&
1079                     atomic_read(&epoch->active) == 0 &&
1080                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1081                         if (!(ev & EV_CLEANUP)) {
1082                                 spin_unlock(&mdev->epoch_lock);
1083                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1084                                 spin_lock(&mdev->epoch_lock);
1085                         }
1086                         dec_unacked(mdev);
1087
1088                         if (mdev->current_epoch != epoch) {
1089                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1090                                 list_del(&epoch->list);
1091                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1092                                 mdev->epochs--;
1093                                 kfree(epoch);
1094
1095                                 if (rv == FE_STILL_LIVE)
1096                                         rv = FE_DESTROYED;
1097                         } else {
1098                                 epoch->flags = 0;
1099                                 atomic_set(&epoch->epoch_size, 0);
1100                                 /* atomic_set(&epoch->active, 0); is already zero */
1101                                 if (rv == FE_STILL_LIVE)
1102                                         rv = FE_RECYCLED;
1103                                 wake_up(&mdev->ee_wait);
1104                         }
1105                 }
1106
1107                 if (!next_epoch)
1108                         break;
1109
1110                 epoch = next_epoch;
1111         } while (1);
1112
1113         spin_unlock(&mdev->epoch_lock);
1114
1115         return rv;
1116 }
1117
1118 /**
1119  * drbd_bump_write_ordering() - Fall back to another write ordering method
1120  * @mdev:       DRBD device.
1121  * @wo:         Write ordering method to try.
1122  */
1123 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1124 {
1125         enum write_ordering_e pwo;
1126         static char *write_ordering_str[] = {
1127                 [WO_none] = "none",
1128                 [WO_drain_io] = "drain",
1129                 [WO_bdev_flush] = "flush",
1130         };
1131
1132         pwo = mdev->write_ordering;
1133         wo = min(pwo, wo);
1134         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1135                 wo = WO_drain_io;
1136         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1137                 wo = WO_none;
1138         mdev->write_ordering = wo;
1139         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1140                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1141 }
1142
1143 /**
1144  * drbd_submit_peer_request() - Submit the page chain of a peer request in one or more bios
1145  * @mdev:       DRBD device.
1146  * @peer_req:   peer request
1147  * @rw:         flag field, see bio->bi_rw
1148  *
1149  * May spread the pages to multiple bios,
1150  * depending on bio_add_page restrictions.
1151  *
1152  * Returns 0 if all bios have been submitted,
1153  * -ENOMEM if we could not allocate enough bios,
1154  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1155  *  single page to an empty bio (which should never happen and likely indicates
1156  *  that the lower level IO stack is in some way broken). This has been observed
1157  *  on certain Xen deployments.
1158  */
1159 /* TODO allocate from our own bio_set. */
1160 int drbd_submit_peer_request(struct drbd_conf *mdev,
1161                              struct drbd_peer_request *peer_req,
1162                              const unsigned rw, const int fault_type)
1163 {
1164         struct bio *bios = NULL;
1165         struct bio *bio;
1166         struct page *page = peer_req->pages;
1167         sector_t sector = peer_req->i.sector;
1168         unsigned ds = peer_req->i.size;
1169         unsigned n_bios = 0;
1170         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1171         int err = -ENOMEM;
1172
1173         /* In most cases, we will only need one bio.  But in case the lower
1174          * level restrictions happen to be different at this offset on this
1175          * side than those of the sending peer, we may need to submit the
1176          * request in more than one bio.
1177          *
1178          * Plain bio_alloc is good enough here, this is no DRBD internally
1179          * generated bio, but a bio allocated on behalf of the peer.
1180          */
1181 next_bio:
1182         bio = bio_alloc(GFP_NOIO, nr_pages);
1183         if (!bio) {
1184                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1185                 goto fail;
1186         }
1187         /* > peer_req->i.sector, unless this is the first bio */
1188         bio->bi_sector = sector;
1189         bio->bi_bdev = mdev->ldev->backing_bdev;
1190         bio->bi_rw = rw;
1191         bio->bi_private = peer_req;
1192         bio->bi_end_io = drbd_peer_request_endio;
1193
1194         bio->bi_next = bios;
1195         bios = bio;
1196         ++n_bios;
1197
1198         page_chain_for_each(page) {
1199                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1200                 if (!bio_add_page(bio, page, len, 0)) {
1201                         /* A single page must always be possible!
1202                          * But in case it fails anyways,
1203                          * we deal with it, and complain (below). */
1204                         if (bio->bi_vcnt == 0) {
1205                                 dev_err(DEV,
1206                                         "bio_add_page failed for len=%u, "
1207                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1208                                         len, (unsigned long long)bio->bi_sector);
1209                                 err = -ENOSPC;
1210                                 goto fail;
1211                         }
1212                         goto next_bio;
1213                 }
1214                 ds -= len;
1215                 sector += len >> 9;
1216                 --nr_pages;
1217         }
1218         D_ASSERT(page == NULL);
1219         D_ASSERT(ds == 0);
1220
1221         atomic_set(&peer_req->pending_bios, n_bios);
1222         do {
1223                 bio = bios;
1224                 bios = bios->bi_next;
1225                 bio->bi_next = NULL;
1226
1227                 drbd_generic_make_request(mdev, fault_type, bio);
1228         } while (bios);
1229         return 0;
1230
1231 fail:
1232         while (bios) {
1233                 bio = bios;
1234                 bios = bios->bi_next;
1235                 bio_put(bio);
1236         }
1237         return err;
1238 }
1239
1240 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1241                                              struct drbd_peer_request *peer_req)
1242 {
1243         struct drbd_interval *i = &peer_req->i;
1244
1245         drbd_remove_interval(&mdev->write_requests, i);
1246         drbd_clear_interval(i);
1247
1248         /* Wake up any processes waiting for this peer request to complete.  */
1249         if (i->waiting)
1250                 wake_up(&mdev->misc_wait);
1251 }
1252
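/* Handle P_BARRIER: close the current epoch according to the configured
 * write ordering method and install a fresh one if needed. */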
1253 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1254 {
1255         struct drbd_conf *mdev;
1256         int rv;
1257         struct p_barrier *p = pi->data;
1258         struct drbd_epoch *epoch;
1259
1260         mdev = vnr_to_mdev(tconn, pi->vnr);
1261         if (!mdev)
1262                 return -EIO;
1263
1264         inc_unacked(mdev);
1265
1266         mdev->current_epoch->barrier_nr = p->barrier;
1267         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1268
1269         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1270          * the activity log, which means it would not be resynced in case the
1271          * R_PRIMARY crashes now.
1272          * Therefore we must send the barrier_ack after the barrier request was
1273          * completed. */
1274         switch (mdev->write_ordering) {
1275         case WO_none:
1276                 if (rv == FE_RECYCLED)
1277                         return 0;
1278
1279                 /* receiver context, in the writeout path of the other node.
1280                  * avoid potential distributed deadlock */
1281                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1282                 if (epoch)
1283                         break;
1284                 else
1285                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1286                         /* Fall through */
1287
1288         case WO_bdev_flush:
1289         case WO_drain_io:
1290                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1291                 drbd_flush(mdev);
1292
1293                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1294                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1295                         if (epoch)
1296                                 break;
1297                 }
1298
1299                 epoch = mdev->current_epoch;
1300                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1301
1302                 D_ASSERT(atomic_read(&epoch->active) == 0);
1303                 D_ASSERT(epoch->flags == 0);
1304
1305                 return 0;
1306         default:
1307                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1308                 return -EIO;
1309         }
1310
1311         epoch->flags = 0;
1312         atomic_set(&epoch->epoch_size, 0);
1313         atomic_set(&epoch->active, 0);
1314
1315         spin_lock(&mdev->epoch_lock);
1316         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1317                 list_add(&epoch->list, &mdev->current_epoch->list);
1318                 mdev->current_epoch = epoch;
1319                 mdev->epochs++;
1320         } else {
1321                 /* The current_epoch got recycled while we allocated this one... */
1322                 kfree(epoch);
1323         }
1324         spin_unlock(&mdev->epoch_lock);
1325
1326         return 0;
1327 }
1328
1329 /* used from receive_RSDataReply (recv_resync_read)
1330  * and from receive_Data */
1331 static struct drbd_peer_request *
1332 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1333               int data_size) __must_hold(local)
1334 {
1335         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1336         struct drbd_peer_request *peer_req;
1337         struct page *page;
1338         int dgs, ds, err;
1339         void *dig_in = mdev->tconn->int_dig_in;
1340         void *dig_vv = mdev->tconn->int_dig_vv;
1341         unsigned long *data;
1342
1343         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1344                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1345
1346         if (dgs) {
1347                 /*
1348                  * FIXME: Receive the incoming digest into the receive buffer
1349                  *        here, together with its struct p_data?
1350                  */
1351                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1352                 if (err)
1353                         return NULL;
1354         }
1355
1356         data_size -= dgs;
1357
1358         if (!expect(data_size != 0))
1359                 return NULL;
1360         if (!expect(IS_ALIGNED(data_size, 512)))
1361                 return NULL;
1362         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1363                 return NULL;
1364
1365         /* even though we trust our peer,
1366          * we sometimes have to double check. */
1367         if (sector + (data_size>>9) > capacity) {
1368                 dev_err(DEV, "request from peer beyond end of local disk: "
1369                         "capacity: %llus < sector: %llus + size: %u\n",
1370                         (unsigned long long)capacity,
1371                         (unsigned long long)sector, data_size);
1372                 return NULL;
1373         }
1374
1375         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1376          * "criss-cross" setup, that might cause write-out on some other DRBD,
1377          * which in turn might block on the other node at this very place.  */
1378         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1379         if (!peer_req)
1380                 return NULL;
1381
1382         ds = data_size;
1383         page = peer_req->pages;
1384         page_chain_for_each(page) {
1385                 unsigned len = min_t(int, ds, PAGE_SIZE);
1386                 data = kmap(page);
1387                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1388                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1389                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1390                         data[0] = data[0] ^ (unsigned long)-1;
1391                 }
1392                 kunmap(page);
1393                 if (err) {
1394                         drbd_free_peer_req(mdev, peer_req);
1395                         return NULL;
1396                 }
1397                 ds -= len;
1398         }
1399
1400         if (dgs) {
1401                 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1402                 if (memcmp(dig_in, dig_vv, dgs)) {
1403                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1404                                 (unsigned long long)sector, data_size);
1405                         drbd_free_peer_req(mdev, peer_req);
1406                         return NULL;
1407                 }
1408         }
1409         mdev->recv_cnt += data_size>>9;
1410         return peer_req;
1411 }
1412
1413 /* drbd_drain_block() just takes a data block
1414  * out of the socket input buffer, and discards it.
1415  */
1416 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1417 {
1418         struct page *page;
1419         int err = 0;
1420         void *data;
1421
1422         if (!data_size)
1423                 return 0;
1424
1425         page = drbd_pp_alloc(mdev, 1, 1);
1426
1427         data = kmap(page);
1428         while (data_size) {
1429                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1430
1431                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1432                 if (err)
1433                         break;
1434                 data_size -= len;
1435         }
1436         kunmap(page);
1437         drbd_pp_free(mdev, page, 0);
1438         return err;
1439 }
1440
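/* Receive the payload of a data reply directly into the bio of the
 * original (local) request, verifying the digest if one is in use. */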
1441 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1442                            sector_t sector, int data_size)
1443 {
1444         struct bio_vec *bvec;
1445         struct bio *bio;
1446         int dgs, err, i, expect;
1447         void *dig_in = mdev->tconn->int_dig_in;
1448         void *dig_vv = mdev->tconn->int_dig_vv;
1449
1450         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1451                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1452
1453         if (dgs) {
1454                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1455                 if (err)
1456                         return err;
1457         }
1458
1459         data_size -= dgs;
1460
1461         /* optimistically update recv_cnt.  if receiving fails below,
1462          * we disconnect anyways, and counters will be reset. */
1463         mdev->recv_cnt += data_size>>9;
1464
1465         bio = req->master_bio;
1466         D_ASSERT(sector == bio->bi_sector);
1467
1468         bio_for_each_segment(bvec, bio, i) {
1469                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1470                 expect = min_t(int, data_size, bvec->bv_len);
1471                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1472                 kunmap(bvec->bv_page);
1473                 if (err)
1474                         return err;
1475                 data_size -= expect;
1476         }
1477
1478         if (dgs) {
1479                 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1480                 if (memcmp(dig_in, dig_vv, dgs)) {
1481                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1482                         return -EINVAL;
1483                 }
1484         }
1485
1486         D_ASSERT(data_size == 0);
1487         return 0;
1488 }
1489
1490 /*
1491  * e_end_resync_block() is called in asender context via
1492  * drbd_finish_peer_reqs().
1493  */
1494 static int e_end_resync_block(struct drbd_work *w, int unused)
1495 {
1496         struct drbd_peer_request *peer_req =
1497                 container_of(w, struct drbd_peer_request, w);
1498         struct drbd_conf *mdev = w->mdev;
1499         sector_t sector = peer_req->i.sector;
1500         int err;
1501
1502         D_ASSERT(drbd_interval_empty(&peer_req->i));
1503
1504         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1505                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1506                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1507         } else {
1508                 /* Record failure to sync */
1509                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1510
1511                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1512         }
1513         dec_unacked(mdev);
1514
1515         return err;
1516 }
1517
1518 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1519 {
1520         struct drbd_peer_request *peer_req;
1521
1522         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1523         if (!peer_req)
1524                 goto fail;
1525
1526         dec_rs_pending(mdev);
1527
1528         inc_unacked(mdev);
1529         /* corresponding dec_unacked() in e_end_resync_block(),
1530          * or in _drbd_clear_done_ee */
1531
1532         peer_req->w.cb = e_end_resync_block;
1533
1534         spin_lock_irq(&mdev->tconn->req_lock);
1535         list_add(&peer_req->w.list, &mdev->sync_ee);
1536         spin_unlock_irq(&mdev->tconn->req_lock);
1537
1538         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1539         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1540                 return 0;
1541
1542         /* don't care for the reason here */
1543         dev_err(DEV, "submit failed, triggering re-connect\n");
1544         spin_lock_irq(&mdev->tconn->req_lock);
1545         list_del(&peer_req->w.list);
1546         spin_unlock_irq(&mdev->tconn->req_lock);
1547
1548         drbd_free_peer_req(mdev, peer_req);
1549 fail:
1550         put_ldev(mdev);
1551         return -EIO;
1552 }
1553
1554 static struct drbd_request *
1555 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1556              sector_t sector, bool missing_ok, const char *func)
1557 {
1558         struct drbd_request *req;
1559
1560         /* Request object according to our peer */
1561         req = (struct drbd_request *)(unsigned long)id;
1562         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1563                 return req;
1564         if (!missing_ok) {
1565                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1566                         (unsigned long)id, (unsigned long long)sector);
1567         }
1568         return NULL;
1569 }
1570
1571 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1572 {
1573         struct drbd_conf *mdev;
1574         struct drbd_request *req;
1575         sector_t sector;
1576         int err;
1577         struct p_data *p = pi->data;
1578
1579         mdev = vnr_to_mdev(tconn, pi->vnr);
1580         if (!mdev)
1581                 return -EIO;
1582
1583         sector = be64_to_cpu(p->sector);
1584
1585         spin_lock_irq(&mdev->tconn->req_lock);
1586         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1587         spin_unlock_irq(&mdev->tconn->req_lock);
1588         if (unlikely(!req))
1589                 return -EIO;
1590
1591         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1592          * special casing it there for the various failure cases.
1593          * still no race with drbd_fail_pending_reads */
1594         err = recv_dless_read(mdev, req, sector, pi->size);
1595         if (!err)
1596                 req_mod(req, DATA_RECEIVED);
1597         /* else: nothing. handled from drbd_disconnect...
1598          * I don't think we may complete this just yet
1599          * in case we are "on-disconnect: freeze" */
1600
1601         return err;
1602 }
1603
1604 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1605 {
1606         struct drbd_conf *mdev;
1607         sector_t sector;
1608         int err;
1609         struct p_data *p = pi->data;
1610
1611         mdev = vnr_to_mdev(tconn, pi->vnr);
1612         if (!mdev)
1613                 return -EIO;
1614
1615         sector = be64_to_cpu(p->sector);
1616         D_ASSERT(p->block_id == ID_SYNCER);
1617
1618         if (get_ldev(mdev)) {
1619                 /* data is submitted to disk within recv_resync_read.
1620                  * corresponding put_ldev done below on error,
1621                  * or in drbd_peer_request_endio. */
1622                 err = recv_resync_read(mdev, sector, pi->size);
1623         } else {
1624                 if (__ratelimit(&drbd_ratelimit_state))
1625                         dev_err(DEV, "Can not write resync data to local disk.\n");
1626
1627                 err = drbd_drain_block(mdev, pi->size);
1628
1629                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1630         }
1631
1632         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1633
1634         return err;
1635 }
1636
1637 static int w_restart_write(struct drbd_work *w, int cancel)
1638 {
1639         struct drbd_request *req = container_of(w, struct drbd_request, w);
1640         struct drbd_conf *mdev = w->mdev;
1641         struct bio *bio;
1642         unsigned long start_time;
1643         unsigned long flags;
1644
1645         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1646         if (!expect(req->rq_state & RQ_POSTPONED)) {
1647                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1648                 return -EIO;
1649         }
1650         bio = req->master_bio;
1651         start_time = req->start_time;
1652         /* Postponed requests will not have their master_bio completed!  */
1653         __req_mod(req, DISCARD_WRITE, NULL);
1654         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1655
1656         while (__drbd_make_request(mdev, bio, start_time))
1657                 /* retry */ ;
1658         return 0;
1659 }
1660
1661 static void restart_conflicting_writes(struct drbd_conf *mdev,
1662                                        sector_t sector, int size)
1663 {
1664         struct drbd_interval *i;
1665         struct drbd_request *req;
1666
1667         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1668                 if (!i->local)
1669                         continue;
1670                 req = container_of(i, struct drbd_request, i);
1671                 if (req->rq_state & RQ_LOCAL_PENDING ||
1672                     !(req->rq_state & RQ_POSTPONED))
1673                         continue;
1674                 if (expect(list_empty(&req->w.list))) {
1675                         req->w.mdev = mdev;
1676                         req->w.cb = w_restart_write;
1677                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1678                 }
1679         }
1680 }
1681
1682 /*
1683  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1684  */
1685 static int e_end_block(struct drbd_work *w, int cancel)
1686 {
1687         struct drbd_peer_request *peer_req =
1688                 container_of(w, struct drbd_peer_request, w);
1689         struct drbd_conf *mdev = w->mdev;
1690         sector_t sector = peer_req->i.sector;
1691         int err = 0, pcmd;
1692
1693         if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
1694                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1695                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1696                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1697                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1698                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1699                         err = drbd_send_ack(mdev, pcmd, peer_req);
1700                         if (pcmd == P_RS_WRITE_ACK)
1701                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1702                 } else {
1703                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1704                         /* we expect it to be marked out of sync anyways...
1705                          * maybe assert this?  */
1706                 }
1707                 dec_unacked(mdev);
1708         }
1709         /* we delete from the conflict detection hash _after_ we sent out the
1710          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1711         if (mdev->tconn->net_conf->two_primaries) {
1712                 spin_lock_irq(&mdev->tconn->req_lock);
1713                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1714                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1715                 if (peer_req->flags & EE_RESTART_REQUESTS)
1716                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1717                 spin_unlock_irq(&mdev->tconn->req_lock);
1718         } else
1719                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1720
1721         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1722
1723         return err;
1724 }
1725
1726 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1727 {
1728         struct drbd_conf *mdev = w->mdev;
1729         struct drbd_peer_request *peer_req =
1730                 container_of(w, struct drbd_peer_request, w);
1731         int err;
1732
1733         err = drbd_send_ack(mdev, ack, peer_req);
1734         dec_unacked(mdev);
1735
1736         return err;
1737 }
1738
1739 static int e_send_discard_write(struct drbd_work *w, int unused)
1740 {
1741         return e_send_ack(w, P_DISCARD_WRITE);
1742 }
1743
1744 static int e_send_retry_write(struct drbd_work *w, int unused)
1745 {
1746         struct drbd_tconn *tconn = w->mdev->tconn;
1747
1748         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1749                              P_RETRY_WRITE : P_DISCARD_WRITE);
1750 }
1751
1752 static bool seq_greater(u32 a, u32 b)
1753 {
1754         /*
1755          * We assume 32-bit wrap-around here.
1756          * For 24-bit wrap-around, we would have to shift:
1757          *  a <<= 8; b <<= 8;
1758          */
1759         return (s32)a - (s32)b > 0;
1760 }
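/* Example (illustrative only): the signed difference keeps the ordering
 * correct across a 32-bit wrap, as long as the two sequence numbers are
 * less than 1<<31 apart:
 *   seq_greater(0x00000002, 0xfffffffe) -> true   (2 came "after" the wrap)
 *   seq_greater(0xfffffffe, 0x00000002) -> false
 */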
1761
1762 static u32 seq_max(u32 a, u32 b)
1763 {
1764         return seq_greater(a, b) ? a : b;
1765 }
1766
1767 static bool need_peer_seq(struct drbd_conf *mdev)
1768 {
1769         struct drbd_tconn *tconn = mdev->tconn;
1770
1771         /*
1772          * We only need to keep track of the last packet_seq number of our peer
1773          * if we are in dual-primary mode and we have the discard flag set; see
1774          * handle_write_conflicts().
1775          */
1776         return tconn->net_conf->two_primaries &&
1777                test_bit(DISCARD_CONCURRENT, &tconn->flags);
1778 }
1779
1780 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1781 {
1782         unsigned int newest_peer_seq;
1783
1784         if (need_peer_seq(mdev)) {
1785                 spin_lock(&mdev->peer_seq_lock);
1786                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1787                 mdev->peer_seq = newest_peer_seq;
1788                 spin_unlock(&mdev->peer_seq_lock);
1789                 /* wake up only if we actually changed mdev->peer_seq */
1790                 if (peer_seq == newest_peer_seq)
1791                         wake_up(&mdev->seq_wait);
1792         }
1793 }
1794
1795 /* Called from receive_Data.
1796  * Synchronize packets on sock with packets on msock.
1797  *
1798  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1799  * packet traveling on msock, they are still processed in the order they have
1800  * been sent.
1801  *
1802  * Note: we don't care for Ack packets overtaking P_DATA packets.
1803  *
1804  * In case packet_seq is larger than mdev->peer_seq, there are
1805  * outstanding packets on the msock. We wait for them to arrive.
1806  * In case we are the logically next packet, we update mdev->peer_seq
1807  * ourselves. Correctly handles 32bit wrap around.
1808  *
1809  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1810  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1811  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1812  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1813  *
1814  * returns 0 if we may process the packet,
1815  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1816 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1817 {
1818         DEFINE_WAIT(wait);
1819         long timeout;
1820         int ret;
1821
1822         if (!need_peer_seq(mdev))
1823                 return 0;
1824
1825         spin_lock(&mdev->peer_seq_lock);
1826         for (;;) {
1827                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1828                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1829                         ret = 0;
1830                         break;
1831                 }
1832                 if (signal_pending(current)) {
1833                         ret = -ERESTARTSYS;
1834                         break;
1835                 }
1836                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1837                 spin_unlock(&mdev->peer_seq_lock);
1838                 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1839                 timeout = schedule_timeout(timeout);
1840                 spin_lock(&mdev->peer_seq_lock);
1841                 if (!timeout) {
1842                         ret = -ETIMEDOUT;
1843                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1844                         break;
1845                 }
1846         }
1847         spin_unlock(&mdev->peer_seq_lock);
1848         finish_wait(&mdev->seq_wait, &wait);
1849         return ret;
1850 }
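/* Example (illustrative only): if mdev->peer_seq is currently 7, a P_DATA
 * packet with seq_num 8 is processed immediately (seq_greater(7, 7) is
 * false), while one with seq_num 10 sleeps on seq_wait until the packets
 * for 8 and 9 have been accounted for on the other socket, or until
 * ping_timeo expires. */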
1851
1852 /* see also bio_flags_to_wire()
1853  * DRBD_REQ_* flags need to be mapped to data packet flags and back
1854  * semantically, because we may replicate to other kernel versions. */
1855 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1856 {
1857         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1858                 (dpf & DP_FUA ? REQ_FUA : 0) |
1859                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1860                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1861 }
1862
1863 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1864                                     unsigned int size)
1865 {
1866         struct drbd_interval *i;
1867
1868     repeat:
1869         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1870                 struct drbd_request *req;
1871                 struct bio_and_error m;
1872
1873                 if (!i->local)
1874                         continue;
1875                 req = container_of(i, struct drbd_request, i);
1876                 if (!(req->rq_state & RQ_POSTPONED))
1877                         continue;
1878                 req->rq_state &= ~RQ_POSTPONED;
1879                 __req_mod(req, NEG_ACKED, &m);
1880                 spin_unlock_irq(&mdev->tconn->req_lock);
1881                 if (m.bio)
1882                         complete_master_bio(mdev, &m);
1883                 spin_lock_irq(&mdev->tconn->req_lock);
1884                 goto repeat;
1885         }
1886 }
1887
1888 static int handle_write_conflicts(struct drbd_conf *mdev,
1889                                   struct drbd_peer_request *peer_req)
1890 {
1891         struct drbd_tconn *tconn = mdev->tconn;
1892         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1893         sector_t sector = peer_req->i.sector;
1894         const unsigned int size = peer_req->i.size;
1895         struct drbd_interval *i;
1896         bool equal;
1897         int err;
1898
1899         /*
1900          * Inserting the peer request into the write_requests tree will prevent
1901          * new conflicting local requests from being added.
1902          */
1903         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1904
1905     repeat:
1906         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1907                 if (i == &peer_req->i)
1908                         continue;
1909
1910                 if (!i->local) {
1911                         /*
1912                          * Our peer has sent a conflicting remote request; this
1913                          * should not happen in a two-node setup.  Wait for the
1914                          * earlier peer request to complete.
1915                          */
1916                         err = drbd_wait_misc(mdev, i);
1917                         if (err)
1918                                 goto out;
1919                         goto repeat;
1920                 }
1921
1922                 equal = i->sector == sector && i->size == size;
1923                 if (resolve_conflicts) {
1924                         /*
1925                          * If the peer request is fully contained within the
1926                          * overlapping request, it can be discarded; otherwise,
1927                          * it will be retried once all overlapping requests
1928                          * have completed.
1929                          */
1930                         bool discard = i->sector <= sector && i->sector +
1931                                        (i->size >> 9) >= sector + (size >> 9);
1932
1933                         if (!equal)
1934                                 dev_alert(DEV, "Concurrent writes detected: "
1935                                                "local=%llus +%u, remote=%llus +%u, "
1936                                                "assuming %s came first\n",
1937                                           (unsigned long long)i->sector, i->size,
1938                                           (unsigned long long)sector, size,
1939                                           discard ? "local" : "remote");
1940
1941                         inc_unacked(mdev);
1942                         peer_req->w.cb = discard ? e_send_discard_write :
1943                                                    e_send_retry_write;
1944                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1945                         wake_asender(mdev->tconn);
1946
1947                         err = -ENOENT;
1948                         goto out;
1949                 } else {
1950                         struct drbd_request *req =
1951                                 container_of(i, struct drbd_request, i);
1952
1953                         if (!equal)
1954                                 dev_alert(DEV, "Concurrent writes detected: "
1955                                                "local=%llus +%u, remote=%llus +%u\n",
1956                                           (unsigned long long)i->sector, i->size,
1957                                           (unsigned long long)sector, size);
1958
1959                         if (req->rq_state & RQ_LOCAL_PENDING ||
1960                             !(req->rq_state & RQ_POSTPONED)) {
1961                                 /*
1962                                  * Wait for the node with the discard flag to
1963                                  * decide if this request will be discarded or
1964                                  * retried.  Requests that are discarded will
1965                                  * disappear from the write_requests tree.
1966                                  *
1967                                  * In addition, wait for the conflicting
1968                                  * request to finish locally before submitting
1969                                  * the conflicting peer request.
1970                                  */
1971                                 err = drbd_wait_misc(mdev, &req->i);
1972                                 if (err) {
1973                                         _conn_request_state(mdev->tconn,
1974                                                             NS(conn, C_TIMEOUT),
1975                                                             CS_HARD);
1976                                         fail_postponed_requests(mdev, sector, size);
1977                                         goto out;
1978                                 }
1979                                 goto repeat;
1980                         }
1981                         /*
1982                          * Remember to restart the conflicting requests after
1983                          * the new peer request has completed.
1984                          */
1985                         peer_req->flags |= EE_RESTART_REQUESTS;
1986                 }
1987         }
1988         err = 0;
1989
1990     out:
1991         if (err)
1992                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1993         return err;
1994 }
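/* Example (illustrative only): with two primaries and this node resolving
 * conflicts, if a local request spans sectors [8, 40) and a conflicting
 * peer request spans [16, 24), the peer request is fully contained and is
 * discarded (P_DISCARD_WRITE); a peer request spanning [0, 64) is not
 * contained and is asked to be retried (P_RETRY_WRITE, or P_DISCARD_WRITE
 * for peers before protocol 100). */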
1995
1996 /* mirrored write */
1997 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
1998 {
1999         struct drbd_conf *mdev;
2000         sector_t sector;
2001         struct drbd_peer_request *peer_req;
2002         struct p_data *p = pi->data;
2003         u32 peer_seq = be32_to_cpu(p->seq_num);
2004         int rw = WRITE;
2005         u32 dp_flags;
2006         int err;
2007
2008         mdev = vnr_to_mdev(tconn, pi->vnr);
2009         if (!mdev)
2010                 return -EIO;
2011
2012         if (!get_ldev(mdev)) {
2013                 int err2;
2014
2015                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2016                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2017                 atomic_inc(&mdev->current_epoch->epoch_size);
2018                 err2 = drbd_drain_block(mdev, pi->size);
2019                 if (!err)
2020                         err = err2;
2021                 return err;
2022         }
2023
2024         /*
2025          * Corresponding put_ldev done either below (on various errors), or in
2026          * drbd_peer_request_endio, if we successfully submit the data at the
2027          * end of this function.
2028          */
2029
2030         sector = be64_to_cpu(p->sector);
2031         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2032         if (!peer_req) {
2033                 put_ldev(mdev);
2034                 return -EIO;
2035         }
2036
2037         peer_req->w.cb = e_end_block;
2038
2039         dp_flags = be32_to_cpu(p->dp_flags);
2040         rw |= wire_flags_to_bio(mdev, dp_flags);
2041
2042         if (dp_flags & DP_MAY_SET_IN_SYNC)
2043                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2044
2045         spin_lock(&mdev->epoch_lock);
2046         peer_req->epoch = mdev->current_epoch;
2047         atomic_inc(&peer_req->epoch->epoch_size);
2048         atomic_inc(&peer_req->epoch->active);
2049         spin_unlock(&mdev->epoch_lock);
2050
2051         if (mdev->tconn->net_conf->two_primaries) {
2052                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2053                 if (err)
2054                         goto out_interrupted;
2055                 spin_lock_irq(&mdev->tconn->req_lock);
2056                 err = handle_write_conflicts(mdev, peer_req);
2057                 if (err) {
2058                         spin_unlock_irq(&mdev->tconn->req_lock);
2059                         if (err == -ENOENT) {
2060                                 put_ldev(mdev);
2061                                 return 0;
2062                         }
2063                         goto out_interrupted;
2064                 }
2065         } else
2066                 spin_lock_irq(&mdev->tconn->req_lock);
2067         list_add(&peer_req->w.list, &mdev->active_ee);
2068         spin_unlock_irq(&mdev->tconn->req_lock);
2069
2070         switch (mdev->tconn->net_conf->wire_protocol) {
2071         case DRBD_PROT_C:
2072                 inc_unacked(mdev);
2073                 /* corresponding dec_unacked() in e_end_block(),
2074                  * or in _drbd_clear_done_ee */
2075                 break;
2076         case DRBD_PROT_B:
2077                 /* I really don't like it that the receiver thread
2078                  * sends on the msock, but anyways */
2079                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2080                 break;
2081         case DRBD_PROT_A:
2082                 /* nothing to do */
2083                 break;
2084         }
2085
2086         if (mdev->state.pdsk < D_INCONSISTENT) {
2087                 /* In case we have the only usable disk of the cluster: mark the range out of sync and cover it in the activity log. */
2088                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2089                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2090                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2091                 drbd_al_begin_io(mdev, &peer_req->i);
2092         }
2093
2094         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2095         if (!err)
2096                 return 0;
2097
2098         /* don't care for the reason here */
2099         dev_err(DEV, "submit failed, triggering re-connect\n");
2100         spin_lock_irq(&mdev->tconn->req_lock);
2101         list_del(&peer_req->w.list);
2102         drbd_remove_epoch_entry_interval(mdev, peer_req);
2103         spin_unlock_irq(&mdev->tconn->req_lock);
2104         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2105                 drbd_al_complete_io(mdev, &peer_req->i);
2106
2107 out_interrupted:
2108         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2109         put_ldev(mdev);
2110         drbd_free_peer_req(mdev, peer_req);
2111         return err;
2112 }
2113
2114 /* We may throttle resync, if the lower device seems to be busy,
2115  * and current sync rate is above c_min_rate.
2116  *
2117  * To decide whether or not the lower device is busy, we use a scheme similar
2118  * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
2119  * amount (more than 64 sectors) of activity that we cannot account for with
2120  * our own resync activity, the device obviously is "busy".
2121  *
2122  * The current sync rate used here uses only the most recent two step marks,
2123  * to have a short time average so we can react faster.
2124  */
2125 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2126 {
2127         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2128         unsigned long db, dt, dbdt;
2129         struct lc_element *tmp;
2130         int curr_events;
2131         int throttle = 0;
2132
2133         /* feature disabled? */
2134         if (mdev->ldev->dc.c_min_rate == 0)
2135                 return 0;
2136
2137         spin_lock_irq(&mdev->al_lock);
2138         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2139         if (tmp) {
2140                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2141                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2142                         spin_unlock_irq(&mdev->al_lock);
2143                         return 0;
2144                 }
2145                 /* Do not slow down if app IO is already waiting for this extent */
2146         }
2147         spin_unlock_irq(&mdev->al_lock);
2148
2149         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2150                       (int)part_stat_read(&disk->part0, sectors[1]) -
2151                         atomic_read(&mdev->rs_sect_ev);
2152
2153         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2154                 unsigned long rs_left;
2155                 int i;
2156
2157                 mdev->rs_last_events = curr_events;
2158
2159                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2160                  * approx. */
2161                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2162
2163                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2164                         rs_left = mdev->ov_left;
2165                 else
2166                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2167
2168                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2169                 if (!dt)
2170                         dt++;
2171                 db = mdev->rs_mark_left[i] - rs_left;
2172                 dbdt = Bit2KB(db/dt);
2173
2174                 if (dbdt > mdev->ldev->dc.c_min_rate)
2175                         throttle = 1;
2176         }
2177         return throttle;
2178 }
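/* Worked example (illustrative figures; each bitmap bit covers 4 KiB):
 * if the two sync marks used above are 6 seconds apart (dt = 6) and 1536
 * bitmap bits were cleared in between (db = 1536), then
 * dbdt = Bit2KB(1536 / 6) = 1024 KiB/s.  With c_min_rate set to 250 this
 * exceeds the configured minimum, so resync reads are throttled as long
 * as the backing device also shows unaccounted-for activity. */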
2179
2180
2181 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2182 {
2183         struct drbd_conf *mdev;
2184         sector_t sector;
2185         sector_t capacity;
2186         struct drbd_peer_request *peer_req;
2187         struct digest_info *di = NULL;
2188         int size, verb;
2189         unsigned int fault_type;
2190         struct p_block_req *p = pi->data;
2191
2192         mdev = vnr_to_mdev(tconn, pi->vnr);
2193         if (!mdev)
2194                 return -EIO;
2195         capacity = drbd_get_capacity(mdev->this_bdev);
2196
2197         sector = be64_to_cpu(p->sector);
2198         size   = be32_to_cpu(p->blksize);
2199
2200         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2201                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2202                                 (unsigned long long)sector, size);
2203                 return -EINVAL;
2204         }
2205         if (sector + (size>>9) > capacity) {
2206                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2207                                 (unsigned long long)sector, size);
2208                 return -EINVAL;
2209         }
2210
2211         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2212                 verb = 1;
2213                 switch (pi->cmd) {
2214                 case P_DATA_REQUEST:
2215                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2216                         break;
2217                 case P_RS_DATA_REQUEST:
2218                 case P_CSUM_RS_REQUEST:
2219                 case P_OV_REQUEST:
2220                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2221                         break;
2222                 case P_OV_REPLY:
2223                         verb = 0;
2224                         dec_rs_pending(mdev);
2225                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2226                         break;
2227                 default:
2228                         BUG();
2229                 }
2230                 if (verb && __ratelimit(&drbd_ratelimit_state))
2231                         dev_err(DEV, "Can not satisfy peer's read request, "
2232                             "no local data.\n");
2233
2234                 /* drain the payload, if any */
2235                 return drbd_drain_block(mdev, pi->size);
2236         }
2237
2238         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2239          * "criss-cross" setup, that might cause write-out on some other DRBD,
2240          * which in turn might block on the other node at this very place.  */
2241         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2242         if (!peer_req) {
2243                 put_ldev(mdev);
2244                 return -ENOMEM;
2245         }
2246
2247         switch (pi->cmd) {
2248         case P_DATA_REQUEST:
2249                 peer_req->w.cb = w_e_end_data_req;
2250                 fault_type = DRBD_FAULT_DT_RD;
2251                 /* application IO, don't drbd_rs_begin_io */
2252                 goto submit;
2253
2254         case P_RS_DATA_REQUEST:
2255                 peer_req->w.cb = w_e_end_rsdata_req;
2256                 fault_type = DRBD_FAULT_RS_RD;
2257                 /* used in the sector offset progress display */
2258                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2259                 break;
2260
2261         case P_OV_REPLY:
2262         case P_CSUM_RS_REQUEST:
2263                 fault_type = DRBD_FAULT_RS_RD;
2264                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2265                 if (!di)
2266                         goto out_free_e;
2267
2268                 di->digest_size = pi->size;
2269                 di->digest = (((char *)di)+sizeof(struct digest_info));
2270
2271                 peer_req->digest = di;
2272                 peer_req->flags |= EE_HAS_DIGEST;
2273
2274                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2275                         goto out_free_e;
2276
2277                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2278                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2279                         peer_req->w.cb = w_e_end_csum_rs_req;
2280                         /* used in the sector offset progress display */
2281                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2282                 } else if (pi->cmd == P_OV_REPLY) {
2283                         /* track progress, we may need to throttle */
2284                         atomic_add(size >> 9, &mdev->rs_sect_in);
2285                         peer_req->w.cb = w_e_end_ov_reply;
2286                         dec_rs_pending(mdev);
2287                         /* drbd_rs_begin_io done when we sent this request,
2288                          * but accounting still needs to be done. */
2289                         goto submit_for_resync;
2290                 }
2291                 break;
2292
2293         case P_OV_REQUEST:
2294                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2295                     mdev->tconn->agreed_pro_version >= 90) {
2296                         unsigned long now = jiffies;
2297                         int i;
2298                         mdev->ov_start_sector = sector;
2299                         mdev->ov_position = sector;
2300                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2301                         mdev->rs_total = mdev->ov_left;
2302                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2303                                 mdev->rs_mark_left[i] = mdev->ov_left;
2304                                 mdev->rs_mark_time[i] = now;
2305                         }
2306                         dev_info(DEV, "Online Verify start sector: %llu\n",
2307                                         (unsigned long long)sector);
2308                 }
2309                 peer_req->w.cb = w_e_end_ov_req;
2310                 fault_type = DRBD_FAULT_RS_RD;
2311                 break;
2312
2313         default:
2314                 BUG();
2315         }
2316
2317         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2318          * wrt the receiver, but it is not as straightforward as it may seem.
2319          * Various places in the resync start and stop logic assume resync
2320  * requests are processed in order; requeuing this on the worker thread
2321  * would introduce a bunch of new code for synchronization between threads.
2322          *
2323          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2324          * "forever", throttling after drbd_rs_begin_io will lock that extent
2325          * for application writes for the same time.  For now, just throttle
2326          * here, where the rest of the code expects the receiver to sleep for
2327          * a while, anyways.
2328          */
2329
2330         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2331          * this defers syncer requests for some time, before letting at least
2332  * one request through.  The resync controller on the receiving side
2333          * will adapt to the incoming rate accordingly.
2334          *
2335          * We cannot throttle here if remote is Primary/SyncTarget:
2336          * we would also throttle its application reads.
2337          * In that case, throttling is done on the SyncTarget only.
2338          */
2339         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2340                 schedule_timeout_uninterruptible(HZ/10);
2341         if (drbd_rs_begin_io(mdev, sector))
2342                 goto out_free_e;
2343
2344 submit_for_resync:
2345         atomic_add(size >> 9, &mdev->rs_sect_ev);
2346
2347 submit:
2348         inc_unacked(mdev);
2349         spin_lock_irq(&mdev->tconn->req_lock);
2350         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2351         spin_unlock_irq(&mdev->tconn->req_lock);
2352
2353         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2354                 return 0;
2355
2356         /* don't care for the reason here */
2357         dev_err(DEV, "submit failed, triggering re-connect\n");
2358         spin_lock_irq(&mdev->tconn->req_lock);
2359         list_del(&peer_req->w.list);
2360         spin_unlock_irq(&mdev->tconn->req_lock);
2361         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2362
2363 out_free_e:
2364         put_ldev(mdev);
2365         drbd_free_peer_req(mdev, peer_req);
2366         return -EIO;
2367 }
2368
2369 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2370 {
2371         int self, peer, rv = -100;
2372         unsigned long ch_self, ch_peer;
2373
2374         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2375         peer = mdev->p_uuid[UI_BITMAP] & 1;
2376
2377         ch_peer = mdev->p_uuid[UI_SIZE];
2378         ch_self = mdev->comm_bm_set;
2379
2380         switch (mdev->tconn->net_conf->after_sb_0p) {
2381         case ASB_CONSENSUS:
2382         case ASB_DISCARD_SECONDARY:
2383         case ASB_CALL_HELPER:
2384                 dev_err(DEV, "Configuration error.\n");
2385                 break;
2386         case ASB_DISCONNECT:
2387                 break;
2388         case ASB_DISCARD_YOUNGER_PRI:
2389                 if (self == 0 && peer == 1) {
2390                         rv = -1;
2391                         break;
2392                 }
2393                 if (self == 1 && peer == 0) {
2394                         rv =  1;
2395                         break;
2396                 }
2397                 /* Else fall through to one of the other strategies... */
2398         case ASB_DISCARD_OLDER_PRI:
2399                 if (self == 0 && peer == 1) {
2400                         rv = 1;
2401                         break;
2402                 }
2403                 if (self == 1 && peer == 0) {
2404                         rv = -1;
2405                         break;
2406                 }
2407                 /* Else fall through to one of the other strategies... */
2408                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2409                      "Using discard-least-changes instead\n");
2410         case ASB_DISCARD_ZERO_CHG:
2411                 if (ch_peer == 0 && ch_self == 0) {
2412                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2413                                 ? -1 : 1;
2414                         break;
2415                 } else {
2416                         if (ch_peer == 0) { rv =  1; break; }
2417                         if (ch_self == 0) { rv = -1; break; }
2418                 }
2419                 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2420                         break;
2421         case ASB_DISCARD_LEAST_CHG:
2422                 if      (ch_self < ch_peer)
2423                         rv = -1;
2424                 else if (ch_self > ch_peer)
2425                         rv =  1;
2426                 else /* ( ch_self == ch_peer ) */
2427                      /* Well, then use something else. */
2428                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2429                                 ? -1 : 1;
2430                 break;
2431         case ASB_DISCARD_LOCAL:
2432                 rv = -1;
2433                 break;
2434         case ASB_DISCARD_REMOTE:
2435                 rv =  1;
2436         }
2437
2438         return rv;
2439 }
2440
2441 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2442 {
2443         int hg, rv = -100;
2444
2445         switch (mdev->tconn->net_conf->after_sb_1p) {
2446         case ASB_DISCARD_YOUNGER_PRI:
2447         case ASB_DISCARD_OLDER_PRI:
2448         case ASB_DISCARD_LEAST_CHG:
2449         case ASB_DISCARD_LOCAL:
2450         case ASB_DISCARD_REMOTE:
2451                 dev_err(DEV, "Configuration error.\n");
2452                 break;
2453         case ASB_DISCONNECT:
2454                 break;
2455         case ASB_CONSENSUS:
2456                 hg = drbd_asb_recover_0p(mdev);
2457                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2458                         rv = hg;
2459                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2460                         rv = hg;
2461                 break;
2462         case ASB_VIOLENTLY:
2463                 rv = drbd_asb_recover_0p(mdev);
2464                 break;
2465         case ASB_DISCARD_SECONDARY:
2466                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2467         case ASB_CALL_HELPER:
2468                 hg = drbd_asb_recover_0p(mdev);
2469                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2470                         enum drbd_state_rv rv2;
2471
2472                         drbd_set_role(mdev, R_SECONDARY, 0);
2473                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2474                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2475                           * we do not need to wait for the after state change work either. */
2476                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2477                         if (rv2 != SS_SUCCESS) {
2478                                 drbd_khelper(mdev, "pri-lost-after-sb");
2479                         } else {
2480                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2481                                 rv = hg;
2482                         }
2483                 } else
2484                         rv = hg;
2485         }
2486
2487         return rv;
2488 }
2489
2490 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2491 {
2492         int hg, rv = -100;
2493
2494         switch (mdev->tconn->net_conf->after_sb_2p) {
2495         case ASB_DISCARD_YOUNGER_PRI:
2496         case ASB_DISCARD_OLDER_PRI:
2497         case ASB_DISCARD_LEAST_CHG:
2498         case ASB_DISCARD_LOCAL:
2499         case ASB_DISCARD_REMOTE:
2500         case ASB_CONSENSUS:
2501         case ASB_DISCARD_SECONDARY:
2502                 dev_err(DEV, "Configuration error.\n");
2503                 break;
2504         case ASB_VIOLENTLY:
2505                 rv = drbd_asb_recover_0p(mdev);
2506                 break;
2507         case ASB_DISCONNECT:
2508                 break;
2509         case ASB_CALL_HELPER:
2510                 hg = drbd_asb_recover_0p(mdev);
2511                 if (hg == -1) {
2512                         enum drbd_state_rv rv2;
2513
2514                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2515                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2516                           * we do not need to wait for the after state change work either. */
2517                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2518                         if (rv2 != SS_SUCCESS) {
2519                                 drbd_khelper(mdev, "pri-lost-after-sb");
2520                         } else {
2521                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2522                                 rv = hg;
2523                         }
2524                 } else
2525                         rv = hg;
2526         }
2527
2528         return rv;
2529 }
2530
2531 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2532                            u64 bits, u64 flags)
2533 {
2534         if (!uuid) {
2535                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2536                 return;
2537         }
2538         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2539              text,
2540              (unsigned long long)uuid[UI_CURRENT],
2541              (unsigned long long)uuid[UI_BITMAP],
2542              (unsigned long long)uuid[UI_HISTORY_START],
2543              (unsigned long long)uuid[UI_HISTORY_END],
2544              (unsigned long long)bits,
2545              (unsigned long long)flags);
2546 }
2547
2548 /*
2549   100   after split brain try auto recover
2550     2   C_SYNC_SOURCE set BitMap
2551     1   C_SYNC_SOURCE use BitMap
2552     0   no Sync
2553    -1   C_SYNC_TARGET use BitMap
2554    -2   C_SYNC_TARGET set BitMap
2555  -100   after split brain, disconnect
2556 -1000   unrelated data
2557 -1091   requires proto 91
2558 -1096   requires proto 96
2559  */
2560 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2561 {
2562         u64 self, peer;
2563         int i, j;
2564
2565         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2566         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2567
2568         *rule_nr = 10;
2569         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2570                 return 0;
2571
2572         *rule_nr = 20;
2573         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2574              peer != UUID_JUST_CREATED)
2575                 return -2;
2576
2577         *rule_nr = 30;
2578         if (self != UUID_JUST_CREATED &&
2579             (peer == UUID_JUST_CREATED || peer == (u64)0))
2580                 return 2;
2581
2582         if (self == peer) {
2583                 int rct, dc; /* roles at crash time */
2584
2585                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2586
2587                         if (mdev->tconn->agreed_pro_version < 91)
2588                                 return -1091;
2589
2590                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2591                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2592                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2593                                 drbd_uuid_set_bm(mdev, 0UL);
2594
2595                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2596                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2597                                 *rule_nr = 34;
2598                         } else {
2599                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2600                                 *rule_nr = 36;
2601                         }
2602
2603                         return 1;
2604                 }
2605
2606                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2607
2608                         if (mdev->tconn->agreed_pro_version < 91)
2609                                 return -1091;
2610
2611                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2612                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2613                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2614
2615                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2616                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2617                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2618
2619                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2620                                 *rule_nr = 35;
2621                         } else {
2622                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2623                                 *rule_nr = 37;
2624                         }
2625
2626                         return -1;
2627                 }
2628
2629                 /* Common power [off|failure] */
2630                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2631                         (mdev->p_uuid[UI_FLAGS] & 2);
2632                 /* lowest bit is set when we were primary,
2633                  * next bit (weight 2) is set when peer was primary */
2634                 *rule_nr = 40;
2635
2636                 switch (rct) {
2637                 case 0: /* !self_pri && !peer_pri */ return 0;
2638                 case 1: /*  self_pri && !peer_pri */ return 1;
2639                 case 2: /* !self_pri &&  peer_pri */ return -1;
2640                 case 3: /*  self_pri &&  peer_pri */
2641                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2642                         return dc ? -1 : 1;
2643                 }
2644         }
2645
2646         *rule_nr = 50;
2647         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2648         if (self == peer)
2649                 return -1;
2650
2651         *rule_nr = 51;
2652         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2653         if (self == peer) {
2654                 if (mdev->tconn->agreed_pro_version < 96 ?
2655                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2656                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2657                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2658                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2659                            modifications from its last start of resync as sync source. */
2660
2661                         if (mdev->tconn->agreed_pro_version < 91)
2662                                 return -1091;
2663
2664                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2665                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2666
2667                         dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2668                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2669
2670                         return -1;
2671                 }
2672         }
2673
2674         *rule_nr = 60;
2675         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2676         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2677                 peer = mdev->p_uuid[i] & ~((u64)1);
2678                 if (self == peer)
2679                         return -2;
2680         }
2681
2682         *rule_nr = 70;
2683         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2684         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2685         if (self == peer)
2686                 return 1;
2687
2688         *rule_nr = 71;
2689         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2690         if (self == peer) {
2691                 if (mdev->tconn->agreed_pro_version < 96 ?
2692                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2693                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2694                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2695                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2696                            to our UUIDs from our last start of resync as sync source. */
2697
2698                         if (mdev->tconn->agreed_pro_version < 91)
2699                                 return -1091;
2700
2701                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2702                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2703
2704                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2705                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2706                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2707
2708                         return 1;
2709                 }
2710         }
2711
2712
2713         *rule_nr = 80;
2714         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2715         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2716                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2717                 if (self == peer)
2718                         return 2;
2719         }
2720
2721         *rule_nr = 90;
2722         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2723         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2724         if (self == peer && self != ((u64)0))
2725                 return 100;
2726
2727         *rule_nr = 100;
2728         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2729                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2730                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2731                         peer = mdev->p_uuid[j] & ~((u64)1);
2732                         if (self == peer)
2733                                 return -100;
2734                 }
2735         }
2736
2737         return -1000;
2738 }
2739
2740 /* drbd_sync_handshake() returns the new conn state on success, or
2741    C_MASK on failure.
2742  */
2743 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2744                                            enum drbd_disk_state peer_disk) __must_hold(local)
2745 {
2746         int hg, rule_nr;
2747         enum drbd_conns rv = C_MASK;
2748         enum drbd_disk_state mydisk;
2749
2750         mydisk = mdev->state.disk;
2751         if (mydisk == D_NEGOTIATING)
2752                 mydisk = mdev->new_state_tmp.disk;
2753
2754         dev_info(DEV, "drbd_sync_handshake:\n");
2755         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2756         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2757                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2758
2759         hg = drbd_uuid_compare(mdev, &rule_nr);
2760
2761         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2762
2763         if (hg == -1000) {
2764                 dev_alert(DEV, "Unrelated data, aborting!\n");
2765                 return C_MASK;
2766         }
2767         if (hg < -1000) {
2768                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2769                 return C_MASK;
2770         }
2771
2772         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2773             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2774                 int f = (hg == -100) || abs(hg) == 2;
2775                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2776                 if (f)
2777                         hg = hg*2;
2778                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2779                      hg > 0 ? "source" : "target");
2780         }
2781
2782         if (abs(hg) == 100)
2783                 drbd_khelper(mdev, "initial-split-brain");
2784
2785         if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2786                 int pcount = (mdev->state.role == R_PRIMARY)
2787                            + (peer_role == R_PRIMARY);
2788                 int forced = (hg == -100);
2789
2790                 switch (pcount) {
2791                 case 0:
2792                         hg = drbd_asb_recover_0p(mdev);
2793                         break;
2794                 case 1:
2795                         hg = drbd_asb_recover_1p(mdev);
2796                         break;
2797                 case 2:
2798                         hg = drbd_asb_recover_2p(mdev);
2799                         break;
2800                 }
2801                 if (abs(hg) < 100) {
2802                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2803                              "automatically solved. Sync from %s node\n",
2804                              pcount, (hg < 0) ? "peer" : "this");
2805                         if (forced) {
2806                                 dev_warn(DEV, "Doing a full sync, since"
2807                                      " UUIDs were ambiguous.\n");
2808                                 hg = hg*2;
2809                         }
2810                 }
2811         }
2812
2813         if (hg == -100) {
2814                 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2815                         hg = -1;
2816                 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2817                         hg = 1;
2818
2819                 if (abs(hg) < 100)
2820                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2821                              "Sync from %s node\n",
2822                              (hg < 0) ? "peer" : "this");
2823         }
2824
2825         if (hg == -100) {
2826                 /* FIXME this log message is not correct if we end up here
2827                  * after an attempted attach on a diskless node.
2828                  * We just refuse to attach -- well, we drop the "connection"
2829                  * to that disk, in a way... */
2830                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2831                 drbd_khelper(mdev, "split-brain");
2832                 return C_MASK;
2833         }
2834
2835         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2836                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2837                 return C_MASK;
2838         }
2839
2840         if (hg < 0 && /* by intention we do not use mydisk here. */
2841             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2842                 switch (mdev->tconn->net_conf->rr_conflict) {
2843                 case ASB_CALL_HELPER:
2844                         drbd_khelper(mdev, "pri-lost");
2845                         /* fall through */
2846                 case ASB_DISCONNECT:
2847                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2848                         return C_MASK;
2849                 case ASB_VIOLENTLY:
2850                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2851                              " assumption\n");
2852                 }
2853         }
2854
2855         if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2856                 if (hg == 0)
2857                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2858                 else
2859                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2860                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2861                                  abs(hg) >= 2 ? "full" : "bit-map based");
2862                 return C_MASK;
2863         }
2864
2865         if (abs(hg) >= 2) {
2866                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2867                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2868                                         BM_LOCKED_SET_ALLOWED))
2869                         return C_MASK;
2870         }
2871
2872         if (hg > 0) { /* become sync source. */
2873                 rv = C_WF_BITMAP_S;
2874         } else if (hg < 0) { /* become sync target */
2875                 rv = C_WF_BITMAP_T;
2876         } else {
2877                 rv = C_CONNECTED;
2878                 if (drbd_bm_total_weight(mdev)) {
2879                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2880                              drbd_bm_total_weight(mdev));
2881                 }
2882         }
2883
2884         return rv;
2885 }
2886
2887 /* returns 1 if invalid */
2888 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2889 {
2890         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2891         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2892             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2893                 return 0;
2894
2895         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2896         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2897             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2898                 return 1;
2899
2900         /* everything else is valid if they are equal on both sides. */
2901         if (peer == self)
2902                 return 0;
2903
2904         /* everything else is invalid. */
2905         return 1;
2906 }
2907
2908 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2909 {
2910         struct p_protocol *p = pi->data;
2911         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2912         int p_want_lose, p_two_primaries, cf;
2913         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2914
2915         p_proto         = be32_to_cpu(p->protocol);
2916         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2917         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2918         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2919         p_two_primaries = be32_to_cpu(p->two_primaries);
2920         cf              = be32_to_cpu(p->conn_flags);
2921         p_want_lose = cf & CF_WANT_LOSE;
2922
2923         clear_bit(CONN_DRY_RUN, &tconn->flags);
2924
2925         if (cf & CF_DRY_RUN)
2926                 set_bit(CONN_DRY_RUN, &tconn->flags);
2927
2928         if (p_proto != tconn->net_conf->wire_protocol) {
2929                 conn_err(tconn, "incompatible communication protocols\n");
2930                 goto disconnect;
2931         }
2932
2933         if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2934                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2935                 goto disconnect;
2936         }
2937
2938         if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2939                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2940                 goto disconnect;
2941         }
2942
2943         if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2944                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2945                 goto disconnect;
2946         }
2947
2948         if (p_want_lose && tconn->net_conf->want_lose) {
2949                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2950                 goto disconnect;
2951         }
2952
2953         if (p_two_primaries != tconn->net_conf->two_primaries) {
2954                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2955                 goto disconnect;
2956         }
2957
2958         if (tconn->agreed_pro_version >= 87) {
2959                 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2960                 int err;
2961
2962                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2963                 if (err)
2964                         return err;
2965
2966                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2967                 if (strcmp(p_integrity_alg, my_alg)) {
2968                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2969                         goto disconnect;
2970                 }
2971                 conn_info(tconn, "data-integrity-alg: %s\n",
2972                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2973         }
2974
2975         return 0;
2976
2977 disconnect:
2978         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2979         return -EIO;
2980 }
2981
2982 /* helper function
2983  * input: alg name, feature name
2984  * return: NULL (alg name was "")
2985  *         ERR_PTR(error) if something goes wrong
2986  *         or the crypto hash ptr, if it worked out ok. */
2987 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2988                 const char *alg, const char *name)
2989 {
2990         struct crypto_hash *tfm;
2991
2992         if (!alg[0])
2993                 return NULL;
2994
2995         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2996         if (IS_ERR(tfm)) {
2997                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2998                         alg, name, PTR_ERR(tfm));
2999                 return tfm;
3000         }
3001         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3002                 crypto_free_hash(tfm);
3003                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3004                 return ERR_PTR(-EINVAL);
3005         }
3006         return tfm;
3007 }
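
/* Usage sketch (illustration only, compiled out): the three possible results
 * of drbd_crypto_alloc_digest_safe() and how a caller handles them.
 * receive_SyncParam() below does this for the verify-alg and csums-alg
 * strings; "alg_name" and "use_digest" are placeholders for the example. */
#if 0
        struct crypto_hash *tfm;

        tfm = drbd_crypto_alloc_digest_safe(mdev, alg_name, "verify-alg");
        if (tfm == NULL) {
                /* alg_name was "": the feature simply stays disabled */
        } else if (IS_ERR(tfm)) {
                goto disconnect;        /* allocation failed or not a digest */
        } else {
                use_digest(tfm);        /* valid transform, caller owns it now */
        }
#endif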
3008
3009 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3010 {
3011         void *buffer = tconn->data.rbuf;
3012         int size = pi->size;
3013
3014         while (size) {
3015                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3016                 s = drbd_recv(tconn, buffer, s);
3017                 if (s <= 0) {
3018                         if (s < 0)
3019                                 return s;
3020                         break;
3021                 }
3022                 size -= s;
3023         }
3024         if (size)
3025                 return -EIO;
3026         return 0;
3027 }
3028
3029 /*
3030  * config_unknown_volume  -  device configuration command for unknown volume
3031  *
3032  * When a device is added to an existing connection, the node on which the
3033  * device is added first will send configuration commands to its peer but the
3034  * peer will not know about the device yet.  It will warn and ignore these
3035  * commands.  Once the device is added on the second node, the second node will
3036  * send the same device configuration commands, but in the other direction.
3037  *
3038  * (We can also end up here if drbd is misconfigured.)
3039  */
3040 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3041 {
3042         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3043                   pi->vnr, cmdname(pi->cmd));
3044         return ignore_remaining_packet(tconn, pi);
3045 }
3046
3047 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3048 {
3049         struct drbd_conf *mdev;
3050         struct p_rs_param_95 *p;
3051         unsigned int header_size, data_size, exp_max_sz;
3052         struct crypto_hash *verify_tfm = NULL;
3053         struct crypto_hash *csums_tfm = NULL;
3054         const int apv = tconn->agreed_pro_version;
3055         int *rs_plan_s = NULL;
3056         int fifo_size = 0;
3057         int err;
3058
3059         mdev = vnr_to_mdev(tconn, pi->vnr);
3060         if (!mdev)
3061                 return config_unknown_volume(tconn, pi);
3062
3063         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3064                     : apv == 88 ? sizeof(struct p_rs_param)
3065                                         + SHARED_SECRET_MAX
3066                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3067                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3068
3069         if (pi->size > exp_max_sz) {
3070                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3071                     pi->size, exp_max_sz);
3072                 return -EIO;
3073         }
3074
3075         if (apv <= 88) {
3076                 header_size = sizeof(struct p_rs_param);
3077                 data_size = pi->size - header_size;
3078         } else if (apv <= 94) {
3079                 header_size = sizeof(struct p_rs_param_89);
3080                 data_size = pi->size - header_size;
3081                 D_ASSERT(data_size == 0);
3082         } else {
3083                 header_size = sizeof(struct p_rs_param_95);
3084                 data_size = pi->size - header_size;
3085                 D_ASSERT(data_size == 0);
3086         }
3087
3088         /* initialize verify_alg and csums_alg */
3089         p = pi->data;
3090         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3091
3092         err = drbd_recv_all(mdev->tconn, p, header_size);
3093         if (err)
3094                 return err;
3095
3096         if (get_ldev(mdev)) {
3097                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3098                 put_ldev(mdev);
3099         }
3100
3101         if (apv >= 88) {
3102                 if (apv == 88) {
3103                         if (data_size > SHARED_SECRET_MAX) {
3104                                 dev_err(DEV, "verify-alg too long, "
3105                                     "peer wants %u, accepting only %u bytes\n",
3106                                                 data_size, SHARED_SECRET_MAX);
3107                                 return -EIO;
3108                         }
3109
3110                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3111                         if (err)
3112                                 return err;
3113
3114                         /* we expect NUL terminated string */
3115                         /* but just in case someone tries to be evil */
3116                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3117                         p->verify_alg[data_size-1] = 0;
3118
3119                 } else /* apv >= 89 */ {
3120                         /* we still expect NUL terminated strings */
3121                         /* but just in case someone tries to be evil */
3122                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3123                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3124                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3125                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3126                 }
3127
3128                 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3129                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3130                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3131                                     mdev->tconn->net_conf->verify_alg, p->verify_alg);
3132                                 goto disconnect;
3133                         }
3134                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3135                                         p->verify_alg, "verify-alg");
3136                         if (IS_ERR(verify_tfm)) {
3137                                 verify_tfm = NULL;
3138                                 goto disconnect;
3139                         }
3140                 }
3141
3142                 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3143                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3144                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3145                                     mdev->tconn->net_conf->csums_alg, p->csums_alg);
3146                                 goto disconnect;
3147                         }
3148                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3149                                         p->csums_alg, "csums-alg");
3150                         if (IS_ERR(csums_tfm)) {
3151                                 csums_tfm = NULL;
3152                                 goto disconnect;
3153                         }
3154                 }
3155
3156                 if (apv > 94 && get_ldev(mdev)) {
3157                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3158                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3159                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3160                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3161                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3162
3163                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3164                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3165                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3166                                 if (!rs_plan_s) {
3167                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3168                                         put_ldev(mdev);
3169                                         goto disconnect;
3170                                 }
3171                         }
3172                         put_ldev(mdev);
3173                 }
3174
3175                 spin_lock(&mdev->peer_seq_lock);
3176                 /* lock against drbd_nl_syncer_conf() */
3177                 if (verify_tfm) {
3178                         strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3179                         mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3180                         crypto_free_hash(mdev->tconn->verify_tfm);
3181                         mdev->tconn->verify_tfm = verify_tfm;
3182                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3183                 }
3184                 if (csums_tfm) {
3185                         strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3186                         mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3187                         crypto_free_hash(mdev->tconn->csums_tfm);
3188                         mdev->tconn->csums_tfm = csums_tfm;
3189                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3190                 }
3191                 if (fifo_size != mdev->rs_plan_s.size) {
3192                         kfree(mdev->rs_plan_s.values);
3193                         mdev->rs_plan_s.values = rs_plan_s;
3194                         mdev->rs_plan_s.size   = fifo_size;
3195                         mdev->rs_planed = 0;
3196                 }
3197                 spin_unlock(&mdev->peer_seq_lock);
3198         }
3199         return 0;
3200
3201 disconnect:
3202         /* just for completeness: actually not needed,
3203          * as this is not reached if csums_tfm was ok. */
3204         crypto_free_hash(csums_tfm);
3205         /* but free the verify_tfm again, if csums_tfm did not work out */
3206         crypto_free_hash(verify_tfm);
3207         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3208         return -EIO;
3209 }
3210
3211 /* warn if the arguments differ by more than 12.5% */
3212 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3213         const char *s, sector_t a, sector_t b)
3214 {
3215         sector_t d;
3216         if (a == 0 || b == 0)
3217                 return;
3218         d = (a > b) ? (a - b) : (b - a);
3219         if (d > (a>>3) || d > (b>>3))
3220                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3221                      (unsigned long long)a, (unsigned long long)b);
3222 }
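
/* Worked example (made-up sizes): with a = 1000 and b = 1200 sectors,
 * d = 200 exceeds a>>3 = 125, i.e. more than 12.5% of the smaller value,
 * so the warning is printed; with b = 1100, d = 100 stays below both
 * a>>3 = 125 and b>>3 = 137 and nothing is logged. */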
3223
3224 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3225 {
3226         struct drbd_conf *mdev;
3227         struct p_sizes *p = pi->data;
3228         enum determine_dev_size dd = unchanged;
3229         sector_t p_size, p_usize, my_usize;
3230         int ldsc = 0; /* local disk size changed */
3231         enum dds_flags ddsf;
3232
3233         mdev = vnr_to_mdev(tconn, pi->vnr);
3234         if (!mdev)
3235                 return config_unknown_volume(tconn, pi);
3236
3237         p_size = be64_to_cpu(p->d_size);
3238         p_usize = be64_to_cpu(p->u_size);
3239
3240         /* just store the peer's disk size for now.
3241          * we still need to figure out whether we accept that. */
3242         mdev->p_size = p_size;
3243
3244         if (get_ldev(mdev)) {
3245                 warn_if_differ_considerably(mdev, "lower level device sizes",
3246                            p_size, drbd_get_max_capacity(mdev->ldev));
3247                 warn_if_differ_considerably(mdev, "user requested size",
3248                                             p_usize, mdev->ldev->dc.disk_size);
3249
3250                 /* if this is the first connect, or an otherwise expected
3251                  * param exchange, choose the minimum */
3252                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3253                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3254                                              p_usize);
3255
3256                 my_usize = mdev->ldev->dc.disk_size;
3257
3258                 if (mdev->ldev->dc.disk_size != p_usize) {
3259                         mdev->ldev->dc.disk_size = p_usize;
3260                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3261                              (unsigned long)mdev->ldev->dc.disk_size);
3262                 }
3263
3264                 /* Never shrink a device with usable data during connect.
3265                    But allow online shrinking if we are connected. */
3266                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3267                    drbd_get_capacity(mdev->this_bdev) &&
3268                    mdev->state.disk >= D_OUTDATED &&
3269                    mdev->state.conn < C_CONNECTED) {
3270                         dev_err(DEV, "The peer's disk size is too small!\n");
3271                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3272                         mdev->ldev->dc.disk_size = my_usize;
3273                         put_ldev(mdev);
3274                         return -EIO;
3275                 }
3276                 put_ldev(mdev);
3277         }
3278
3279         ddsf = be16_to_cpu(p->dds_flags);
3280         if (get_ldev(mdev)) {
3281                 dd = drbd_determine_dev_size(mdev, ddsf);
3282                 put_ldev(mdev);
3283                 if (dd == dev_size_error)
3284                         return -EIO;
3285                 drbd_md_sync(mdev);
3286         } else {
3287                 /* I am diskless, need to accept the peer's size. */
3288                 drbd_set_my_capacity(mdev, p_size);
3289         }
3290
3291         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3292         drbd_reconsider_max_bio_size(mdev);
3293
3294         if (get_ldev(mdev)) {
3295                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3296                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3297                         ldsc = 1;
3298                 }
3299
3300                 put_ldev(mdev);
3301         }
3302
3303         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3304                 if (be64_to_cpu(p->c_size) !=
3305                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3306                         /* we have different sizes, probably peer
3307                          * needs to know my new size... */
3308                         drbd_send_sizes(mdev, 0, ddsf);
3309                 }
3310                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3311                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3312                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3313                             mdev->state.disk >= D_INCONSISTENT) {
3314                                 if (ddsf & DDSF_NO_RESYNC)
3315                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3316                                 else
3317                                         resync_after_online_grow(mdev);
3318                         } else
3319                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3320                 }
3321         }
3322
3323         return 0;
3324 }
3325
3326 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3327 {
3328         struct drbd_conf *mdev;
3329         struct p_uuids *p = pi->data;
3330         u64 *p_uuid;
3331         int i, updated_uuids = 0;
3332
3333         mdev = vnr_to_mdev(tconn, pi->vnr);
3334         if (!mdev)
3335                 return config_unknown_volume(tconn, pi);
3336
3337         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     /* avoid dereferencing a failed allocation below */
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return false;
             }
3338
3339         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3340                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3341
3342         kfree(mdev->p_uuid);
3343         mdev->p_uuid = p_uuid;
3344
3345         if (mdev->state.conn < C_CONNECTED &&
3346             mdev->state.disk < D_INCONSISTENT &&
3347             mdev->state.role == R_PRIMARY &&
3348             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3349                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3350                     (unsigned long long)mdev->ed_uuid);
3351                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3352                 return -EIO;
3353         }
3354
3355         if (get_ldev(mdev)) {
3356                 int skip_initial_sync =
3357                         mdev->state.conn == C_CONNECTED &&
3358                         mdev->tconn->agreed_pro_version >= 90 &&
3359                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3360                         (p_uuid[UI_FLAGS] & 8);
3361                 if (skip_initial_sync) {
3362                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3363                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3364                                         "clear_n_write from receive_uuids",
3365                                         BM_LOCKED_TEST_ALLOWED);
3366                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3367                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3368                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3369                                         CS_VERBOSE, NULL);
3370                         drbd_md_sync(mdev);
3371                         updated_uuids = 1;
3372                 }
3373                 put_ldev(mdev);
3374         } else if (mdev->state.disk < D_INCONSISTENT &&
3375                    mdev->state.role == R_PRIMARY) {
3376                 /* I am a diskless primary, the peer just created a new current UUID
3377                    for me. */
3378                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3379         }
3380
3381         /* Before we test for the disk state, we should wait until a possibly
3382            ongoing cluster-wide state change has finished. That is important if
3383            we are primary and are detaching from our disk. We need to see the
3384            new disk state... */
3385         mutex_lock(mdev->state_mutex);
3386         mutex_unlock(mdev->state_mutex);
3387         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3388                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3389
3390         if (updated_uuids)
3391                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3392
3393         return 0;
3394 }
3395
3396 /**
3397  * convert_state() - Converts the peer's view of the cluster state to our point of view
3398  * @ps:         The state as seen by the peer.
3399  */
3400 static union drbd_state convert_state(union drbd_state ps)
3401 {
3402         union drbd_state ms;
3403
3404         static enum drbd_conns c_tab[] = {
3405                 [C_CONNECTED] = C_CONNECTED,
3406
3407                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3408                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3409                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3410                 [C_VERIFY_S]       = C_VERIFY_T,
3411                 [C_MASK]   = C_MASK,
3412         };
3413
3414         ms.i = ps.i;
3415
3416         ms.conn = c_tab[ps.conn];
3417         ms.peer = ps.role;
3418         ms.role = ps.peer;
3419         ms.pdsk = ps.disk;
3420         ms.disk = ps.pdsk;
3421         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3422
3423         return ms;
3424 }
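
/* Illustrative example (assumed values): if the peer reports
 *   { role = R_PRIMARY, peer = R_SECONDARY,
 *     disk = D_UP_TO_DATE, pdsk = D_INCONSISTENT, conn = C_STARTING_SYNC_S }
 * then convert_state() yields our point of view:
 *   { role = R_SECONDARY, peer = R_PRIMARY,
 *     disk = D_INCONSISTENT, pdsk = D_UP_TO_DATE, conn = C_STARTING_SYNC_T } */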
3425
3426 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3427 {
3428         struct drbd_conf *mdev;
3429         struct p_req_state *p = pi->data;
3430         union drbd_state mask, val;
3431         enum drbd_state_rv rv;
3432
3433         mdev = vnr_to_mdev(tconn, pi->vnr);
3434         if (!mdev)
3435                 return -EIO;
3436
3437         mask.i = be32_to_cpu(p->mask);
3438         val.i = be32_to_cpu(p->val);
3439
3440         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3441             mutex_is_locked(mdev->state_mutex)) {
3442                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3443                 return 0;
3444         }
3445
3446         mask = convert_state(mask);
3447         val = convert_state(val);
3448
3449         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3450         drbd_send_sr_reply(mdev, rv);
3451
3452         drbd_md_sync(mdev);
3453
3454         return 0;
3455 }
3456
3457 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3458 {
3459         struct p_req_state *p = pi->data;
3460         union drbd_state mask, val;
3461         enum drbd_state_rv rv;
3462
3463         mask.i = be32_to_cpu(p->mask);
3464         val.i = be32_to_cpu(p->val);
3465
3466         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3467             mutex_is_locked(&tconn->cstate_mutex)) {
3468                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3469                 return 0;
3470         }
3471
3472         mask = convert_state(mask);
3473         val = convert_state(val);
3474
3475         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3476         conn_send_sr_reply(tconn, rv);
3477
3478         return 0;
3479 }
3480
3481 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3482 {
3483         struct drbd_conf *mdev;
3484         struct p_state *p = pi->data;
3485         union drbd_state os, ns, peer_state;
3486         enum drbd_disk_state real_peer_disk;
3487         enum chg_state_flags cs_flags;
3488         int rv;
3489
3490         mdev = vnr_to_mdev(tconn, pi->vnr);
3491         if (!mdev)
3492                 return config_unknown_volume(tconn, pi);
3493
3494         peer_state.i = be32_to_cpu(p->state);
3495
3496         real_peer_disk = peer_state.disk;
3497         if (peer_state.disk == D_NEGOTIATING) {
3498                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3499                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3500         }
3501
3502         spin_lock_irq(&mdev->tconn->req_lock);
3503  retry:
3504         os = ns = drbd_read_state(mdev);
3505         spin_unlock_irq(&mdev->tconn->req_lock);
3506
3507         /* peer says his disk is uptodate, while we think it is inconsistent,
3508          * and this happens while we think we have a sync going on. */
3509         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3510             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3511                 /* If we are (becoming) SyncSource, but peer is still in sync
3512                  * preparation, ignore its uptodate-ness to avoid flapping, it
3513                  * will change to inconsistent once the peer reaches active
3514                  * syncing states.
3515                  * It may have changed syncer-paused flags, however, so we
3516                  * cannot ignore this completely. */
3517                 if (peer_state.conn > C_CONNECTED &&
3518                     peer_state.conn < C_SYNC_SOURCE)
3519                         real_peer_disk = D_INCONSISTENT;
3520
3521                 /* if peer_state changes to connected at the same time,
3522                  * it explicitly notifies us that it finished resync.
3523                  * Maybe we should finish it up, too? */
3524                 else if (os.conn >= C_SYNC_SOURCE &&
3525                          peer_state.conn == C_CONNECTED) {
3526                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3527                                 drbd_resync_finished(mdev);
3528                         return 0;
3529                 }
3530         }
3531
3532         /* peer says his disk is inconsistent, while we think it is uptodate,
3533          * and this happens while the peer still thinks we have a sync going on,
3534          * but we think we are already done with the sync.
3535          * We ignore this to avoid flapping pdsk.
3536          * This should not happen, if the peer is a recent version of drbd. */
3537         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3538             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3539                 real_peer_disk = D_UP_TO_DATE;
3540
3541         if (ns.conn == C_WF_REPORT_PARAMS)
3542                 ns.conn = C_CONNECTED;
3543
3544         if (peer_state.conn == C_AHEAD)
3545                 ns.conn = C_BEHIND;
3546
3547         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3548             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3549                 int cr; /* consider resync */
3550
3551                 /* if we established a new connection */
3552                 cr  = (os.conn < C_CONNECTED);
3553                 /* if we had an established connection
3554                  * and one of the nodes newly attaches a disk */
3555                 cr |= (os.conn == C_CONNECTED &&
3556                        (peer_state.disk == D_NEGOTIATING ||
3557                         os.disk == D_NEGOTIATING));
3558                 /* if we have both been inconsistent, and the peer has been
3559                  * forced to be UpToDate with --overwrite-data */
3560                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3561                 /* if we had been plain connected, and the admin requested to
3562                  * start a sync by "invalidate" or "invalidate-remote" */
3563                 cr |= (os.conn == C_CONNECTED &&
3564                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3565                                  peer_state.conn <= C_WF_BITMAP_T));
3566
3567                 if (cr)
3568                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3569
3570                 put_ldev(mdev);
3571                 if (ns.conn == C_MASK) {
3572                         ns.conn = C_CONNECTED;
3573                         if (mdev->state.disk == D_NEGOTIATING) {
3574                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3575                         } else if (peer_state.disk == D_NEGOTIATING) {
3576                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3577                                 peer_state.disk = D_DISKLESS;
3578                                 real_peer_disk = D_DISKLESS;
3579                         } else {
3580                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3581                                         return -EIO;
3582                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3583                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3584                                 return -EIO;
3585                         }
3586                 }
3587         }
3588
3589         spin_lock_irq(&mdev->tconn->req_lock);
3590         if (os.i != drbd_read_state(mdev).i)
3591                 goto retry;
3592         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3593         ns.peer = peer_state.role;
3594         ns.pdsk = real_peer_disk;
3595         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3596         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3597                 ns.disk = mdev->new_state_tmp.disk;
3598         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3599         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3600             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3601                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3602                    for temporary network outages! */
3603                 spin_unlock_irq(&mdev->tconn->req_lock);
3604                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3605                 tl_clear(mdev->tconn);
3606                 drbd_uuid_new_current(mdev);
3607                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3608                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3609                 return -EIO;
3610         }
3611         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3612         ns = drbd_read_state(mdev);
3613         spin_unlock_irq(&mdev->tconn->req_lock);
3614
3615         if (rv < SS_SUCCESS) {
3616                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3617                 return -EIO;
3618         }
3619
3620         if (os.conn > C_WF_REPORT_PARAMS) {
3621                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3622                     peer_state.disk != D_NEGOTIATING ) {
3623                         /* we want resync, peer has not yet decided to sync... */
3624                         /* Nowadays only used when forcing a node into primary role and
3625                            setting its disk to UpToDate with that */
3626                         drbd_send_uuids(mdev);
3627                         drbd_send_state(mdev);
3628                 }
3629         }
3630
3631         mdev->tconn->net_conf->want_lose = 0;
3632
3633         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3634
3635         return 0;
3636 }
3637
3638 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3639 {
3640         struct drbd_conf *mdev;
3641         struct p_rs_uuid *p = pi->data;
3642
3643         mdev = vnr_to_mdev(tconn, pi->vnr);
3644         if (!mdev)
3645                 return -EIO;
3646
3647         wait_event(mdev->misc_wait,
3648                    mdev->state.conn == C_WF_SYNC_UUID ||
3649                    mdev->state.conn == C_BEHIND ||
3650                    mdev->state.conn < C_CONNECTED ||
3651                    mdev->state.disk < D_NEGOTIATING);
3652
3653         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3654
3655         /* Here the _drbd_uuid_ functions are right, current should
3656            _not_ be rotated into the history */
3657         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3658                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3659                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3660
3661                 drbd_print_uuids(mdev, "updated sync uuid");
3662                 drbd_start_resync(mdev, C_SYNC_TARGET);
3663
3664                 put_ldev(mdev);
3665         } else
3666                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3667
3668         return 0;
3669 }
3670
3671 /**
3672  * receive_bitmap_plain
3673  *
3674  * Return 0 when done, 1 when another iteration is needed, and a negative error
3675  * code upon failure.
3676  */
3677 static int
3678 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3679                      unsigned long *p, struct bm_xfer_ctx *c)
3680 {
3681         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3682                                  drbd_header_size(mdev->tconn);
3683         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3684                                        c->bm_words - c->word_offset);
3685         unsigned int want = num_words * sizeof(*p);
3686         int err;
3687
3688         if (want != size) {
3689                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3690                 return -EIO;
3691         }
3692         if (want == 0)
3693                 return 0;
3694         err = drbd_recv_all(mdev->tconn, p, want);
3695         if (err)
3696                 return err;
3697
3698         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3699
3700         c->word_offset += num_words;
3701         c->bit_offset = c->word_offset * BITS_PER_LONG;
3702         if (c->bit_offset > c->bm_bits)
3703                 c->bit_offset = c->bm_bits;
3704
3705         return 1;
3706 }
3707
3708 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3709 {
3710         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3711 }
3712
3713 static int dcbp_get_start(struct p_compressed_bm *p)
3714 {
3715         return (p->encoding & 0x80) != 0;
3716 }
3717
3718 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3719 {
3720         return (p->encoding >> 4) & 0x7;
3721 }
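
/* Illustration (made-up value): for p->encoding == 0x91 (binary 1001 0001)
 * the accessors above yield code = 0x1 (low nibble, the bitmap encoding id),
 * pad bits = 1 (bits 4..6), and start toggle = 1 (bit 7), meaning the first
 * decoded run length describes bits that are set. */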
3722
3723 /**
3724  * recv_bm_rle_bits
3725  *
3726  * Return 0 when done, 1 when another iteration is needed, and a negative error
3727  * code upon failure.
3728  */
3729 static int
3730 recv_bm_rle_bits(struct drbd_conf *mdev,
3731                 struct p_compressed_bm *p,
3732                  struct bm_xfer_ctx *c,
3733                  unsigned int len)
3734 {
3735         struct bitstream bs;
3736         u64 look_ahead;
3737         u64 rl;
3738         u64 tmp;
3739         unsigned long s = c->bit_offset;
3740         unsigned long e;
3741         int toggle = dcbp_get_start(p);
3742         int have;
3743         int bits;
3744
3745         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3746
3747         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3748         if (bits < 0)
3749                 return -EIO;
3750
3751         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3752                 bits = vli_decode_bits(&rl, look_ahead);
3753                 if (bits <= 0)
3754                         return -EIO;
3755
3756                 if (toggle) {
3757                         e = s + rl -1;
3758                         if (e >= c->bm_bits) {
3759                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3760                                 return -EIO;
3761                         }
3762                         _drbd_bm_set_bits(mdev, s, e);
3763                 }
3764
3765                 if (have < bits) {
3766                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3767                                 have, bits, look_ahead,
3768                                 (unsigned int)(bs.cur.b - p->code),
3769                                 (unsigned int)bs.buf_len);
3770                         return -EIO;
3771                 }
3772                 look_ahead >>= bits;
3773                 have -= bits;
3774
3775                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3776                 if (bits < 0)
3777                         return -EIO;
3778                 look_ahead |= tmp << have;
3779                 have += bits;
3780         }
3781
3782         c->bit_offset = s;
3783         bm_xfer_ctx_bit_to_word_offset(c);
3784
3785         return (s != c->bm_bits);
3786 }
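
/* Decoding walk-through (assumed numbers, not from a real packet): with
 * c->bit_offset = 0, a start toggle of 0 and decoded run lengths 5, 3, 4,
 * recv_bm_rle_bits() skips bits 0..4 (toggle 0), sets bits 5..7
 * (toggle 1, e = s + rl - 1 = 7), skips bits 8..11, and leaves
 * c->bit_offset = 12 for the next packet. */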
3787
3788 /**
3789  * decode_bitmap_c
3790  *
3791  * Return 0 when done, 1 when another iteration is needed, and a negative error
3792  * code upon failure.
3793  */
3794 static int
3795 decode_bitmap_c(struct drbd_conf *mdev,
3796                 struct p_compressed_bm *p,
3797                 struct bm_xfer_ctx *c,
3798                 unsigned int len)
3799 {
3800         if (dcbp_get_code(p) == RLE_VLI_Bits)
3801                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3802
3803         /* other variants had been implemented for evaluation,
3804          * but have been dropped as this one turned out to be "best"
3805          * during all our tests. */
3806
3807         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3808         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3809         return -EIO;
3810 }
3811
3812 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3813                 const char *direction, struct bm_xfer_ctx *c)
3814 {
3815         /* what would it take to transfer it "plaintext" */
3816         unsigned int header_size = drbd_header_size(mdev->tconn);
3817         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3818         unsigned int plain =
3819                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3820                 c->bm_words * sizeof(unsigned long);
3821         unsigned int total = c->bytes[0] + c->bytes[1];
3822         unsigned int r;
3823
3824         /* total cannot be zero, but just in case: */
3825         if (total == 0)
3826                 return;
3827
3828         /* don't report if not compressed */
3829         if (total >= plain)
3830                 return;
3831
3832         /* total < plain. check for overflow, still */
3833         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3834                                     : (1000 * total / plain);
3835
3836         if (r > 1000)
3837                 r = 1000;
3838
3839         r = 1000 - r;
3840         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3841              "total %u; compression: %u.%u%%\n",
3842                         direction,
3843                         c->bytes[1], c->packets[1],
3844                         c->bytes[0], c->packets[0],
3845                         total, r/10, r % 10);
3846 }
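
/* Worked example (made-up numbers): if sending the bitmap as plain words
 * would take plain = 100000 bytes, but the packets actually sent add up to
 * total = 25000 bytes, then r = 1000 * 25000 / 100000 = 250, and
 * 1000 - r = 750 is reported as "compression: 75.0%". */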
3847
3848 /* Since we are processing the bitfield from lower addresses to higher,
3849    it does not matter whether we process it in 32 bit chunks or 64 bit
3850    chunks, as long as it is little endian. (Understand it as a byte stream,
3851    beginning with the lowest byte...) If we used big endian we would need
3852    to process it from the highest address to the lowest, in order to be
3853    agnostic to the 32 vs 64 bit issue.
3854
3855    Returns 0 on success, a negative error code otherwise. */
3856 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3857 {
3858         struct drbd_conf *mdev;
3859         struct bm_xfer_ctx c;
3860         int err;
3861
3862         mdev = vnr_to_mdev(tconn, pi->vnr);
3863         if (!mdev)
3864                 return -EIO;
3865
3866         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3867         /* you are supposed to send additional out-of-sync information
3868          * if you actually set bits during this phase */
3869
3870         c = (struct bm_xfer_ctx) {
3871                 .bm_bits = drbd_bm_bits(mdev),
3872                 .bm_words = drbd_bm_words(mdev),
3873         };
3874
3875         for(;;) {
3876                 if (pi->cmd == P_BITMAP)
3877                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3878                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3879                         /* MAYBE: sanity check that we speak proto >= 90,
3880                          * and the feature is enabled! */
3881                         struct p_compressed_bm *p = pi->data;
3882
3883                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
3884                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3885                                 err = -EIO;
3886                                 goto out;
3887                         }
3888                         if (pi->size <= sizeof(*p)) {
3889                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3890                                 err = -EIO;
3891                                 goto out;
3892                         }
3893                         err = drbd_recv_all(mdev->tconn, p, pi->size);
3894                         if (err)
3895                                goto out;
3896                         err = decode_bitmap_c(mdev, p, &c, pi->size);
3897                 } else {
3898                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3899                         err = -EIO;
3900                         goto out;
3901                 }
3902
3903                 c.packets[pi->cmd == P_BITMAP]++;
3904                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
3905
3906                 if (err <= 0) {
3907                         if (err < 0)
3908                                 goto out;
3909                         break;
3910                 }
3911                 err = drbd_recv_header(mdev->tconn, pi);
3912                 if (err)
3913                         goto out;
3914         }
3915
3916         INFO_bm_xfer_stats(mdev, "receive", &c);
3917
3918         if (mdev->state.conn == C_WF_BITMAP_T) {
3919                 enum drbd_state_rv rv;
3920
3921                 err = drbd_send_bitmap(mdev);
3922                 if (err)
3923                         goto out;
3924                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3925                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3926                 D_ASSERT(rv == SS_SUCCESS);
3927         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3928                 /* admin may have requested C_DISCONNECTING,
3929                  * other threads may have noticed network errors */
3930                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3931                     drbd_conn_str(mdev->state.conn));
3932         }
3933         err = 0;
3934
3935  out:
3936         drbd_bm_unlock(mdev);
3937         if (!err && mdev->state.conn == C_WF_BITMAP_S)
3938                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3939         return err;
3940 }
3941
3942 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3943 {
3944         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3945                  pi->cmd, pi->size);
3946
3947         return ignore_remaining_packet(tconn, pi);
3948 }
3949
3950 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3951 {
3952         /* Make sure we've acked all the TCP data associated
3953          * with the data requests being unplugged */
3954         drbd_tcp_quickack(tconn->data.socket);
3955
3956         return 0;
3957 }
3958
3959 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3960 {
3961         struct drbd_conf *mdev;
3962         struct p_block_desc *p = pi->data;
3963
3964         mdev = vnr_to_mdev(tconn, pi->vnr);
3965         if (!mdev)
3966                 return -EIO;
3967
3968         switch (mdev->state.conn) {
3969         case C_WF_SYNC_UUID:
3970         case C_WF_BITMAP_T:
3971         case C_BEHIND:
3972                         break;
3973         default:
3974                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3975                                 drbd_conn_str(mdev->state.conn));
3976         }
3977
3978         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3979
3980         return 0;
3981 }
3982
3983 struct data_cmd {
3984         int expect_payload;
3985         size_t pkt_size;
3986         int (*fn)(struct drbd_tconn *, struct packet_info *);
3987 };
3988
3989 static struct data_cmd drbd_cmd_handler[] = {
3990         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3991         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3992         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3993         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3994         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
3995         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
3996         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
3997         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3998         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3999         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4000         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4001         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4002         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4003         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4004         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4005         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4006         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4007         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4008         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4009         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4010         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4011         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4012         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4013 };
4014
4015 static void drbdd(struct drbd_tconn *tconn)
4016 {
4017         struct packet_info pi;
4018         size_t shs; /* sub header size */
4019         int err;
4020
4021         while (get_t_state(&tconn->receiver) == RUNNING) {
4022                 struct data_cmd *cmd;
4023
4024                 drbd_thread_current_set_cpu(&tconn->receiver);
4025                 if (drbd_recv_header(tconn, &pi))
4026                         goto err_out;
4027
4028                 cmd = &drbd_cmd_handler[pi.cmd];
4029                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4030                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4031                         goto err_out;
4032                 }
4033
4034                 shs = cmd->pkt_size;
4035                 if (pi.size > shs && !cmd->expect_payload) {
4036                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4037                         goto err_out;
4038                 }
4039
4040                 if (shs) {
4041                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4042                         if (err)
4043                                 goto err_out;
4044                         pi.size -= shs;
4045                 }
4046
4047                 err = cmd->fn(tconn, &pi);
4048                 if (err) {
4049                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4050                                  cmdname(pi.cmd), err, pi.size);
4051                         goto err_out;
4052                 }
4053         }
4054         return;
4055
4056     err_out:
4057         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4058 }
4059
4060 void conn_flush_workqueue(struct drbd_tconn *tconn)
4061 {
4062         struct drbd_wq_barrier barr;
4063
4064         barr.w.cb = w_prev_work_done;
4065         barr.w.tconn = tconn;
4066         init_completion(&barr.done);
4067         drbd_queue_work(&tconn->data.work, &barr.w);
4068         wait_for_completion(&barr.done);
4069 }
4070
4071 static void drbd_disconnect(struct drbd_tconn *tconn)
4072 {
4073         enum drbd_conns oc;
4074         int rv = SS_UNKNOWN_ERROR;
4075
4076         if (tconn->cstate == C_STANDALONE)
4077                 return;
4078
4079         /* asender does not clean up anything. it must not interfere, either */
4080         drbd_thread_stop(&tconn->asender);
4081         drbd_free_sock(tconn);
4082
4083         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4084         conn_info(tconn, "Connection closed\n");
4085
4086         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4087                 conn_try_outdate_peer_async(tconn);
4088
4089         spin_lock_irq(&tconn->req_lock);
4090         oc = tconn->cstate;
4091         if (oc >= C_UNCONNECTED)
4092                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4093
4094         spin_unlock_irq(&tconn->req_lock);
4095
4096         if (oc == C_DISCONNECTING) {
4097                 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4098
4099                 crypto_free_hash(tconn->cram_hmac_tfm);
4100                 tconn->cram_hmac_tfm = NULL;
4101
4102                 kfree(tconn->net_conf);
4103                 tconn->net_conf = NULL;
4104                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4105         }
4106 }
4107
4108 static int drbd_disconnected(int vnr, void *p, void *data)
4109 {
4110         struct drbd_conf *mdev = (struct drbd_conf *)p;
4112         unsigned int i;
4113
4114         /* wait for current activity to cease. */
4115         spin_lock_irq(&mdev->tconn->req_lock);
4116         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4117         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4118         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4119         spin_unlock_irq(&mdev->tconn->req_lock);
4120
4121         /* We do not have data structures that would allow us to
4122          * get the rs_pending_cnt down to 0 again.
4123          *  * On C_SYNC_TARGET we do not have any data structures describing
4124          *    the pending RSDataRequest's we have sent.
4125          *  * On C_SYNC_SOURCE there is no data structure that tracks
4126          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4127          *  And no, it is not the sum of the reference counts in the
4128          *  resync_LRU. The resync_LRU tracks the whole operation including
4129          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4130          *  on the fly. */
4131         drbd_rs_cancel_all(mdev);
4132         mdev->rs_total = 0;
4133         mdev->rs_failed = 0;
4134         atomic_set(&mdev->rs_pending_cnt, 0);
4135         wake_up(&mdev->misc_wait);
4136
4137         del_timer(&mdev->request_timer);
4138
4139         del_timer_sync(&mdev->resync_timer);
4140         resync_timer_fn((unsigned long)mdev);
4141
4142         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4143          * w_make_resync_request etc. which may still be on the worker queue
4144          * to be "canceled" */
4145         drbd_flush_workqueue(mdev);
4146
4147         drbd_finish_peer_reqs(mdev);
4148
4149         kfree(mdev->p_uuid);
4150         mdev->p_uuid = NULL;
4151
4152         if (!drbd_suspended(mdev))
4153                 tl_clear(mdev->tconn);
4154
4155         drbd_md_sync(mdev);
4156
4163         /* serialize with bitmap writeout triggered by the state change,
4164          * if any. */
4165         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4166
4167         /* tcp_close and release of sendpage pages can be deferred.  I don't
4168          * want to use SO_LINGER, because apparently it can be deferred for
4169          * more than 20 seconds (longest time I checked).
4170          *
4171          * Actually we don't care exactly when the network stack does its
4172          * put_page(); we just release our reference on these pages right here.
4173          */
4174         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4175         if (i)
4176                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4177         i = atomic_read(&mdev->pp_in_use_by_net);
4178         if (i)
4179                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4180         i = atomic_read(&mdev->pp_in_use);
4181         if (i)
4182                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4183
4184         D_ASSERT(list_empty(&mdev->read_ee));
4185         D_ASSERT(list_empty(&mdev->active_ee));
4186         D_ASSERT(list_empty(&mdev->sync_ee));
4187         D_ASSERT(list_empty(&mdev->done_ee));
4188
4189         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4190         atomic_set(&mdev->current_epoch->epoch_size, 0);
4191         D_ASSERT(list_empty(&mdev->current_epoch->list));
4192
4193         return 0;
4194 }
4195
4196 /*
4197  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4198  * we can agree on is stored in agreed_pro_version.
4199  *
4200  * feature flags and the reserved array should be enough room for future
4201  * enhancements of the handshake protocol, and possible plugins...
4202  *
4203  * for now, they are expected to be zero, but their contents are ignored either way.
4204  */
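/*
 * Worked example with made-up numbers: if this side supports protocol
 * versions 86..100 and the peer advertises protocol_min=95, protocol_max=110,
 * the ranges overlap and drbd_do_features() below settles on
 *	agreed_pro_version = min(PRO_VERSION_MAX, protocol_max) = min(100, 110) = 100.
 * If the peer advertised only 80..85, PRO_VERSION_MIN > protocol_max would
 * hold and the handshake fails as "incompatible DRBD dialects".
 */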
4205 static int drbd_send_features(struct drbd_tconn *tconn)
4206 {
4207         struct drbd_socket *sock;
4208         struct p_connection_features *p;
4209
4210         sock = &tconn->data;
4211         p = conn_prepare_command(tconn, sock);
4212         if (!p)
4213                 return -EIO;
4214         memset(p, 0, sizeof(*p));
4215         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4216         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4217         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4218 }
4219
4220 /*
4221  * return values:
4222  *   1 yes, we have a valid connection
4223  *   0 oops, did not work out, please try again
4224  *  -1 peer talks different language,
4225  *     no point in trying again, please go standalone.
4226  */
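/*
 * Caller sketch (simplified; the real connect path lives in drbd_connect()
 * and its labels differ):
 *
 *	switch (drbd_do_features(tconn)) {
 *	case  1: break;           // proceed with authentication / configuration
 *	case  0: goto retry;      // transient failure, try to connect again
 *	case -1: goto standalone; // dialect mismatch, give up on this peer
 *	}
 */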
4227 static int drbd_do_features(struct drbd_tconn *tconn)
4228 {
4229         /* ASSERT current == tconn->receiver ... */
4230         struct p_connection_features *p;
4231         const int expect = sizeof(struct p_connection_features);
4232         struct packet_info pi;
4233         int err;
4234
4235         err = drbd_send_features(tconn);
4236         if (err)
4237                 return 0;
4238
4239         err = drbd_recv_header(tconn, &pi);
4240         if (err)
4241                 return 0;
4242
4243         if (pi.cmd != P_CONNECTION_FEATURES) {
4244                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4245                      cmdname(pi.cmd), pi.cmd);
4246                 return -1;
4247         }
4248
4249         if (pi.size != expect) {
4250                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4251                      expect, pi.size);
4252                 return -1;
4253         }
4254
4255         p = pi.data;
4256         err = drbd_recv_all_warn(tconn, p, expect);
4257         if (err)
4258                 return 0;
4259
4260         p->protocol_min = be32_to_cpu(p->protocol_min);
4261         p->protocol_max = be32_to_cpu(p->protocol_max);
4262         if (p->protocol_max == 0)
4263                 p->protocol_max = p->protocol_min;
4264
4265         if (PRO_VERSION_MAX < p->protocol_min ||
4266             PRO_VERSION_MIN > p->protocol_max)
4267                 goto incompat;
4268
4269         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4270
4271         conn_info(tconn, "Handshake successful: "
4272              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4273
4274         return 1;
4275
4276  incompat:
4277         conn_err(tconn, "incompatible DRBD dialects: "
4278             "I support %d-%d, peer supports %d-%d\n",
4279             PRO_VERSION_MIN, PRO_VERSION_MAX,
4280             p->protocol_min, p->protocol_max);
4281         return -1;
4282 }
4283
4284 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4285 static int drbd_do_auth(struct drbd_tconn *tconn)
4286 {
4287         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4288         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4289         return -1;
4290 }
4291 #else
4292 #define CHALLENGE_LEN 64
4293
4294 /* Return value:
4295         1 - auth succeeded,
4296         0 - failed, try again (network error),
4297         -1 - auth failed, don't try again.
4298 */
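
/*
 * Schematic of the exchange implemented below (both peers run the same code,
 * shown here from the local point of view):
 *
 *	send    P_AUTH_CHALLENGE  (my_challenge, CHALLENGE_LEN random bytes)
 *	recv    P_AUTH_CHALLENGE  (peers_ch)
 *	send    P_AUTH_RESPONSE   (HMAC(shared_secret, peers_ch))
 *	recv    P_AUTH_RESPONSE   (peer's response)
 *	compare peer's response against HMAC(shared_secret, my_challenge)
 *
 * This is only a summary for orientation; the authoritative sequence is the
 * code in drbd_do_auth() itself.
 */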
4299
4300 static int drbd_do_auth(struct drbd_tconn *tconn)
4301 {
4302         struct drbd_socket *sock;
4303         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4304         struct scatterlist sg;
4305         char *response = NULL;
4306         char *right_response = NULL;
4307         char *peers_ch = NULL;
4308         unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4309         unsigned int resp_size;
4310         struct hash_desc desc;
4311         struct packet_info pi;
4312         int err, rv;
4313
4314         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4315
4316         desc.tfm = tconn->cram_hmac_tfm;
4317         desc.flags = 0;
4318
4319         rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4320                                 (u8 *)tconn->net_conf->shared_secret, key_len);
4321         if (rv) {
4322                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4323                 rv = -1;
4324                 goto fail;
4325         }
4326
4327         get_random_bytes(my_challenge, CHALLENGE_LEN);
4328
4329         sock = &tconn->data;
4330         if (!conn_prepare_command(tconn, sock)) {
4331                 rv = 0;
4332                 goto fail;
4333         }
4334         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4335                                 my_challenge, CHALLENGE_LEN);
4336         if (!rv)
4337                 goto fail;
4338
4339         err = drbd_recv_header(tconn, &pi);
4340         if (err) {
4341                 rv = 0;
4342                 goto fail;
4343         }
4344
4345         if (pi.cmd != P_AUTH_CHALLENGE) {
4346                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4347                     cmdname(pi.cmd), pi.cmd);
4348                 rv = 0;
4349                 goto fail;
4350         }
4351
4352         if (pi.size > CHALLENGE_LEN * 2) {
4353                 conn_err(tconn, "AuthChallenge payload too big.\n");
4354                 rv = -1;
4355                 goto fail;
4356         }
4357
4358         peers_ch = kmalloc(pi.size, GFP_NOIO);
4359         if (peers_ch == NULL) {
4360                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4361                 rv = -1;
4362                 goto fail;
4363         }
4364
4365         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4366         if (err) {
4367                 rv = 0;
4368                 goto fail;
4369         }
4370
4371         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4372         response = kmalloc(resp_size, GFP_NOIO);
4373         if (response == NULL) {
4374                 conn_err(tconn, "kmalloc of response failed\n");
4375                 rv = -1;
4376                 goto fail;
4377         }
4378
4379         sg_init_table(&sg, 1);
4380         sg_set_buf(&sg, peers_ch, pi.size);
4381
4382         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4383         if (rv) {
4384                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4385                 rv = -1;
4386                 goto fail;
4387         }
4388
4389         if (!conn_prepare_command(tconn, sock)) {
4390                 rv = 0;
4391                 goto fail;
4392         }
4393         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4394                                 response, resp_size);
4395         if (!rv)
4396                 goto fail;
4397
4398         err = drbd_recv_header(tconn, &pi);
4399         if (err) {
4400                 rv = 0;
4401                 goto fail;
4402         }
4403
4404         if (pi.cmd != P_AUTH_RESPONSE) {
4405                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4406                         cmdname(pi.cmd), pi.cmd);
4407                 rv = 0;
4408                 goto fail;
4409         }
4410
4411         if (pi.size != resp_size) {
4412                 conn_err(tconn, "AuthResponse payload has the wrong size\n");
4413                 rv = 0;
4414                 goto fail;
4415         }
4416
4417         err = drbd_recv_all_warn(tconn, response, resp_size);
4418         if (err) {
4419                 rv = 0;
4420                 goto fail;
4421         }
4422
4423         right_response = kmalloc(resp_size, GFP_NOIO);
4424         if (right_response == NULL) {
4425                 conn_err(tconn, "kmalloc of right_response failed\n");
4426                 rv = -1;
4427                 goto fail;
4428         }
4429
4430         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4431
4432         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4433         if (rv) {
4434                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4435                 rv = -1;
4436                 goto fail;
4437         }
4438
4439         rv = !memcmp(response, right_response, resp_size);
4440
4441         if (rv)
4442                 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4443                      resp_size, tconn->net_conf->cram_hmac_alg);
4444         else
4445                 rv = -1;
4446
4447  fail:
4448         kfree(peers_ch);
4449         kfree(response);
4450         kfree(right_response);
4451
4452         return rv;
4453 }
4454 #endif
4455
4456 int drbdd_init(struct drbd_thread *thi)
4457 {
4458         struct drbd_tconn *tconn = thi->tconn;
4459         int h;
4460
4461         conn_info(tconn, "receiver (re)started\n");
4462
4463         do {
4464                 h = drbd_connect(tconn);
4465                 if (h == 0) {
4466                         drbd_disconnect(tconn);
4467                         schedule_timeout_interruptible(HZ);
4468                 }
4469                 if (h == -1) {
4470                         conn_warn(tconn, "Discarding network configuration.\n");
4471                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4472                 }
4473         } while (h == 0);
4474
4475         if (h > 0) {
4476                 if (get_net_conf(tconn)) {
4477                         drbdd(tconn);
4478                         put_net_conf(tconn);
4479                 }
4480         }
4481
4482         drbd_disconnect(tconn);
4483
4484         conn_info(tconn, "receiver terminated\n");
4485         return 0;
4486 }
4487
4488 /* ********* acknowledge sender ******** */
4489
4490 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4491 {
4492         struct p_req_state_reply *p = pi->data;
4493         int retcode = be32_to_cpu(p->retcode);
4494
4495         if (retcode >= SS_SUCCESS) {
4496                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4497         } else {
4498                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4499                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4500                          drbd_set_st_err_str(retcode), retcode);
4501         }
4502         wake_up(&tconn->ping_wait);
4503
4504         return 0;
4505 }
4506
4507 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4508 {
4509         struct drbd_conf *mdev;
4510         struct p_req_state_reply *p = pi->data;
4511         int retcode = be32_to_cpu(p->retcode);
4512
4513         mdev = vnr_to_mdev(tconn, pi->vnr);
4514         if (!mdev)
4515                 return -EIO;
4516
4517         if (retcode >= SS_SUCCESS) {
4518                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4519         } else {
4520                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4521                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4522                         drbd_set_st_err_str(retcode), retcode);
4523         }
4524         wake_up(&mdev->state_wait);
4525
4526         return 0;
4527 }
4528
4529 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4530 {
4531         return drbd_send_ping_ack(tconn);
4533 }
4534
4535 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4536 {
4537         /* restore idle timeout */
4538         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4539         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4540                 wake_up(&tconn->ping_wait);
4541
4542         return 0;
4543 }
4544
4545 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4546 {
4547         struct drbd_conf *mdev;
4548         struct p_block_ack *p = pi->data;
4549         sector_t sector = be64_to_cpu(p->sector);
4550         int blksize = be32_to_cpu(p->blksize);
4551
4552         mdev = vnr_to_mdev(tconn, pi->vnr);
4553         if (!mdev)
4554                 return -EIO;
4555
4556         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4557
4558         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4559
4560         if (get_ldev(mdev)) {
4561                 drbd_rs_complete_io(mdev, sector);
4562                 drbd_set_in_sync(mdev, sector, blksize);
4563                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4564                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4565                 put_ldev(mdev);
4566         }
4567         dec_rs_pending(mdev);
4568         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4569
4570         return 0;
4571 }
4572
4573 static int
4574 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4575                               struct rb_root *root, const char *func,
4576                               enum drbd_req_event what, bool missing_ok)
4577 {
4578         struct drbd_request *req;
4579         struct bio_and_error m;
4580
4581         spin_lock_irq(&mdev->tconn->req_lock);
4582         req = find_request(mdev, root, id, sector, missing_ok, func);
4583         if (unlikely(!req)) {
4584                 spin_unlock_irq(&mdev->tconn->req_lock);
4585                 return -EIO;
4586         }
4587         __req_mod(req, what, &m);
4588         spin_unlock_irq(&mdev->tconn->req_lock);
4589
4590         if (m.bio)
4591                 complete_master_bio(mdev, &m);
4592         return 0;
4593 }
4594
4595 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4596 {
4597         struct drbd_conf *mdev;
4598         struct p_block_ack *p = pi->data;
4599         sector_t sector = be64_to_cpu(p->sector);
4600         int blksize = be32_to_cpu(p->blksize);
4601         enum drbd_req_event what;
4602
4603         mdev = vnr_to_mdev(tconn, pi->vnr);
4604         if (!mdev)
4605                 return -EIO;
4606
4607         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4608
4609         if (p->block_id == ID_SYNCER) {
4610                 drbd_set_in_sync(mdev, sector, blksize);
4611                 dec_rs_pending(mdev);
4612                 return 0;
4613         }
4614         switch (pi->cmd) {
4615         case P_RS_WRITE_ACK:
4616                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4617                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4618                 break;
4619         case P_WRITE_ACK:
4620                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4621                 what = WRITE_ACKED_BY_PEER;
4622                 break;
4623         case P_RECV_ACK:
4624                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4625                 what = RECV_ACKED_BY_PEER;
4626                 break;
4627         case P_DISCARD_WRITE:
4628                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4629                 what = DISCARD_WRITE;
4630                 break;
4631         case P_RETRY_WRITE:
4632                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4633                 what = POSTPONE_WRITE;
4634                 break;
4635         default:
4636                 BUG();
4637         }
4638
4639         return validate_req_change_req_state(mdev, p->block_id, sector,
4640                                              &mdev->write_requests, __func__,
4641                                              what, false);
4642 }
4643
4644 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4645 {
4646         struct drbd_conf *mdev;
4647         struct p_block_ack *p = pi->data;
4648         sector_t sector = be64_to_cpu(p->sector);
4649         int size = be32_to_cpu(p->blksize);
4650         bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4651                           tconn->net_conf->wire_protocol == DRBD_PROT_B;
4652         int err;
4653
4654         mdev = vnr_to_mdev(tconn, pi->vnr);
4655         if (!mdev)
4656                 return -EIO;
4657
4658         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4659
4660         if (p->block_id == ID_SYNCER) {
4661                 dec_rs_pending(mdev);
4662                 drbd_rs_failed_io(mdev, sector, size);
4663                 return 0;
4664         }
4665
4666         err = validate_req_change_req_state(mdev, p->block_id, sector,
4667                                             &mdev->write_requests, __func__,
4668                                             NEG_ACKED, missing_ok);
4669         if (err) {
4670                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4671                    The master bio might already be completed, therefore the
4672                    request is no longer in the collision hash. */
4673                 /* In Protocol B we might already have got a P_RECV_ACK
4674                    but then get a P_NEG_ACK afterwards. */
4675                 if (!missing_ok)
4676                         return err;
4677                 drbd_set_out_of_sync(mdev, sector, size);
4678         }
4679         return 0;
4680 }
4681
4682 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4683 {
4684         struct drbd_conf *mdev;
4685         struct p_block_ack *p = pi->data;
4686         sector_t sector = be64_to_cpu(p->sector);
4687
4688         mdev = vnr_to_mdev(tconn, pi->vnr);
4689         if (!mdev)
4690                 return -EIO;
4691
4692         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4693
4694         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4695             (unsigned long long)sector, be32_to_cpu(p->blksize));
4696
4697         return validate_req_change_req_state(mdev, p->block_id, sector,
4698                                              &mdev->read_requests, __func__,
4699                                              NEG_ACKED, false);
4700 }
4701
4702 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4703 {
4704         struct drbd_conf *mdev;
4705         sector_t sector;
4706         int size;
4707         struct p_block_ack *p = pi->data;
4708
4709         mdev = vnr_to_mdev(tconn, pi->vnr);
4710         if (!mdev)
4711                 return -EIO;
4712
4713         sector = be64_to_cpu(p->sector);
4714         size = be32_to_cpu(p->blksize);
4715
4716         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4717
4718         dec_rs_pending(mdev);
4719
4720         if (get_ldev_if_state(mdev, D_FAILED)) {
4721                 drbd_rs_complete_io(mdev, sector);
4722                 switch (pi->cmd) {
4723                 case P_NEG_RS_DREPLY:
4724                         drbd_rs_failed_io(mdev, sector, size);
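                        /* fall through: P_RS_CANCEL needs only the
                         * drbd_rs_complete_io() done above */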
4725                 case P_RS_CANCEL:
4726                         break;
4727                 default:
4728                         BUG();
4729                 }
4730                 put_ldev(mdev);
4731         }
4732
4733         return 0;
4734 }
4735
4736 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4737 {
4738         struct drbd_conf *mdev;
4739         struct p_barrier_ack *p = pi->data;
4740
4741         mdev = vnr_to_mdev(tconn, pi->vnr);
4742         if (!mdev)
4743                 return -EIO;
4744
4745         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4746
4747         if (mdev->state.conn == C_AHEAD &&
4748             atomic_read(&mdev->ap_in_flight) == 0 &&
4749             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4750                 mdev->start_resync_timer.expires = jiffies + HZ;
4751                 add_timer(&mdev->start_resync_timer);
4752         }
4753
4754         return 0;
4755 }
4756
4757 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4758 {
4759         struct drbd_conf *mdev;
4760         struct p_block_ack *p = pi->data;
4761         struct drbd_work *w;
4762         sector_t sector;
4763         int size;
4764
4765         mdev = vnr_to_mdev(tconn, pi->vnr);
4766         if (!mdev)
4767                 return -EIO;
4768
4769         sector = be64_to_cpu(p->sector);
4770         size = be32_to_cpu(p->blksize);
4771
4772         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4773
4774         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4775                 drbd_ov_out_of_sync_found(mdev, sector, size);
4776         else
4777                 ov_out_of_sync_print(mdev);
4778
4779         if (!get_ldev(mdev))
4780                 return 0;
4781
4782         drbd_rs_complete_io(mdev, sector);
4783         dec_rs_pending(mdev);
4784
4785         --mdev->ov_left;
4786
4787         /* let's advance progress step marks only for every other megabyte */
4788         if ((mdev->ov_left & 0x200) == 0x200)
4789                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4790
4791         if (mdev->ov_left == 0) {
4792                 w = kmalloc(sizeof(*w), GFP_NOIO);
4793                 if (w) {
4794                         w->cb = w_ov_finished;
4795                         w->mdev = mdev;
4796                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4797                 } else {
4798                         dev_err(DEV, "kmalloc(w) failed.");
4799                         ov_out_of_sync_print(mdev);
4800                         drbd_resync_finished(mdev);
4801                 }
4802         }
4803         put_ldev(mdev);
4804         return 0;
4805 }
4806
4807 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4808 {
4809         return 0;
4810 }
4811
4812 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4813 {
4814         struct drbd_conf *mdev;
4815         int i, not_empty = 0;
4816
4817         do {
4818                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4819                 flush_signals(current);
4820                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4821                         if (drbd_finish_peer_reqs(mdev))
4822                                 return 1; /* error */
4823                 }
4824                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4825
4826                 spin_lock_irq(&tconn->req_lock);
4827                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4828                         not_empty = !list_empty(&mdev->done_ee);
4829                         if (not_empty)
4830                                 break;
4831                 }
4832                 spin_unlock_irq(&tconn->req_lock);
4833         } while (not_empty);
4834
4835         return 0;
4836 }
4837
4838 struct asender_cmd {
4839         size_t pkt_size;
4840         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4841 };
4842
4843 static struct asender_cmd asender_tbl[] = {
4844         [P_PING]            = { 0, got_Ping },
4845         [P_PING_ACK]        = { 0, got_PingAck },
4846         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4847         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4848         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4849         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4850         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4851         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4852         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4853         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4854         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4855         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4856         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4857         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4858         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4859         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
4860         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4861 };
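
/*
 * Illustrative note: every packet on the meta socket is exactly
 * drbd_header_size(tconn) + pkt_size bytes long, so drbd_asender() below can
 * receive it in two fixed-size steps.  E.g. for a P_WRITE_ACK (made-up trace):
 *
 *	expect = header_size;                               // 1st read: header only
 *	decode_header() -> pi.cmd = P_WRITE_ACK, pi.size = sizeof(struct p_block_ack)
 *	expect = header_size + sizeof(struct p_block_ack);  // 2nd read: the ack body
 *	got_BlockAck(tconn, &pi);
 *
 * Table entries with pkt_size == 0 (P_PING, P_PING_ACK) consist of the header
 * alone, so their handler runs right after the first read.
 */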
4862
4863 int drbd_asender(struct drbd_thread *thi)
4864 {
4865         struct drbd_tconn *tconn = thi->tconn;
4866         struct asender_cmd *cmd = NULL;
4867         struct packet_info pi;
4868         int rv;
4869         void *buf    = tconn->meta.rbuf;
4870         int received = 0;
4871         unsigned int header_size = drbd_header_size(tconn);
4872         int expect   = header_size;
4873         int ping_timeout_active = 0;
4874
4875         current->policy = SCHED_RR;  /* Make this a realtime task! */
4876         current->rt_priority = 2;    /* more important than all other tasks */
4877
4878         while (get_t_state(thi) == RUNNING) {
4879                 drbd_thread_current_set_cpu(thi);
4880                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4881                         if (drbd_send_ping(tconn)) {
4882                                 conn_err(tconn, "drbd_send_ping has failed\n");
4883                                 goto reconnect;
4884                         }
4885                         tconn->meta.socket->sk->sk_rcvtimeo =
4886                                 tconn->net_conf->ping_timeo*HZ/10;
4887                         ping_timeout_active = 1;
4888                 }
4889
4890                 /* TODO: conditionally cork; it may hurt latency if we cork without
4891                    much to send */
4892                 if (!tconn->net_conf->no_cork)
4893                         drbd_tcp_cork(tconn->meta.socket);
4894                 if (tconn_finish_peer_reqs(tconn)) {
4895                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
4896                         goto reconnect;
4897                 }
4898                 /* but unconditionally uncork unless disabled */
4899                 if (!tconn->net_conf->no_cork)
4900                         drbd_tcp_uncork(tconn->meta.socket);
4901
4902                 /* short circuit, recv_msg would return EINTR anyway. */
4903                 if (signal_pending(current))
4904                         continue;
4905
4906                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4907                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4908
4909                 flush_signals(current);
4910
4911                 /* Note:
4912                  * -EINTR        (on meta) we got a signal
4913                  * -EAGAIN       (on meta) rcvtimeo expired
4914                  * -ECONNRESET   other side closed the connection
4915                  * -ERESTARTSYS  (on data) we got a signal
4916                  * rv <  0       other than above: unexpected error!
4917                  * rv == expected: full header or command
4918                  * rv <  expected: "woken" by signal during receive
4919                  * rv == 0       : "connection shut down by peer"
4920                  */
4921                 if (likely(rv > 0)) {
4922                         received += rv;
4923                         buf      += rv;
4924                 } else if (rv == 0) {
4925                         conn_err(tconn, "meta connection shut down by peer.\n");
4926                         goto reconnect;
4927                 } else if (rv == -EAGAIN) {
4928                         /* If the data socket received something meanwhile,
4929                          * that is good enough: peer is still alive. */
4930                         if (time_after(tconn->last_received,
4931                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
4932                                 continue;
4933                         if (ping_timeout_active) {
4934                                 conn_err(tconn, "PingAck did not arrive in time.\n");
4935                                 goto reconnect;
4936                         }
4937                         set_bit(SEND_PING, &tconn->flags);
4938                         continue;
4939                 } else if (rv == -EINTR) {
4940                         continue;
4941                 } else {
4942                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
4943                         goto reconnect;
4944                 }
4945
4946                 if (received == expect && cmd == NULL) {
4947                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
4948                                 goto reconnect;
4949                         cmd = &asender_tbl[pi.cmd];
4950                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
4951                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
4952                                         pi.cmd, pi.size);
4953                                 goto disconnect;
4954                         }
4955                         expect = header_size + cmd->pkt_size;
4956                         if (pi.size != expect - header_size) {
4957                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
4958                                         pi.cmd, pi.size);
4959                                 goto reconnect;
4960                         }
4961                 }
4962                 if (received == expect) {
4963                         bool err;
4964
4965                         err = cmd->fn(tconn, &pi);
4966                         if (err) {
4967                                 conn_err(tconn, "%pf failed\n", cmd->fn);
4968                                 goto reconnect;
4969                         }
4970
4971                         tconn->last_received = jiffies;
4972
4973                         /* the idle_timeout (ping-int)
4974                          * has been restored in got_PingAck() */
4975                         if (cmd == &asender_tbl[P_PING_ACK])
4976                                 ping_timeout_active = 0;
4977
4978                         buf      = tconn->meta.rbuf;
4979                         received = 0;
4980                         expect   = header_size;
4981                         cmd      = NULL;
4982                 }
4983         }
4984
4985         if (0) {
4986 reconnect:
4987                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4988         }
4989         if (0) {
4990 disconnect:
4991                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4992         }
4993         clear_bit(SIGNAL_ASENDER, &tconn->flags);
4994
4995         conn_info(tconn, "asender terminated\n");
4996
4997         return 0;
4998 }