drbd: Rename drbd_free_ee() and variants to *_peer_req()
drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
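/* Return values of drbd_may_finish_epoch() (see below): FE_STILL_LIVE means
 * the epoch is still in use, FE_DESTROYED means it was removed from the
 * epoch list and freed, FE_RECYCLED means the current epoch was reset for
 * reuse. */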
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with singly linked page lists,
76  * page->private being our "next" pointer.
77  */
78
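/* Illustrative layout (annotation, not part of the original source): with
 * three pages chained through page->private, a list head looks like
 *
 *     head --> pageA --> pageB --> pageC --> 0  (end-of-chain marker)
 *
 * page_chain_next() simply follows that page->private link, and
 * set_page_private(page, 0) terminates a chain. */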
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
154 {
155         struct page *page = NULL;
156         struct page *tmp = NULL;
157         int i = 0;
158
159         /* Yes, testing drbd_pp_vacant outside the lock is racy.
160          * So what. It saves a spin_lock. */
161         if (drbd_pp_vacant >= number) {
162                 spin_lock(&drbd_pp_lock);
163                 page = page_chain_del(&drbd_pp_pool, number);
164                 if (page)
165                         drbd_pp_vacant -= number;
166                 spin_unlock(&drbd_pp_lock);
167                 if (page)
168                         return page;
169         }
170
171         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
172          * "criss-cross" setup, that might cause write-out on some other DRBD,
173          * which in turn might block on the other node at this very place.  */
174         for (i = 0; i < number; i++) {
175                 tmp = alloc_page(GFP_TRY);
176                 if (!tmp)
177                         break;
178                 set_page_private(tmp, (unsigned long)page);
179                 page = tmp;
180         }
181
182         if (i == number)
183                 return page;
184
185         /* Not enough pages immediately available this time.
186          * No need to jump around here, drbd_pp_alloc will retry this
187          * function "soon". */
188         if (page) {
189                 tmp = page_chain_tail(page, NULL);
190                 spin_lock(&drbd_pp_lock);
191                 page_chain_add(&drbd_pp_pool, page, tmp);
192                 drbd_pp_vacant += i;
193                 spin_unlock(&drbd_pp_lock);
194         }
195         return NULL;
196 }
197
198 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
199 {
200         struct drbd_peer_request *peer_req;
201         struct list_head *le, *tle;
202
203         /* The EEs are always appended to the end of the list. Since
204            they are sent in order over the wire, they have to finish
205            in order. As soon as we see the first one that has not
206            finished, we can stop examining the list... */
207
208         list_for_each_safe(le, tle, &mdev->net_ee) {
209                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
210                 if (drbd_ee_has_active_page(peer_req))
211                         break;
212                 list_move(le, to_be_freed);
213         }
214 }
215
216 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
217 {
218         LIST_HEAD(reclaimed);
219         struct drbd_peer_request *peer_req, *t;
220
221         spin_lock_irq(&mdev->tconn->req_lock);
222         reclaim_net_ee(mdev, &reclaimed);
223         spin_unlock_irq(&mdev->tconn->req_lock);
224
225         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
226                 drbd_free_net_peer_req(mdev, peer_req);
227 }
228
229 /**
230  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
231  * @mdev:       DRBD device.
232  * @number:     number of pages requested
233  * @retry:      whether to retry, if not enough pages are available right now
234  *
235  * Tries to allocate @number pages, first from our own page pool, then from
236  * the kernel, unless this allocation would exceed the max_buffers setting.
237  * Possibly retry until DRBD frees sufficient pages somewhere else.
238  *
239  * Returns a page chain linked via page->private.
240  */
241 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
242 {
243         struct page *page = NULL;
244         DEFINE_WAIT(wait);
245
246         /* Yes, we may run up to @number over max_buffers. If we
247          * follow it strictly, the admin will get it wrong anyways. */
248         if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
249                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250
251         while (page == NULL) {
252                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
253
254                 drbd_kick_lo_and_reclaim_net(mdev);
255
256                 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
257                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
258                         if (page)
259                                 break;
260                 }
261
262                 if (!retry)
263                         break;
264
265                 if (signal_pending(current)) {
266                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
267                         break;
268                 }
269
270                 schedule();
271         }
272         finish_wait(&drbd_pp_wait, &wait);
273
274         if (page)
275                 atomic_add(number, &mdev->pp_in_use);
276         return page;
277 }
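/* Hypothetical caller sketch (annotation, not part of the original source),
 * showing the usual pairing of drbd_pp_alloc() and drbd_pp_free():
 *
 *     page = drbd_pp_alloc(mdev, nr_pages, true);   // may block until pages are freed
 *     if (page) {
 *             ... use the page chain, e.g. attach it to a peer request ...
 *             drbd_pp_free(mdev, page, 0);          // return chain, wakes drbd_pp_wait
 *     }
 */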
278
279 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
280  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
281  * Either links the page chain back to the global pool,
282  * or returns all pages to the system. */
283 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
284 {
285         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
286         int i;
287
288         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
289                 i = page_chain_free(page);
290         else {
291                 struct page *tmp;
292                 tmp = page_chain_tail(page, &i);
293                 spin_lock(&drbd_pp_lock);
294                 page_chain_add(&drbd_pp_pool, page, tmp);
295                 drbd_pp_vacant += i;
296                 spin_unlock(&drbd_pp_lock);
297         }
298         i = atomic_sub_return(i, a);
299         if (i < 0)
300                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
301                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
302         wake_up(&drbd_pp_wait);
303 }
304
305 /*
306 You need to hold the req_lock:
307  _drbd_wait_ee_list_empty()
308
309 You must not have the req_lock:
310  drbd_free_peer_req()
311  drbd_alloc_peer_req()
312  drbd_release_ee()
313  drbd_ee_fix_bhs()
314  drbd_process_done_ee()
315  drbd_clear_done_ee()
316  drbd_wait_ee_list_empty()
317 */
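/* Note on naming: a leading underscore marks the variant that expects the
 * caller to already hold req_lock (e.g. _drbd_wait_ee_list_empty()), while
 * the plain-named wrapper (drbd_wait_ee_list_empty()) takes and releases the
 * lock itself, as can be seen further down in this file. */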
318
319 struct drbd_peer_request *
320 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
321                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
322 {
323         struct drbd_peer_request *peer_req;
324         struct page *page;
325         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
326
327         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
328                 return NULL;
329
330         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
331         if (!peer_req) {
332                 if (!(gfp_mask & __GFP_NOWARN))
333                         dev_err(DEV, "%s: allocation failed\n", __func__);
334                 return NULL;
335         }
336
337         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
338         if (!page)
339                 goto fail;
340
341         drbd_clear_interval(&peer_req->i);
342         peer_req->i.size = data_size;
343         peer_req->i.sector = sector;
344         peer_req->i.local = false;
345         peer_req->i.waiting = false;
346
347         peer_req->epoch = NULL;
348         peer_req->w.mdev = mdev;
349         peer_req->pages = page;
350         atomic_set(&peer_req->pending_bios, 0);
351         peer_req->flags = 0;
352         /*
353          * The block_id is opaque to the receiver.  It is not endianness
354          * converted, and sent back to the sender unchanged.
355          */
356         peer_req->block_id = id;
357
358         return peer_req;
359
360  fail:
361         mempool_free(peer_req, drbd_ee_mempool);
362         return NULL;
363 }
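/* Hypothetical usage sketch (annotation, not part of the original source):
 * a receive path typically allocates, uses, and frees a peer request like
 *
 *     peer_req = drbd_alloc_peer_req(mdev, id, sector, size, GFP_NOIO);
 *     if (!peer_req)
 *             return NULL;
 *     ... fill peer_req->pages from the socket, submit or queue it ...
 *     drbd_free_peer_req(mdev, peer_req);   // on error, or once it is done
 *
 * See e.g. read_in_block() below for a real caller. */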
364
365 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
366                        int is_net)
367 {
368         if (peer_req->flags & EE_HAS_DIGEST)
369                 kfree(peer_req->digest);
370         drbd_pp_free(mdev, peer_req->pages, is_net);
371         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
372         D_ASSERT(drbd_interval_empty(&peer_req->i));
373         mempool_free(peer_req, drbd_ee_mempool);
374 }
375
376 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
377 {
378         LIST_HEAD(work_list);
379         struct drbd_peer_request *peer_req, *t;
380         int count = 0;
381         int is_net = list == &mdev->net_ee;
382
383         spin_lock_irq(&mdev->tconn->req_lock);
384         list_splice_init(list, &work_list);
385         spin_unlock_irq(&mdev->tconn->req_lock);
386
387         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
388                 __drbd_free_peer_req(mdev, peer_req, is_net);
389                 count++;
390         }
391         return count;
392 }
393
394
395 /* See also comments in _req_mod(,BARRIER_ACKED)
396  * and receive_Barrier.
397  *
398  * Move entries from net_ee to done_ee, if ready.
399  * Grab done_ee, call all callbacks, free the entries.
400  * The callbacks typically send out ACKs.
401  */
402 static int drbd_process_done_ee(struct drbd_conf *mdev)
403 {
404         LIST_HEAD(work_list);
405         LIST_HEAD(reclaimed);
406         struct drbd_peer_request *peer_req, *t;
407         int err = 0;
408
409         spin_lock_irq(&mdev->tconn->req_lock);
410         reclaim_net_ee(mdev, &reclaimed);
411         list_splice_init(&mdev->done_ee, &work_list);
412         spin_unlock_irq(&mdev->tconn->req_lock);
413
414         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
415                 drbd_free_net_peer_req(mdev, peer_req);
416
417         /* possible callbacks here:
418          * e_end_block, and e_end_resync_block, e_send_discard_write.
419          * all ignore the last argument.
420          */
421         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
422                 int err2;
423
424                 /* list_del not necessary, next/prev members not touched */
425                 err2 = peer_req->w.cb(&peer_req->w, !!err);
426                 if (!err)
427                         err = err2;
428                 drbd_free_peer_req(mdev, peer_req);
429         }
430         wake_up(&mdev->ee_wait);
431
432         return err;
433 }
434
435 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436 {
437         DEFINE_WAIT(wait);
438
439         /* avoids spin_lock/unlock
440          * and calling prepare_to_wait in the fast path */
441         while (!list_empty(head)) {
442                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
443                 spin_unlock_irq(&mdev->tconn->req_lock);
444                 io_schedule();
445                 finish_wait(&mdev->ee_wait, &wait);
446                 spin_lock_irq(&mdev->tconn->req_lock);
447         }
448 }
449
450 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
451 {
452         spin_lock_irq(&mdev->tconn->req_lock);
453         _drbd_wait_ee_list_empty(mdev, head);
454         spin_unlock_irq(&mdev->tconn->req_lock);
455 }
456
457 /* see also kernel_accept, which has only been present since 2.6.18;
458  * we also want to log exactly which part of it failed */
459 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
460 {
461         struct sock *sk = sock->sk;
462         int err = 0;
463
464         *what = "listen";
465         err = sock->ops->listen(sock, 5);
466         if (err < 0)
467                 goto out;
468
469         *what = "sock_create_lite";
470         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
471                                newsock);
472         if (err < 0)
473                 goto out;
474
475         *what = "accept";
476         err = sock->ops->accept(sock, *newsock, 0);
477         if (err < 0) {
478                 sock_release(*newsock);
479                 *newsock = NULL;
480                 goto out;
481         }
482         (*newsock)->ops  = sock->ops;
483
484 out:
485         return err;
486 }
487
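/* The receive helpers below pass kernel buffers to sock_recvmsg(), which
 * normally expects user-space iovecs; the temporary set_fs(KERNEL_DS) /
 * set_fs(oldfs) dance widens the address-space check so the kernel pointer
 * is accepted, and restores the previous limit afterwards. */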
488 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
489 {
490         mm_segment_t oldfs;
491         struct kvec iov = {
492                 .iov_base = buf,
493                 .iov_len = size,
494         };
495         struct msghdr msg = {
496                 .msg_iovlen = 1,
497                 .msg_iov = (struct iovec *)&iov,
498                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
499         };
500         int rv;
501
502         oldfs = get_fs();
503         set_fs(KERNEL_DS);
504         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
505         set_fs(oldfs);
506
507         return rv;
508 }
509
510 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
511 {
512         mm_segment_t oldfs;
513         struct kvec iov = {
514                 .iov_base = buf,
515                 .iov_len = size,
516         };
517         struct msghdr msg = {
518                 .msg_iovlen = 1,
519                 .msg_iov = (struct iovec *)&iov,
520                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
521         };
522         int rv;
523
524         oldfs = get_fs();
525         set_fs(KERNEL_DS);
526
527         for (;;) {
528                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
529                 if (rv == size)
530                         break;
531
532                 /* Note:
533                  * ECONNRESET   other side closed the connection
534                  * ERESTARTSYS  (on  sock) we got a signal
535                  */
536
537                 if (rv < 0) {
538                         if (rv == -ECONNRESET)
539                                 conn_info(tconn, "sock was reset by peer\n");
540                         else if (rv != -ERESTARTSYS)
541                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
542                         break;
543                 } else if (rv == 0) {
544                         conn_info(tconn, "sock was shut down by peer\n");
545                         break;
546                 } else  {
547                         /* signal came in, or peer/link went down,
548                          * after we read a partial message
549                          */
550                         /* D_ASSERT(signal_pending(current)); */
551                         break;
552                 }
553         }
554
555         set_fs(oldfs);
556
557         if (rv != size)
558                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
559
560         return rv;
561 }
562
563 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
564 {
565         int err;
566
567         err = drbd_recv(tconn, buf, size);
568         if (err != size) {
569                 if (err >= 0)
570                         err = -EIO;
571         } else
572                 err = 0;
573         return err;
574 }
575
576 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
577 {
578         int err;
579
580         err = drbd_recv_all(tconn, buf, size);
581         if (err && !signal_pending(current))
582                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
583         return err;
584 }
585
586 /* quoting tcp(7):
587  *   On individual connections, the socket buffer size must be set prior to the
588  *   listen(2) or connect(2) calls in order to have it take effect.
589  * This is our wrapper to do so.
590  */
591 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
592                 unsigned int rcv)
593 {
594         /* open coded SO_SNDBUF, SO_RCVBUF */
595         if (snd) {
596                 sock->sk->sk_sndbuf = snd;
597                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
598         }
599         if (rcv) {
600                 sock->sk->sk_rcvbuf = rcv;
601                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
602         }
603 }
604
605 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
606 {
607         const char *what;
608         struct socket *sock;
609         struct sockaddr_in6 src_in6;
610         int err;
611         int disconnect_on_error = 1;
612
613         if (!get_net_conf(tconn))
614                 return NULL;
615
616         what = "sock_create_kern";
617         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
618                 SOCK_STREAM, IPPROTO_TCP, &sock);
619         if (err < 0) {
620                 sock = NULL;
621                 goto out;
622         }
623
624         sock->sk->sk_rcvtimeo =
625         sock->sk->sk_sndtimeo =  tconn->net_conf->try_connect_int*HZ;
626         drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
627                         tconn->net_conf->rcvbuf_size);
628
629        /* explicitly bind to the configured IP as source IP
630         *  for the outgoing connections.
631         *  This is needed for multihomed hosts and to be
632         *  able to use lo: interfaces for drbd.
633         * Make sure to use 0 as port number, so linux selects
634         *  a free one dynamically.
635         */
636         memcpy(&src_in6, tconn->net_conf->my_addr,
637                min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
638         if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
639                 src_in6.sin6_port = 0;
640         else
641                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
642
643         what = "bind before connect";
644         err = sock->ops->bind(sock,
645                               (struct sockaddr *) &src_in6,
646                               tconn->net_conf->my_addr_len);
647         if (err < 0)
648                 goto out;
649
650         /* connect may fail, peer not yet available.
651          * stay C_WF_CONNECTION, don't go Disconnecting! */
652         disconnect_on_error = 0;
653         what = "connect";
654         err = sock->ops->connect(sock,
655                                  (struct sockaddr *)tconn->net_conf->peer_addr,
656                                  tconn->net_conf->peer_addr_len, 0);
657
658 out:
659         if (err < 0) {
660                 if (sock) {
661                         sock_release(sock);
662                         sock = NULL;
663                 }
664                 switch (-err) {
665                         /* timeout, busy, signal pending */
666                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
667                 case EINTR: case ERESTARTSYS:
668                         /* peer not (yet) available, network problem */
669                 case ECONNREFUSED: case ENETUNREACH:
670                 case EHOSTDOWN:    case EHOSTUNREACH:
671                         disconnect_on_error = 0;
672                         break;
673                 default:
674                         conn_err(tconn, "%s failed, err = %d\n", what, err);
675                 }
676                 if (disconnect_on_error)
677                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
678         }
679         put_net_conf(tconn);
680         return sock;
681 }
682
683 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
684 {
685         int timeo, err;
686         struct socket *s_estab = NULL, *s_listen;
687         const char *what;
688
689         if (!get_net_conf(tconn))
690                 return NULL;
691
692         what = "sock_create_kern";
693         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
694                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
695         if (err) {
696                 s_listen = NULL;
697                 goto out;
698         }
699
700         timeo = tconn->net_conf->try_connect_int * HZ;
701         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
702
703         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
704         s_listen->sk->sk_rcvtimeo = timeo;
705         s_listen->sk->sk_sndtimeo = timeo;
706         drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
707                         tconn->net_conf->rcvbuf_size);
708
709         what = "bind before listen";
710         err = s_listen->ops->bind(s_listen,
711                               (struct sockaddr *) tconn->net_conf->my_addr,
712                               tconn->net_conf->my_addr_len);
713         if (err < 0)
714                 goto out;
715
716         err = drbd_accept(&what, s_listen, &s_estab);
717
718 out:
719         if (s_listen)
720                 sock_release(s_listen);
721         if (err < 0) {
722                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
723                         conn_err(tconn, "%s failed, err = %d\n", what, err);
724                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
725                 }
726         }
727         put_net_conf(tconn);
728
729         return s_estab;
730 }
731
732 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
733
734 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
735                              enum drbd_packet cmd)
736 {
737         if (!conn_prepare_command(tconn, sock))
738                 return -EIO;
739         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
740 }
741
742 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
743 {
744         unsigned int header_size = drbd_header_size(tconn);
745         struct packet_info pi;
746         int err;
747
748         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
749         if (err != header_size) {
750                 if (err >= 0)
751                         err = -EIO;
752                 return err;
753         }
754         err = decode_header(tconn, tconn->data.rbuf, &pi);
755         if (err)
756                 return err;
757         return pi.cmd;
758 }
759
760 /**
761  * drbd_socket_okay() - Free the socket if its connection is not okay
762  * @sock:       pointer to the pointer to the socket.
763  */
764 static int drbd_socket_okay(struct socket **sock)
765 {
766         int rr;
767         char tb[4];
768
769         if (!*sock)
770                 return false;
771
772         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
773
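        /* A positive peek means data is pending, -EAGAIN merely means
         * "nothing to read right now"; both imply the connection is alive.
         * A return of 0 means the peer performed an orderly shutdown, and
         * any other error means the socket is broken, so drop it. */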
774         if (rr > 0 || rr == -EAGAIN) {
775                 return true;
776         } else {
777                 sock_release(*sock);
778                 *sock = NULL;
779                 return false;
780         }
781 }
782 /* Gets called if a connection is established, or if a new minor gets created
783    in a connection */
784 int drbd_connected(int vnr, void *p, void *data)
785 {
786         struct drbd_conf *mdev = (struct drbd_conf *)p;
787         int err;
788
789         atomic_set(&mdev->packet_seq, 0);
790         mdev->peer_seq = 0;
791
792         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
793                 &mdev->tconn->cstate_mutex :
794                 &mdev->own_state_mutex;
795
796         err = drbd_send_sync_param(mdev);
797         if (!err)
798                 err = drbd_send_sizes(mdev, 0, 0);
799         if (!err)
800                 err = drbd_send_uuids(mdev);
801         if (!err)
802                 err = drbd_send_state(mdev);
803         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
804         clear_bit(RESIZE_PENDING, &mdev->flags);
805         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
806         return err;
807 }
808
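/* Connection setup below uses two TCP sockets: the first one successfully
 * established becomes the data socket and announces itself with
 * P_INITIAL_DATA, the second becomes the meta socket and sends
 * P_INITIAL_META.  If both nodes connect at the same time and the initial
 * packets "cross", the surplus socket is released; the side that ends up
 * accepting the meta connection sets DISCARD_CONCURRENT, which is used
 * elsewhere to break ties for concurrent writes. */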
809 /*
810  * return values:
811  *   1 yes, we have a valid connection
812  *   0 oops, did not work out, please try again
813  *  -1 peer talks different language,
814  *     no point in trying again, please go standalone.
815  *  -2 We do not have a network config...
816  */
817 static int drbd_connect(struct drbd_tconn *tconn)
818 {
819         struct socket *sock, *msock;
820         int try, h, ok;
821
822         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
823                 return -2;
824
825         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
826
827         /* Assume that the peer only understands protocol 80 until we know better.  */
828         tconn->agreed_pro_version = 80;
829
830         do {
831                 struct socket *s;
832
833                 for (try = 0;;) {
834                         /* 3 tries, this should take less than a second! */
835                         s = drbd_try_connect(tconn);
836                         if (s || ++try >= 3)
837                                 break;
838                         /* give the other side time to call bind() & listen() */
839                         schedule_timeout_interruptible(HZ / 10);
840                 }
841
842                 if (s) {
843                         if (!tconn->data.socket) {
844                                 tconn->data.socket = s;
845                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
846                         } else if (!tconn->meta.socket) {
847                                 tconn->meta.socket = s;
848                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
849                         } else {
850                                 conn_err(tconn, "Logic error in drbd_connect()\n");
851                                 goto out_release_sockets;
852                         }
853                 }
854
855                 if (tconn->data.socket && tconn->meta.socket) {
856                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
857                         ok = drbd_socket_okay(&tconn->data.socket);
858                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
859                         if (ok)
860                                 break;
861                 }
862
863 retry:
864                 s = drbd_wait_for_connect(tconn);
865                 if (s) {
866                         try = receive_first_packet(tconn, s);
867                         drbd_socket_okay(&tconn->data.socket);
868                         drbd_socket_okay(&tconn->meta.socket);
869                         switch (try) {
870                         case P_INITIAL_DATA:
871                                 if (tconn->data.socket) {
872                                         conn_warn(tconn, "initial packet S crossed\n");
873                                         sock_release(tconn->data.socket);
874                                 }
875                                 tconn->data.socket = s;
876                                 break;
877                         case P_INITIAL_META:
878                                 if (tconn->meta.socket) {
879                                         conn_warn(tconn, "initial packet M crossed\n");
880                                         sock_release(tconn->meta.socket);
881                                 }
882                                 tconn->meta.socket = s;
883                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
884                                 break;
885                         default:
886                                 conn_warn(tconn, "Error receiving initial packet\n");
887                                 sock_release(s);
888                                 if (random32() & 1)
889                                         goto retry;
890                         }
891                 }
892
893                 if (tconn->cstate <= C_DISCONNECTING)
894                         goto out_release_sockets;
895                 if (signal_pending(current)) {
896                         flush_signals(current);
897                         smp_rmb();
898                         if (get_t_state(&tconn->receiver) == EXITING)
899                                 goto out_release_sockets;
900                 }
901
902                 if (tconn->data.socket && tconn->meta.socket) {
903                         ok = drbd_socket_okay(&tconn->data.socket);
904                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
905                         if (ok)
906                                 break;
907                 }
908         } while (1);
909
910         sock  = tconn->data.socket;
911         msock = tconn->meta.socket;
912
913         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
914         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
915
916         sock->sk->sk_allocation = GFP_NOIO;
917         msock->sk->sk_allocation = GFP_NOIO;
918
919         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
920         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
921
922         /* NOT YET ...
923          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
924          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
925          * first set it to the P_CONNECTION_FEATURES timeout,
926          * which we set to 4x the configured ping_timeout. */
927         sock->sk->sk_sndtimeo =
928         sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
929
930         msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
931         msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
932
933         /* we don't want delays.
934          * we use TCP_CORK where appropriate, though */
935         drbd_tcp_nodelay(sock);
936         drbd_tcp_nodelay(msock);
937
938         tconn->last_received = jiffies;
939
940         h = drbd_do_features(tconn);
941         if (h <= 0)
942                 return h;
943
944         if (tconn->cram_hmac_tfm) {
945                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
946                 switch (drbd_do_auth(tconn)) {
947                 case -1:
948                         conn_err(tconn, "Authentication of peer failed\n");
949                         return -1;
950                 case 0:
951                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
952                         return 0;
953                 }
954         }
955
956         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
957                 return 0;
958
959         sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
960         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
961
962         drbd_thread_start(&tconn->asender);
963
964         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
965                 return -1;
966
967         return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
968
969 out_release_sockets:
970         if (tconn->data.socket) {
971                 sock_release(tconn->data.socket);
972                 tconn->data.socket = NULL;
973         }
974         if (tconn->meta.socket) {
975                 sock_release(tconn->meta.socket);
976                 tconn->meta.socket = NULL;
977         }
978         return -1;
979 }
980
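/* decode_header() below recognizes three wire header formats, selected via
 * drbd_header_size() from the agreed protocol version: struct p_header100
 * (32-bit DRBD_MAGIC_100, 16-bit volume number, 16-bit command, 32-bit
 * length), struct p_header95 (16-bit DRBD_MAGIC_BIG, 16-bit command, 32-bit
 * length), and the old struct p_header80 (32-bit DRBD_MAGIC, 16-bit command,
 * 16-bit length). */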
981 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
982 {
983         unsigned int header_size = drbd_header_size(tconn);
984
985         if (header_size == sizeof(struct p_header100) &&
986             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
987                 struct p_header100 *h = header;
988                 if (h->pad != 0) {
989                         conn_err(tconn, "Header padding is not zero\n");
990                         return -EINVAL;
991                 }
992                 pi->vnr = be16_to_cpu(h->volume);
993                 pi->cmd = be16_to_cpu(h->command);
994                 pi->size = be32_to_cpu(h->length);
995         } else if (header_size == sizeof(struct p_header95) &&
996                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
997                 struct p_header95 *h = header;
998                 pi->cmd = be16_to_cpu(h->command);
999                 pi->size = be32_to_cpu(h->length);
1000                 pi->vnr = 0;
1001         } else if (header_size == sizeof(struct p_header80) &&
1002                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1003                 struct p_header80 *h = header;
1004                 pi->cmd = be16_to_cpu(h->command);
1005                 pi->size = be16_to_cpu(h->length);
1006                 pi->vnr = 0;
1007         } else {
1008                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1009                          be32_to_cpu(*(__be32 *)header),
1010                          tconn->agreed_pro_version);
1011                 return -EINVAL;
1012         }
1013         pi->data = header + header_size;
1014         return 0;
1015 }
1016
1017 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1018 {
1019         void *buffer = tconn->data.rbuf;
1020         int err;
1021
1022         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1023         if (err)
1024                 return err;
1025
1026         err = decode_header(tconn, buffer, pi);
1027         tconn->last_received = jiffies;
1028
1029         return err;
1030 }
1031
1032 static void drbd_flush(struct drbd_conf *mdev)
1033 {
1034         int rv;
1035
1036         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1037                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1038                                         NULL);
1039                 if (rv) {
1040                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1041                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1042                          * don't try again for ANY return value != 0
1043                          * if (rv == -EOPNOTSUPP) */
1044                         drbd_bump_write_ordering(mdev, WO_drain_io);
1045                 }
1046                 put_ldev(mdev);
1047         }
1048 }
1049
1050 /**
1051  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1052  * @mdev:       DRBD device.
1053  * @epoch:      Epoch object.
1054  * @ev:         Epoch event.
1055  */
1056 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1057                                                struct drbd_epoch *epoch,
1058                                                enum epoch_event ev)
1059 {
1060         int epoch_size;
1061         struct drbd_epoch *next_epoch;
1062         enum finish_epoch rv = FE_STILL_LIVE;
1063
1064         spin_lock(&mdev->epoch_lock);
1065         do {
1066                 next_epoch = NULL;
1067
1068                 epoch_size = atomic_read(&epoch->epoch_size);
1069
1070                 switch (ev & ~EV_CLEANUP) {
1071                 case EV_PUT:
1072                         atomic_dec(&epoch->active);
1073                         break;
1074                 case EV_GOT_BARRIER_NR:
1075                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1076                         break;
1077                 case EV_BECAME_LAST:
1078                         /* nothing to do*/
1079                         break;
1080                 }
1081
1082                 if (epoch_size != 0 &&
1083                     atomic_read(&epoch->active) == 0 &&
1084                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1085                         if (!(ev & EV_CLEANUP)) {
1086                                 spin_unlock(&mdev->epoch_lock);
1087                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1088                                 spin_lock(&mdev->epoch_lock);
1089                         }
1090                         dec_unacked(mdev);
1091
1092                         if (mdev->current_epoch != epoch) {
1093                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1094                                 list_del(&epoch->list);
1095                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1096                                 mdev->epochs--;
1097                                 kfree(epoch);
1098
1099                                 if (rv == FE_STILL_LIVE)
1100                                         rv = FE_DESTROYED;
1101                         } else {
1102                                 epoch->flags = 0;
1103                                 atomic_set(&epoch->epoch_size, 0);
1104                                 /* atomic_set(&epoch->active, 0); is already zero */
1105                                 if (rv == FE_STILL_LIVE)
1106                                         rv = FE_RECYCLED;
1107                                 wake_up(&mdev->ee_wait);
1108                         }
1109                 }
1110
1111                 if (!next_epoch)
1112                         break;
1113
1114                 epoch = next_epoch;
1115         } while (1);
1116
1117         spin_unlock(&mdev->epoch_lock);
1118
1119         return rv;
1120 }
1121
1122 /**
1123  * drbd_bump_write_ordering() - Fall back to another write ordering method
1124  * @mdev:       DRBD device.
1125  * @wo:         Write ordering method to try.
1126  */
1127 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1128 {
1129         enum write_ordering_e pwo;
1130         static char *write_ordering_str[] = {
1131                 [WO_none] = "none",
1132                 [WO_drain_io] = "drain",
1133                 [WO_bdev_flush] = "flush",
1134         };
1135
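        /* min() below can only pick a weaker method (the enum is ordered
         * none < drain < flush), so once we had to fall back we never
         * silently upgrade again; the no_disk_* settings then cap it
         * further. */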
1136         pwo = mdev->write_ordering;
1137         wo = min(pwo, wo);
1138         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1139                 wo = WO_drain_io;
1140         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1141                 wo = WO_none;
1142         mdev->write_ordering = wo;
1143         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1144                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1145 }
1146
1147 /**
1148  * drbd_submit_peer_request() - Submit the pages of a peer request in one or more bios
1149  * @mdev:       DRBD device.
1150  * @peer_req:   peer request
1151  * @rw:         flag field, see bio->bi_rw
1152  *
1153  * May spread the pages to multiple bios,
1154  * depending on bio_add_page restrictions.
1155  *
1156  * Returns 0 if all bios have been submitted,
1157  * -ENOMEM if we could not allocate enough bios,
1158  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1159  *  single page to an empty bio (which should never happen and likely indicates
1160  *  that the lower level IO stack is in some way broken). This has been observed
1161  *  on certain Xen deployments.
1162  */
1163 /* TODO allocate from our own bio_set. */
1164 int drbd_submit_peer_request(struct drbd_conf *mdev,
1165                              struct drbd_peer_request *peer_req,
1166                              const unsigned rw, const int fault_type)
1167 {
1168         struct bio *bios = NULL;
1169         struct bio *bio;
1170         struct page *page = peer_req->pages;
1171         sector_t sector = peer_req->i.sector;
1172         unsigned ds = peer_req->i.size;
1173         unsigned n_bios = 0;
1174         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1175         int err = -ENOMEM;
1176
1177         /* In most cases, we will only need one bio.  But in case the lower
1178          * level restrictions happen to be different at this offset on this
1179          * side than those of the sending peer, we may need to submit the
1180          * request in more than one bio.
1181          *
1182          * Plain bio_alloc is good enough here, this is no DRBD internally
1183          * generated bio, but a bio allocated on behalf of the peer.
1184          */
1185 next_bio:
1186         bio = bio_alloc(GFP_NOIO, nr_pages);
1187         if (!bio) {
1188                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1189                 goto fail;
1190         }
1191         /* > peer_req->i.sector, unless this is the first bio */
1192         bio->bi_sector = sector;
1193         bio->bi_bdev = mdev->ldev->backing_bdev;
1194         bio->bi_rw = rw;
1195         bio->bi_private = peer_req;
1196         bio->bi_end_io = drbd_peer_request_endio;
1197
1198         bio->bi_next = bios;
1199         bios = bio;
1200         ++n_bios;
1201
1202         page_chain_for_each(page) {
1203                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1204                 if (!bio_add_page(bio, page, len, 0)) {
1205                         /* A single page must always be possible!
1206                          * But in case it fails anyways,
1207                          * we deal with it, and complain (below). */
1208                         if (bio->bi_vcnt == 0) {
1209                                 dev_err(DEV,
1210                                         "bio_add_page failed for len=%u, "
1211                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1212                                         len, (unsigned long long)bio->bi_sector);
1213                                 err = -ENOSPC;
1214                                 goto fail;
1215                         }
1216                         goto next_bio;
1217                 }
1218                 ds -= len;
1219                 sector += len >> 9;
1220                 --nr_pages;
1221         }
1222         D_ASSERT(page == NULL);
1223         D_ASSERT(ds == 0);
1224
1225         atomic_set(&peer_req->pending_bios, n_bios);
1226         do {
1227                 bio = bios;
1228                 bios = bios->bi_next;
1229                 bio->bi_next = NULL;
1230
1231                 drbd_generic_make_request(mdev, fault_type, bio);
1232         } while (bios);
1233         return 0;
1234
1235 fail:
1236         while (bios) {
1237                 bio = bios;
1238                 bios = bios->bi_next;
1239                 bio_put(bio);
1240         }
1241         return err;
1242 }
1243
1244 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1245                                              struct drbd_peer_request *peer_req)
1246 {
1247         struct drbd_interval *i = &peer_req->i;
1248
1249         drbd_remove_interval(&mdev->write_requests, i);
1250         drbd_clear_interval(i);
1251
1252         /* Wake up any processes waiting for this peer request to complete.  */
1253         if (i->waiting)
1254                 wake_up(&mdev->misc_wait);
1255 }
1256
1257 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1258 {
1259         struct drbd_conf *mdev;
1260         int rv;
1261         struct p_barrier *p = pi->data;
1262         struct drbd_epoch *epoch;
1263
1264         mdev = vnr_to_mdev(tconn, pi->vnr);
1265         if (!mdev)
1266                 return -EIO;
1267
1268         inc_unacked(mdev);
1269
1270         mdev->current_epoch->barrier_nr = p->barrier;
1271         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1272
1273         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1274          * the activity log, which means it would not be resynced in case the
1275          * R_PRIMARY crashes now.
1276          * Therefore we must send the barrier_ack after the barrier request was
1277          * completed. */
1278         switch (mdev->write_ordering) {
1279         case WO_none:
1280                 if (rv == FE_RECYCLED)
1281                         return 0;
1282
1283                 /* receiver context, in the writeout path of the other node.
1284                  * avoid potential distributed deadlock */
1285                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1286                 if (epoch)
1287                         break;
1288                 else
1289                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1290                         /* Fall through */
1291
1292         case WO_bdev_flush:
1293         case WO_drain_io:
1294                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1295                 drbd_flush(mdev);
1296
1297                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1298                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1299                         if (epoch)
1300                                 break;
1301                 }
1302
1303                 epoch = mdev->current_epoch;
1304                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1305
1306                 D_ASSERT(atomic_read(&epoch->active) == 0);
1307                 D_ASSERT(epoch->flags == 0);
1308
1309                 return 0;
1310         default:
1311                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1312                 return -EIO;
1313         }
1314
1315         epoch->flags = 0;
1316         atomic_set(&epoch->epoch_size, 0);
1317         atomic_set(&epoch->active, 0);
1318
1319         spin_lock(&mdev->epoch_lock);
1320         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1321                 list_add(&epoch->list, &mdev->current_epoch->list);
1322                 mdev->current_epoch = epoch;
1323                 mdev->epochs++;
1324         } else {
1325                 /* The current_epoch got recycled while we allocated this one... */
1326                 kfree(epoch);
1327         }
1328         spin_unlock(&mdev->epoch_lock);
1329
1330         return 0;
1331 }
1332
1333 /* used from receive_RSDataReply (recv_resync_read)
1334  * and from receive_Data */
1335 static struct drbd_peer_request *
1336 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1337               int data_size) __must_hold(local)
1338 {
1339         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1340         struct drbd_peer_request *peer_req;
1341         struct page *page;
1342         int dgs, ds, err;
1343         void *dig_in = mdev->tconn->int_dig_in;
1344         void *dig_vv = mdev->tconn->int_dig_vv;
1345         unsigned long *data;
1346
1347         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1348                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1349
1350         if (dgs) {
1351                 /*
1352                  * FIXME: Receive the incoming digest into the receive buffer
1353                  *        here, together with its struct p_data?
1354                  */
1355                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1356                 if (err)
1357                         return NULL;
1358         }
1359
1360         data_size -= dgs;
1361
1362         if (!expect(data_size != 0))
1363                 return NULL;
1364         if (!expect(IS_ALIGNED(data_size, 512)))
1365                 return NULL;
1366         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1367                 return NULL;
1368
1369         /* even though we trust our peer,
1370          * we sometimes have to double check. */
1371         if (sector + (data_size>>9) > capacity) {
1372                 dev_err(DEV, "request from peer beyond end of local disk: "
1373                         "capacity: %llus < sector: %llus + size: %u\n",
1374                         (unsigned long long)capacity,
1375                         (unsigned long long)sector, data_size);
1376                 return NULL;
1377         }
1378
1379         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1380          * "criss-cross" setup, that might cause write-out on some other DRBD,
1381          * which in turn might block on the other node at this very place.  */
1382         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1383         if (!peer_req)
1384                 return NULL;
1385
1386         ds = data_size;
1387         page = peer_req->pages;
1388         page_chain_for_each(page) {
1389                 unsigned len = min_t(int, ds, PAGE_SIZE);
1390                 data = kmap(page);
1391                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1392                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1393                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1394                         data[0] = data[0] ^ (unsigned long)-1;
1395                 }
1396                 kunmap(page);
1397                 if (err) {
1398                         drbd_free_peer_req(mdev, peer_req);
1399                         return NULL;
1400                 }
1401                 ds -= len;
1402         }
1403
1404         if (dgs) {
1405                 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1406                 if (memcmp(dig_in, dig_vv, dgs)) {
1407                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1408                                 (unsigned long long)sector, data_size);
1409                         drbd_free_peer_req(mdev, peer_req);
1410                         return NULL;
1411                 }
1412         }
1413         mdev->recv_cnt += data_size>>9;
1414         return peer_req;
1415 }
1416
1417 /* drbd_drain_block() just takes a data block
1418  * out of the socket input buffer, and discards it.
1419  */
1420 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1421 {
1422         struct page *page;
1423         int err = 0;
1424         void *data;
1425
1426         if (!data_size)
1427                 return 0;
1428
1429         page = drbd_pp_alloc(mdev, 1, 1);
1430
1431         data = kmap(page);
1432         while (data_size) {
1433                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1434
1435                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1436                 if (err)
1437                         break;
1438                 data_size -= len;
1439         }
1440         kunmap(page);
1441         drbd_pp_free(mdev, page, 0);
1442         return err;
1443 }
1444
1445 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1446                            sector_t sector, int data_size)
1447 {
1448         struct bio_vec *bvec;
1449         struct bio *bio;
1450         int dgs, err, i, expect;
1451         void *dig_in = mdev->tconn->int_dig_in;
1452         void *dig_vv = mdev->tconn->int_dig_vv;
1453
1454         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1455                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1456
1457         if (dgs) {
1458                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1459                 if (err)
1460                         return err;
1461         }
1462
1463         data_size -= dgs;
1464
1465         /* optimistically update recv_cnt.  if receiving fails below,
1466          * we disconnect anyways, and counters will be reset. */
1467         mdev->recv_cnt += data_size>>9;
1468
1469         bio = req->master_bio;
1470         D_ASSERT(sector == bio->bi_sector);
1471
1472         bio_for_each_segment(bvec, bio, i) {
1473                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1474                 expect = min_t(int, data_size, bvec->bv_len);
1475                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1476                 kunmap(bvec->bv_page);
1477                 if (err)
1478                         return err;
1479                 data_size -= expect;
1480         }
1481
1482         if (dgs) {
1483                 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1484                 if (memcmp(dig_in, dig_vv, dgs)) {
1485                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1486                         return -EINVAL;
1487                 }
1488         }
1489
1490         D_ASSERT(data_size == 0);
1491         return 0;
1492 }
1493
1494 /* e_end_resync_block() is called via
1495  * drbd_process_done_ee() by asender only */
1496 static int e_end_resync_block(struct drbd_work *w, int unused)
1497 {
1498         struct drbd_peer_request *peer_req =
1499                 container_of(w, struct drbd_peer_request, w);
1500         struct drbd_conf *mdev = w->mdev;
1501         sector_t sector = peer_req->i.sector;
1502         int err;
1503
1504         D_ASSERT(drbd_interval_empty(&peer_req->i));
1505
1506         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1507                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1508                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1509         } else {
1510                 /* Record failure to sync */
1511                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1512
1513                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1514         }
1515         dec_unacked(mdev);
1516
1517         return err;
1518 }
1519
1520 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1521 {
1522         struct drbd_peer_request *peer_req;
1523
1524         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1525         if (!peer_req)
1526                 goto fail;
1527
1528         dec_rs_pending(mdev);
1529
1530         inc_unacked(mdev);
1531         /* corresponding dec_unacked() in e_end_resync_block(),
1532          * or in _drbd_clear_done_ee, respectively */
1533
1534         peer_req->w.cb = e_end_resync_block;
1535
1536         spin_lock_irq(&mdev->tconn->req_lock);
1537         list_add(&peer_req->w.list, &mdev->sync_ee);
1538         spin_unlock_irq(&mdev->tconn->req_lock);
1539
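             /* Account these sectors as resync-induced disk activity; see
              * drbd_rs_should_slow_down(), which subtracts rs_sect_ev from the
              * disk's totals to estimate how busy application IO keeps it. */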
1540         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1541         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1542                 return 0;
1543
1544         /* don't care for the reason here */
1545         dev_err(DEV, "submit failed, triggering re-connect\n");
1546         spin_lock_irq(&mdev->tconn->req_lock);
1547         list_del(&peer_req->w.list);
1548         spin_unlock_irq(&mdev->tconn->req_lock);
1549
1550         drbd_free_peer_req(mdev, peer_req);
1551 fail:
1552         put_ldev(mdev);
1553         return -EIO;
1554 }
1555
1556 static struct drbd_request *
1557 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1558              sector_t sector, bool missing_ok, const char *func)
1559 {
1560         struct drbd_request *req;
1561
1562         /* Request object according to our peer */
1563         req = (struct drbd_request *)(unsigned long)id;
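             /* The block_id is the pointer value we handed to the peer;
              * drbd_contains_interval() below checks that it really is in our
              * tree before we trust it any further. */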
1564         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1565                 return req;
1566         if (!missing_ok) {
1567                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1568                         (unsigned long)id, (unsigned long long)sector);
1569         }
1570         return NULL;
1571 }
1572
1573 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1574 {
1575         struct drbd_conf *mdev;
1576         struct drbd_request *req;
1577         sector_t sector;
1578         int err;
1579         struct p_data *p = pi->data;
1580
1581         mdev = vnr_to_mdev(tconn, pi->vnr);
1582         if (!mdev)
1583                 return -EIO;
1584
1585         sector = be64_to_cpu(p->sector);
1586
1587         spin_lock_irq(&mdev->tconn->req_lock);
1588         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1589         spin_unlock_irq(&mdev->tconn->req_lock);
1590         if (unlikely(!req))
1591                 return -EIO;
1592
1593         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1594          * special casing it there for the various failure cases.
1595          * still no race with drbd_fail_pending_reads */
1596         err = recv_dless_read(mdev, req, sector, pi->size);
1597         if (!err)
1598                 req_mod(req, DATA_RECEIVED);
1599         /* else: nothing. handled from drbd_disconnect...
1600          * I don't think we may complete this just yet
1601          * in case we are "on-disconnect: freeze" */
1602
1603         return err;
1604 }
1605
1606 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1607 {
1608         struct drbd_conf *mdev;
1609         sector_t sector;
1610         int err;
1611         struct p_data *p = pi->data;
1612
1613         mdev = vnr_to_mdev(tconn, pi->vnr);
1614         if (!mdev)
1615                 return -EIO;
1616
1617         sector = be64_to_cpu(p->sector);
1618         D_ASSERT(p->block_id == ID_SYNCER);
1619
1620         if (get_ldev(mdev)) {
1621                 /* data is submitted to disk within recv_resync_read.
1622                  * corresponding put_ldev done below on error,
1623                  * or in drbd_peer_request_endio. */
1624                 err = recv_resync_read(mdev, sector, pi->size);
1625         } else {
1626                 if (__ratelimit(&drbd_ratelimit_state))
1627                         dev_err(DEV, "Can not write resync data to local disk.\n");
1628
1629                 err = drbd_drain_block(mdev, pi->size);
1630
1631                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1632         }
1633
1634         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1635
1636         return err;
1637 }
1638
1639 static int w_restart_write(struct drbd_work *w, int cancel)
1640 {
1641         struct drbd_request *req = container_of(w, struct drbd_request, w);
1642         struct drbd_conf *mdev = w->mdev;
1643         struct bio *bio;
1644         unsigned long start_time;
1645         unsigned long flags;
1646
1647         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1648         if (!expect(req->rq_state & RQ_POSTPONED)) {
1649                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1650                 return -EIO;
1651         }
1652         bio = req->master_bio;
1653         start_time = req->start_time;
1654         /* Postponed requests will not have their master_bio completed!  */
1655         __req_mod(req, DISCARD_WRITE, NULL);
1656         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1657
1658         while (__drbd_make_request(mdev, bio, start_time))
1659                 /* retry */ ;
1660         return 0;
1661 }
1662
1663 static void restart_conflicting_writes(struct drbd_conf *mdev,
1664                                        sector_t sector, int size)
1665 {
1666         struct drbd_interval *i;
1667         struct drbd_request *req;
1668
1669         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1670                 if (!i->local)
1671                         continue;
1672                 req = container_of(i, struct drbd_request, i);
1673                 if (req->rq_state & RQ_LOCAL_PENDING ||
1674                     !(req->rq_state & RQ_POSTPONED))
1675                         continue;
1676                 if (expect(list_empty(&req->w.list))) {
1677                         req->w.mdev = mdev;
1678                         req->w.cb = w_restart_write;
1679                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1680                 }
1681         }
1682 }
1683
1684 /* e_end_block() is called via drbd_process_done_ee().
1685  * this means this function only runs in the asender thread
1686  */
1687 static int e_end_block(struct drbd_work *w, int cancel)
1688 {
1689         struct drbd_peer_request *peer_req =
1690                 container_of(w, struct drbd_peer_request, w);
1691         struct drbd_conf *mdev = w->mdev;
1692         sector_t sector = peer_req->i.sector;
1693         int err = 0, pcmd;
1694
1695         if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
1696                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1697                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1698                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1699                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1700                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1701                         err = drbd_send_ack(mdev, pcmd, peer_req);
1702                         if (pcmd == P_RS_WRITE_ACK)
1703                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1704                 } else {
1705                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1706                         /* we expect it to be marked out of sync anyway...
1707                          * maybe assert this?  */
1708                 }
1709                 dec_unacked(mdev);
1710         }
1711         /* we delete from the conflict detection hash _after_ we sent out the
1712          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1713         if (mdev->tconn->net_conf->two_primaries) {
1714                 spin_lock_irq(&mdev->tconn->req_lock);
1715                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1716                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1717                 if (peer_req->flags & EE_RESTART_REQUESTS)
1718                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1719                 spin_unlock_irq(&mdev->tconn->req_lock);
1720         } else
1721                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1722
1723         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1724
1725         return err;
1726 }
1727
1728 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1729 {
1730         struct drbd_conf *mdev = w->mdev;
1731         struct drbd_peer_request *peer_req =
1732                 container_of(w, struct drbd_peer_request, w);
1733         int err;
1734
1735         err = drbd_send_ack(mdev, ack, peer_req);
1736         dec_unacked(mdev);
1737
1738         return err;
1739 }
1740
1741 static int e_send_discard_write(struct drbd_work *w, int unused)
1742 {
1743         return e_send_ack(w, P_DISCARD_WRITE);
1744 }
1745
1746 static int e_send_retry_write(struct drbd_work *w, int unused)
1747 {
1748         struct drbd_tconn *tconn = w->mdev->tconn;
1749
1750         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1751                              P_RETRY_WRITE : P_DISCARD_WRITE);
1752 }
1753
1754 static bool seq_greater(u32 a, u32 b)
1755 {
1756         /*
1757          * We assume 32-bit wrap-around here.
1758          * For 24-bit wrap-around, we would have to shift:
1759          *  a <<= 8; b <<= 8;
1760          */
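             /* e.g. seq_greater(0, 0xffffffff) is true:
              * (s32)0 - (s32)0xffffffff == 0 - (-1) == 1 > 0,
              * so sequence numbers that wrapped past zero still compare as newer. */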
1761         return (s32)a - (s32)b > 0;
1762 }
1763
1764 static u32 seq_max(u32 a, u32 b)
1765 {
1766         return seq_greater(a, b) ? a : b;
1767 }
1768
1769 static bool need_peer_seq(struct drbd_conf *mdev)
1770 {
1771         struct drbd_tconn *tconn = mdev->tconn;
1772
1773         /*
1774          * We only need to keep track of the last packet_seq number of our peer
1775          * if we are in dual-primary mode and we have the discard flag set; see
1776          * handle_write_conflicts().
1777          */
1778         return tconn->net_conf->two_primaries &&
1779                test_bit(DISCARD_CONCURRENT, &tconn->flags);
1780 }
1781
1782 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1783 {
1784         unsigned int newest_peer_seq;
1785
1786         if (need_peer_seq(mdev)) {
1787                 spin_lock(&mdev->peer_seq_lock);
1788                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1789                 mdev->peer_seq = newest_peer_seq;
1790                 spin_unlock(&mdev->peer_seq_lock);
1791                 /* wake up only if we actually changed mdev->peer_seq */
1792                 if (peer_seq == newest_peer_seq)
1793                         wake_up(&mdev->seq_wait);
1794         }
1795 }
1796
1797 /* Called from receive_Data.
1798  * Synchronize packets on sock with packets on msock.
1799  *
1800  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1801  * packet traveling on msock, they are still processed in the order they have
1802  * been sent.
1803  *
1804  * Note: we don't care for Ack packets overtaking P_DATA packets.
1805  *
1806  * If packet_seq is larger than mdev->peer_seq, there are still
1807  * outstanding packets on the msock. We wait for them to arrive.
1808  * If this is the logically next packet, we update mdev->peer_seq
1809  * ourselves. Correctly handles 32bit wrap around.
1810  *
1811  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1812  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1813  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1814  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1815  *
1816  * returns 0 if we may process the packet,
1817  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1818 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1819 {
1820         DEFINE_WAIT(wait);
1821         long timeout;
1822         int ret;
1823
1824         if (!need_peer_seq(mdev))
1825                 return 0;
1826
1827         spin_lock(&mdev->peer_seq_lock);
1828         for (;;) {
1829                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1830                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1831                         ret = 0;
1832                         break;
1833                 }
1834                 if (signal_pending(current)) {
1835                         ret = -ERESTARTSYS;
1836                         break;
1837                 }
1838                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1839                 spin_unlock(&mdev->peer_seq_lock);
1840                 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1841                 timeout = schedule_timeout(timeout);
1842                 spin_lock(&mdev->peer_seq_lock);
1843                 if (!timeout) {
1844                         ret = -ETIMEDOUT;
1845                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1846                         break;
1847                 }
1848         }
1849         spin_unlock(&mdev->peer_seq_lock);
1850         finish_wait(&mdev->seq_wait, &wait);
1851         return ret;
1852 }
1853
1854 /* see also bio_flags_to_wire():
1855  * we need a semantic mapping between DRBD_REQ_* bio flags and DP_* data
1856  * packet flags, in both directions, because we may replicate to other
1857  * kernel versions. */
1857 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1858 {
1859         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1860                 (dpf & DP_FUA ? REQ_FUA : 0) |
1861                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1862                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1863 }
1864
1865 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1866                                     unsigned int size)
1867 {
1868         struct drbd_interval *i;
1869
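             /* Called with req_lock held.  The tree may change while the lock is
              * dropped to complete the master bio, so the overlap walk is
              * restarted from the top after each completion. */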
1870     repeat:
1871         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1872                 struct drbd_request *req;
1873                 struct bio_and_error m;
1874
1875                 if (!i->local)
1876                         continue;
1877                 req = container_of(i, struct drbd_request, i);
1878                 if (!(req->rq_state & RQ_POSTPONED))
1879                         continue;
1880                 req->rq_state &= ~RQ_POSTPONED;
1881                 __req_mod(req, NEG_ACKED, &m);
1882                 spin_unlock_irq(&mdev->tconn->req_lock);
1883                 if (m.bio)
1884                         complete_master_bio(mdev, &m);
1885                 spin_lock_irq(&mdev->tconn->req_lock);
1886                 goto repeat;
1887         }
1888 }
1889
1890 static int handle_write_conflicts(struct drbd_conf *mdev,
1891                                   struct drbd_peer_request *peer_req)
1892 {
1893         struct drbd_tconn *tconn = mdev->tconn;
1894         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1895         sector_t sector = peer_req->i.sector;
1896         const unsigned int size = peer_req->i.size;
1897         struct drbd_interval *i;
1898         bool equal;
1899         int err;
1900
1901         /*
1902          * Inserting the peer request into the write_requests tree will prevent
1903          * new conflicting local requests from being added.
1904          */
1905         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1906
1907     repeat:
1908         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1909                 if (i == &peer_req->i)
1910                         continue;
1911
1912                 if (!i->local) {
1913                         /*
1914                          * Our peer has sent a conflicting remote request; this
1915                          * should not happen in a two-node setup.  Wait for the
1916                          * earlier peer request to complete.
1917                          */
1918                         err = drbd_wait_misc(mdev, i);
1919                         if (err)
1920                                 goto out;
1921                         goto repeat;
1922                 }
1923
1924                 equal = i->sector == sector && i->size == size;
1925                 if (resolve_conflicts) {
1926                         /*
1927                          * If the peer request is fully contained within the
1928                          * overlapping request, it can be discarded; otherwise,
1929                          * it will be retried once all overlapping requests
1930                          * have completed.
1931                          */
1932                         bool discard = i->sector <= sector && i->sector +
1933                                        (i->size >> 9) >= sector + (size >> 9);
1934
1935                         if (!equal)
1936                                 dev_alert(DEV, "Concurrent writes detected: "
1937                                                "local=%llus +%u, remote=%llus +%u, "
1938                                                "assuming %s came first\n",
1939                                           (unsigned long long)i->sector, i->size,
1940                                           (unsigned long long)sector, size,
1941                                           discard ? "local" : "remote");
1942
1943                         inc_unacked(mdev);
1944                         peer_req->w.cb = discard ? e_send_discard_write :
1945                                                    e_send_retry_write;
1946                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1947                         wake_asender(mdev->tconn);
1948
1949                         err = -ENOENT;
1950                         goto out;
1951                 } else {
1952                         struct drbd_request *req =
1953                                 container_of(i, struct drbd_request, i);
1954
1955                         if (!equal)
1956                                 dev_alert(DEV, "Concurrent writes detected: "
1957                                                "local=%llus +%u, remote=%llus +%u\n",
1958                                           (unsigned long long)i->sector, i->size,
1959                                           (unsigned long long)sector, size);
1960
1961                         if (req->rq_state & RQ_LOCAL_PENDING ||
1962                             !(req->rq_state & RQ_POSTPONED)) {
1963                                 /*
1964                                  * Wait for the node with the discard flag to
1965                                  * decide if this request will be discarded or
1966                                  * retried.  Requests that are discarded will
1967                                  * disappear from the write_requests tree.
1968                                  *
1969                                  * In addition, wait for the conflicting
1970                                  * request to finish locally before submitting
1971                                  * the conflicting peer request.
1972                                  */
1973                                 err = drbd_wait_misc(mdev, &req->i);
1974                                 if (err) {
1975                                         _conn_request_state(mdev->tconn,
1976                                                             NS(conn, C_TIMEOUT),
1977                                                             CS_HARD);
1978                                         fail_postponed_requests(mdev, sector, size);
1979                                         goto out;
1980                                 }
1981                                 goto repeat;
1982                         }
1983                         /*
1984                          * Remember to restart the conflicting requests after
1985                          * the new peer request has completed.
1986                          */
1987                         peer_req->flags |= EE_RESTART_REQUESTS;
1988                 }
1989         }
1990         err = 0;
1991
1992     out:
1993         if (err)
1994                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1995         return err;
1996 }
1997
1998 /* mirrored write */
1999 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2000 {
2001         struct drbd_conf *mdev;
2002         sector_t sector;
2003         struct drbd_peer_request *peer_req;
2004         struct p_data *p = pi->data;
2005         u32 peer_seq = be32_to_cpu(p->seq_num);
2006         int rw = WRITE;
2007         u32 dp_flags;
2008         int err;
2009
2010         mdev = vnr_to_mdev(tconn, pi->vnr);
2011         if (!mdev)
2012                 return -EIO;
2013
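             /* No local disk: still honor the peer's sequence number, account the
              * write in the current epoch, drain its payload from the socket, and
              * send a negative ack so the peer knows it was not written here. */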
2014         if (!get_ldev(mdev)) {
2015                 int err2;
2016
2017                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2018                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2019                 atomic_inc(&mdev->current_epoch->epoch_size);
2020                 err2 = drbd_drain_block(mdev, pi->size);
2021                 if (!err)
2022                         err = err2;
2023                 return err;
2024         }
2025
2026         /*
2027          * Corresponding put_ldev done either below (on various errors), or in
2028          * drbd_peer_request_endio, if we successfully submit the data at the
2029          * end of this function.
2030          */
2031
2032         sector = be64_to_cpu(p->sector);
2033         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2034         if (!peer_req) {
2035                 put_ldev(mdev);
2036                 return -EIO;
2037         }
2038
2039         peer_req->w.cb = e_end_block;
2040
2041         dp_flags = be32_to_cpu(p->dp_flags);
2042         rw |= wire_flags_to_bio(mdev, dp_flags);
2043
2044         if (dp_flags & DP_MAY_SET_IN_SYNC)
2045                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2046
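             /* Assign the write to the currently open epoch: epoch_size counts
              * every write in the epoch, active those not yet completed; both are
              * evaluated by drbd_may_finish_epoch() when closing the epoch. */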
2047         spin_lock(&mdev->epoch_lock);
2048         peer_req->epoch = mdev->current_epoch;
2049         atomic_inc(&peer_req->epoch->epoch_size);
2050         atomic_inc(&peer_req->epoch->active);
2051         spin_unlock(&mdev->epoch_lock);
2052
2053         if (mdev->tconn->net_conf->two_primaries) {
2054                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2055                 if (err)
2056                         goto out_interrupted;
2057                 spin_lock_irq(&mdev->tconn->req_lock);
2058                 err = handle_write_conflicts(mdev, peer_req);
2059                 if (err) {
2060                         spin_unlock_irq(&mdev->tconn->req_lock);
2061                         if (err == -ENOENT) {
2062                                 put_ldev(mdev);
2063                                 return 0;
2064                         }
2065                         goto out_interrupted;
2066                 }
2067         } else
2068                 spin_lock_irq(&mdev->tconn->req_lock);
2069         list_add(&peer_req->w.list, &mdev->active_ee);
2070         spin_unlock_irq(&mdev->tconn->req_lock);
2071
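             /* The ack sent back depends on the replication protocol: C acks only
              * after the local write completed (P_WRITE_ACK / P_RS_WRITE_ACK from
              * e_end_block), B acks receipt immediately (P_RECV_ACK), A not at all. */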
2072         switch (mdev->tconn->net_conf->wire_protocol) {
2073         case DRBD_PROT_C:
2074                 inc_unacked(mdev);
2075                 /* corresponding dec_unacked() in e_end_block(),
2076                  * or in _drbd_clear_done_ee, respectively */
2077                 break;
2078         case DRBD_PROT_B:
2079                 /* I really don't like it that the receiver thread
2080                  * sends on the msock, but anyway */
2081                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2082                 break;
2083         case DRBD_PROT_A:
2084                 /* nothing to do */
2085                 break;
2086         }
2087
2088         if (mdev->state.pdsk < D_INCONSISTENT) {
2089                 /* In case we have the only disk of the cluster, */
2090                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2091                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2092                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2093                 drbd_al_begin_io(mdev, &peer_req->i);
2094         }
2095
2096         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2097         if (!err)
2098                 return 0;
2099
2100         /* don't care for the reason here */
2101         dev_err(DEV, "submit failed, triggering re-connect\n");
2102         spin_lock_irq(&mdev->tconn->req_lock);
2103         list_del(&peer_req->w.list);
2104         drbd_remove_epoch_entry_interval(mdev, peer_req);
2105         spin_unlock_irq(&mdev->tconn->req_lock);
2106         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2107                 drbd_al_complete_io(mdev, &peer_req->i);
2108
2109 out_interrupted:
2110         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2111         put_ldev(mdev);
2112         drbd_free_peer_req(mdev, peer_req);
2113         return err;
2114 }
2115
2116 /* We may throttle resync, if the lower device seems to be busy,
2117  * and current sync rate is above c_min_rate.
2118  *
2119  * To decide whether or not the lower device is busy, we use a scheme similar
2120  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2121  * activity (more than 64 sectors) that we cannot account for with our own
2122  * resync activity, it obviously is "busy".
2123  *
2124  * The current sync rate used here uses only the most recent two step marks,
2125  * to have a short time average so we can react faster.
2126  */
2127 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2128 {
2129         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2130         unsigned long db, dt, dbdt;
2131         struct lc_element *tmp;
2132         int curr_events;
2133         int throttle = 0;
2134
2135         /* feature disabled? */
2136         if (mdev->ldev->dc.c_min_rate == 0)
2137                 return 0;
2138
2139         spin_lock_irq(&mdev->al_lock);
2140         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2141         if (tmp) {
2142                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2143                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2144                         spin_unlock_irq(&mdev->al_lock);
2145                         return 0;
2146                 }
2147                 /* Do not slow down if app IO is already waiting for this extent */
2148         }
2149         spin_unlock_irq(&mdev->al_lock);
2150
2151         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2152                       (int)part_stat_read(&disk->part0, sectors[1]) -
2153                         atomic_read(&mdev->rs_sect_ev);
2154
2155         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2156                 unsigned long rs_left;
2157                 int i;
2158
2159                 mdev->rs_last_events = curr_events;
2160
2161                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2162                  * approx. */
2163                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2164
2165                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2166                         rs_left = mdev->ov_left;
2167                 else
2168                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2169
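                     /* db: out-of-sync bits cleared since sync mark i (one bit per
                      * 4 KiB block), dt: elapsed seconds; dbdt is thus the recent
                      * resync rate in KiB/s, compared against c_min_rate below. */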
2170                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2171                 if (!dt)
2172                         dt++;
2173                 db = mdev->rs_mark_left[i] - rs_left;
2174                 dbdt = Bit2KB(db/dt);
2175
2176                 if (dbdt > mdev->ldev->dc.c_min_rate)
2177                         throttle = 1;
2178         }
2179         return throttle;
2180 }
2181
2182
2183 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2184 {
2185         struct drbd_conf *mdev;
2186         sector_t sector;
2187         sector_t capacity;
2188         struct drbd_peer_request *peer_req;
2189         struct digest_info *di = NULL;
2190         int size, verb;
2191         unsigned int fault_type;
2192         struct p_block_req *p = pi->data;
2193
2194         mdev = vnr_to_mdev(tconn, pi->vnr);
2195         if (!mdev)
2196                 return -EIO;
2197         capacity = drbd_get_capacity(mdev->this_bdev);
2198
2199         sector = be64_to_cpu(p->sector);
2200         size   = be32_to_cpu(p->blksize);
2201
2202         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2203                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2204                                 (unsigned long long)sector, size);
2205                 return -EINVAL;
2206         }
2207         if (sector + (size>>9) > capacity) {
2208                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2209                                 (unsigned long long)sector, size);
2210                 return -EINVAL;
2211         }
2212
2213         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2214                 verb = 1;
2215                 switch (pi->cmd) {
2216                 case P_DATA_REQUEST:
2217                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2218                         break;
2219                 case P_RS_DATA_REQUEST:
2220                 case P_CSUM_RS_REQUEST:
2221                 case P_OV_REQUEST:
2222                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2223                         break;
2224                 case P_OV_REPLY:
2225                         verb = 0;
2226                         dec_rs_pending(mdev);
2227                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2228                         break;
2229                 default:
2230                         BUG();
2231                 }
2232                 if (verb && __ratelimit(&drbd_ratelimit_state))
2233                         dev_err(DEV, "Can not satisfy peer's read request, "
2234                             "no local data.\n");
2235
2236                 /* drain the payload, if any */
2237                 return drbd_drain_block(mdev, pi->size);
2238         }
2239
2240         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2241          * "criss-cross" setup, that might cause write-out on some other DRBD,
2242          * which in turn might block on the other node at this very place.  */
2243         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2244         if (!peer_req) {
2245                 put_ldev(mdev);
2246                 return -ENOMEM;
2247         }
2248
2249         switch (pi->cmd) {
2250         case P_DATA_REQUEST:
2251                 peer_req->w.cb = w_e_end_data_req;
2252                 fault_type = DRBD_FAULT_DT_RD;
2253                 /* application IO, don't drbd_rs_begin_io */
2254                 goto submit;
2255
2256         case P_RS_DATA_REQUEST:
2257                 peer_req->w.cb = w_e_end_rsdata_req;
2258                 fault_type = DRBD_FAULT_RS_RD;
2259                 /* used in the sector offset progress display */
2260                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2261                 break;
2262
2263         case P_OV_REPLY:
2264         case P_CSUM_RS_REQUEST:
2265                 fault_type = DRBD_FAULT_RS_RD;
2266                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2267                 if (!di)
2268                         goto out_free_e;
2269
2270                 di->digest_size = pi->size;
2271                 di->digest = (((char *)di)+sizeof(struct digest_info));
2272
2273                 peer_req->digest = di;
2274                 peer_req->flags |= EE_HAS_DIGEST;
2275
2276                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2277                         goto out_free_e;
2278
2279                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2280                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2281                         peer_req->w.cb = w_e_end_csum_rs_req;
2282                         /* used in the sector offset progress display */
2283                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2284                 } else if (pi->cmd == P_OV_REPLY) {
2285                         /* track progress, we may need to throttle */
2286                         atomic_add(size >> 9, &mdev->rs_sect_in);
2287                         peer_req->w.cb = w_e_end_ov_reply;
2288                         dec_rs_pending(mdev);
2289                         /* drbd_rs_begin_io done when we sent this request,
2290                          * but accounting still needs to be done. */
2291                         goto submit_for_resync;
2292                 }
2293                 break;
2294
2295         case P_OV_REQUEST:
2296                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2297                     mdev->tconn->agreed_pro_version >= 90) {
2298                         unsigned long now = jiffies;
2299                         int i;
2300                         mdev->ov_start_sector = sector;
2301                         mdev->ov_position = sector;
2302                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2303                         mdev->rs_total = mdev->ov_left;
2304                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2305                                 mdev->rs_mark_left[i] = mdev->ov_left;
2306                                 mdev->rs_mark_time[i] = now;
2307                         }
2308                         dev_info(DEV, "Online Verify start sector: %llu\n",
2309                                         (unsigned long long)sector);
2310                 }
2311                 peer_req->w.cb = w_e_end_ov_req;
2312                 fault_type = DRBD_FAULT_RS_RD;
2313                 break;
2314
2315         default:
2316                 BUG();
2317         }
2318
2319         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2320          * wrt the receiver, but it is not as straightforward as it may seem.
2321          * Various places in the resync start and stop logic assume resync
2322          * requests are processed in order, requeuing this on the worker thread
2323          * introduces a bunch of new code for synchronization between threads.
2324          *
2325          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2326          * "forever", throttling after drbd_rs_begin_io will lock that extent
2327          * for application writes for the same time.  For now, just throttle
2328          * here, where the rest of the code expects the receiver to sleep for
2329          * a while, anyways.
2330          * a while, anyway.
2331
2332         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2333          * this defers syncer requests for some time, before letting at least
2334          * one request through.  The resync controller on the receiving side
2335          * will adapt to the incoming rate accordingly.
2336          *
2337          * We cannot throttle here if remote is Primary/SyncTarget:
2338          * we would also throttle its application reads.
2339          * In that case, throttling is done on the SyncTarget only.
2340          */
2341         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2342                 schedule_timeout_uninterruptible(HZ/10);
2343         if (drbd_rs_begin_io(mdev, sector))
2344                 goto out_free_e;
2345
2346 submit_for_resync:
2347         atomic_add(size >> 9, &mdev->rs_sect_ev);
2348
2349 submit:
2350         inc_unacked(mdev);
2351         spin_lock_irq(&mdev->tconn->req_lock);
2352         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2353         spin_unlock_irq(&mdev->tconn->req_lock);
2354
2355         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2356                 return 0;
2357
2358         /* don't care for the reason here */
2359         dev_err(DEV, "submit failed, triggering re-connect\n");
2360         spin_lock_irq(&mdev->tconn->req_lock);
2361         list_del(&peer_req->w.list);
2362         spin_unlock_irq(&mdev->tconn->req_lock);
2363         /* no drbd_rs_complete_io(), we are dropping the connection anyway */
2364
2365 out_free_e:
2366         put_ldev(mdev);
2367         drbd_free_peer_req(mdev, peer_req);
2368         return -EIO;
2369 }
2370
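     /* After-split-brain recovery policy when neither node is currently primary
      * ("0p"); the _1p and _2p variants below handle one and two primaries (see
      * the pcount switch in drbd_sync_handshake()).  A positive return value
      * means this node becomes sync source, negative sync target, -100 means no
      * automatic resolution was possible. */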
2371 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2372 {
2373         int self, peer, rv = -100;
2374         unsigned long ch_self, ch_peer;
2375
2376         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2377         peer = mdev->p_uuid[UI_BITMAP] & 1;
2378
2379         ch_peer = mdev->p_uuid[UI_SIZE];
2380         ch_self = mdev->comm_bm_set;
2381
2382         switch (mdev->tconn->net_conf->after_sb_0p) {
2383         case ASB_CONSENSUS:
2384         case ASB_DISCARD_SECONDARY:
2385         case ASB_CALL_HELPER:
2386                 dev_err(DEV, "Configuration error.\n");
2387                 break;
2388         case ASB_DISCONNECT:
2389                 break;
2390         case ASB_DISCARD_YOUNGER_PRI:
2391                 if (self == 0 && peer == 1) {
2392                         rv = -1;
2393                         break;
2394                 }
2395                 if (self == 1 && peer == 0) {
2396                         rv =  1;
2397                         break;
2398                 }
2399                 /* Else fall through to one of the other strategies... */
2400         case ASB_DISCARD_OLDER_PRI:
2401                 if (self == 0 && peer == 1) {
2402                         rv = 1;
2403                         break;
2404                 }
2405                 if (self == 1 && peer == 0) {
2406                         rv = -1;
2407                         break;
2408                 }
2409                 /* Else fall through to one of the other strategies... */
2410                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2411                      "Using discard-least-changes instead\n");
2412         case ASB_DISCARD_ZERO_CHG:
2413                 if (ch_peer == 0 && ch_self == 0) {
2414                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2415                                 ? -1 : 1;
2416                         break;
2417                 } else {
2418                         if (ch_peer == 0) { rv =  1; break; }
2419                         if (ch_self == 0) { rv = -1; break; }
2420                 }
2421                 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2422                         break;
2423         case ASB_DISCARD_LEAST_CHG:
2424                 if      (ch_self < ch_peer)
2425                         rv = -1;
2426                 else if (ch_self > ch_peer)
2427                         rv =  1;
2428                 else /* ( ch_self == ch_peer ) */
2429                      /* Well, then use something else. */
2430                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2431                                 ? -1 : 1;
2432                 break;
2433         case ASB_DISCARD_LOCAL:
2434                 rv = -1;
2435                 break;
2436         case ASB_DISCARD_REMOTE:
2437                 rv =  1;
2438         }
2439
2440         return rv;
2441 }
2442
2443 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2444 {
2445         int hg, rv = -100;
2446
2447         switch (mdev->tconn->net_conf->after_sb_1p) {
2448         case ASB_DISCARD_YOUNGER_PRI:
2449         case ASB_DISCARD_OLDER_PRI:
2450         case ASB_DISCARD_LEAST_CHG:
2451         case ASB_DISCARD_LOCAL:
2452         case ASB_DISCARD_REMOTE:
2453                 dev_err(DEV, "Configuration error.\n");
2454                 break;
2455         case ASB_DISCONNECT:
2456                 break;
2457         case ASB_CONSENSUS:
2458                 hg = drbd_asb_recover_0p(mdev);
2459                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2460                         rv = hg;
2461                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2462                         rv = hg;
2463                 break;
2464         case ASB_VIOLENTLY:
2465                 rv = drbd_asb_recover_0p(mdev);
2466                 break;
2467         case ASB_DISCARD_SECONDARY:
2468                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2469         case ASB_CALL_HELPER:
2470                 hg = drbd_asb_recover_0p(mdev);
2471                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2472                         enum drbd_state_rv rv2;
2473
2474                         drbd_set_role(mdev, R_SECONDARY, 0);
2475                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2476                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2477                           * we do not need to wait for the after state change work either. */
2478                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2479                         if (rv2 != SS_SUCCESS) {
2480                                 drbd_khelper(mdev, "pri-lost-after-sb");
2481                         } else {
2482                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2483                                 rv = hg;
2484                         }
2485                 } else
2486                         rv = hg;
2487         }
2488
2489         return rv;
2490 }
2491
2492 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2493 {
2494         int hg, rv = -100;
2495
2496         switch (mdev->tconn->net_conf->after_sb_2p) {
2497         case ASB_DISCARD_YOUNGER_PRI:
2498         case ASB_DISCARD_OLDER_PRI:
2499         case ASB_DISCARD_LEAST_CHG:
2500         case ASB_DISCARD_LOCAL:
2501         case ASB_DISCARD_REMOTE:
2502         case ASB_CONSENSUS:
2503         case ASB_DISCARD_SECONDARY:
2504                 dev_err(DEV, "Configuration error.\n");
2505                 break;
2506         case ASB_VIOLENTLY:
2507                 rv = drbd_asb_recover_0p(mdev);
2508                 break;
2509         case ASB_DISCONNECT:
2510                 break;
2511         case ASB_CALL_HELPER:
2512                 hg = drbd_asb_recover_0p(mdev);
2513                 if (hg == -1) {
2514                         enum drbd_state_rv rv2;
2515
2516                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2517                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2518                           * we do not need to wait for the after state change work either. */
2519                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2520                         if (rv2 != SS_SUCCESS) {
2521                                 drbd_khelper(mdev, "pri-lost-after-sb");
2522                         } else {
2523                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2524                                 rv = hg;
2525                         }
2526                 } else
2527                         rv = hg;
2528         }
2529
2530         return rv;
2531 }
2532
2533 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2534                            u64 bits, u64 flags)
2535 {
2536         if (!uuid) {
2537                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2538                 return;
2539         }
2540         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2541              text,
2542              (unsigned long long)uuid[UI_CURRENT],
2543              (unsigned long long)uuid[UI_BITMAP],
2544              (unsigned long long)uuid[UI_HISTORY_START],
2545              (unsigned long long)uuid[UI_HISTORY_END],
2546              (unsigned long long)bits,
2547              (unsigned long long)flags);
2548 }
2549
2550 /*
2551   100   after split brain try auto recover
2552     2   C_SYNC_SOURCE set BitMap
2553     1   C_SYNC_SOURCE use BitMap
2554     0   no Sync
2555    -1   C_SYNC_TARGET use BitMap
2556    -2   C_SYNC_TARGET set BitMap
2557  -100   after split brain, disconnect
2558 -1000   unrelated data
2559 -1091   requires proto 91
2560 -1096   requires proto 96
2561  */
2562 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2563 {
2564         u64 self, peer;
2565         int i, j;
2566
2567         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2568         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
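             /* The lowest bit of each UUID is used as a flag (e.g.
              * drbd_asb_recover_0p() above looks at uuid[UI_BITMAP] & 1), so it is
              * masked off before UUIDs are compared here. */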
2569
2570         *rule_nr = 10;
2571         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2572                 return 0;
2573
2574         *rule_nr = 20;
2575         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2576              peer != UUID_JUST_CREATED)
2577                 return -2;
2578
2579         *rule_nr = 30;
2580         if (self != UUID_JUST_CREATED &&
2581             (peer == UUID_JUST_CREATED || peer == (u64)0))
2582                 return 2;
2583
2584         if (self == peer) {
2585                 int rct, dc; /* roles at crash time */
2586
2587                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2588
2589                         if (mdev->tconn->agreed_pro_version < 91)
2590                                 return -1091;
2591
2592                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2593                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2594                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2595                                 drbd_uuid_set_bm(mdev, 0UL);
2596
2597                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2598                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2599                                 *rule_nr = 34;
2600                         } else {
2601                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2602                                 *rule_nr = 36;
2603                         }
2604
2605                         return 1;
2606                 }
2607
2608                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2609
2610                         if (mdev->tconn->agreed_pro_version < 91)
2611                                 return -1091;
2612
2613                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2614                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2615                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2616
2617                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2618                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2619                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2620
2621                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2622                                 *rule_nr = 35;
2623                         } else {
2624                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2625                                 *rule_nr = 37;
2626                         }
2627
2628                         return -1;
2629                 }
2630
2631                 /* Common power [off|failure] */
2632                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2633                         (mdev->p_uuid[UI_FLAGS] & 2);
2634                 /* lowest bit is set when we were primary,
2635                  * next bit (weight 2) is set when peer was primary */
2636                 *rule_nr = 40;
2637
2638                 switch (rct) {
2639                 case 0: /* !self_pri && !peer_pri */ return 0;
2640                 case 1: /*  self_pri && !peer_pri */ return 1;
2641                 case 2: /* !self_pri &&  peer_pri */ return -1;
2642                 case 3: /*  self_pri &&  peer_pri */
2643                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2644                         return dc ? -1 : 1;
2645                 }
2646         }
2647
2648         *rule_nr = 50;
2649         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2650         if (self == peer)
2651                 return -1;
2652
2653         *rule_nr = 51;
2654         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2655         if (self == peer) {
2656                 if (mdev->tconn->agreed_pro_version < 96 ?
2657                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2658                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2659                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2660                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2661                            of the peer's UUIDs made by its last start of resync as sync source. */
2662
2663                         if (mdev->tconn->agreed_pro_version < 91)
2664                                 return -1091;
2665
2666                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2667                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2668
2669                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2670                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2671
2672                         return -1;
2673                 }
2674         }
2675
2676         *rule_nr = 60;
2677         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2678         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2679                 peer = mdev->p_uuid[i] & ~((u64)1);
2680                 if (self == peer)
2681                         return -2;
2682         }
2683
2684         *rule_nr = 70;
2685         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2686         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2687         if (self == peer)
2688                 return 1;
2689
2690         *rule_nr = 71;
2691         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2692         if (self == peer) {
2693                 if (mdev->tconn->agreed_pro_version < 96 ?
2694                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2695                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2696                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2697                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2698                            of our UUIDs made by our last start of resync as sync source. */
2699
2700                         if (mdev->tconn->agreed_pro_version < 91)
2701                                 return -1091;
2702
2703                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2704                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2705
2706                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2707                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2708                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2709
2710                         return 1;
2711                 }
2712         }
2713
2714
2715         *rule_nr = 80;
2716         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2717         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2718                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2719                 if (self == peer)
2720                         return 2;
2721         }
2722
2723         *rule_nr = 90;
2724         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2725         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2726         if (self == peer && self != ((u64)0))
2727                 return 100;
2728
2729         *rule_nr = 100;
2730         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2731                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2732                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2733                         peer = mdev->p_uuid[j] & ~((u64)1);
2734                         if (self == peer)
2735                                 return -100;
2736                 }
2737         }
2738
2739         return -1000;
2740 }
2741
2742 /* drbd_sync_handshake() returns the new conn state on success, or
2743    C_MASK (-1) on failure.
2744  */
2745 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2746                                            enum drbd_disk_state peer_disk) __must_hold(local)
2747 {
2748         int hg, rule_nr;
2749         enum drbd_conns rv = C_MASK;
2750         enum drbd_disk_state mydisk;
2751
2752         mydisk = mdev->state.disk;
2753         if (mydisk == D_NEGOTIATING)
2754                 mydisk = mdev->new_state_tmp.disk;
2755
2756         dev_info(DEV, "drbd_sync_handshake:\n");
2757         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2758         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2759                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2760
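             /* hg follows the table above drbd_uuid_compare(): +/-1 resync using
              * the existing bitmap, +/-2 full sync (set the whole bitmap first),
              * 0 no sync, +/-100 split brain, <= -1000 unrelated data or a missing
              * protocol feature; positive values make this node the sync source. */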
2761         hg = drbd_uuid_compare(mdev, &rule_nr);
2762
2763         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2764
2765         if (hg == -1000) {
2766                 dev_alert(DEV, "Unrelated data, aborting!\n");
2767                 return C_MASK;
2768         }
2769         if (hg < -1000) {
2770                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2771                 return C_MASK;
2772         }
2773
2774         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2775             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2776                 int f = (hg == -100) || abs(hg) == 2;
2777                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2778                 if (f)
2779                         hg = hg*2;
2780                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2781                      hg > 0 ? "source" : "target");
2782         }
2783
2784         if (abs(hg) == 100)
2785                 drbd_khelper(mdev, "initial-split-brain");
2786
2787         if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2788                 int pcount = (mdev->state.role == R_PRIMARY)
2789                            + (peer_role == R_PRIMARY);
2790                 int forced = (hg == -100);
2791
2792                 switch (pcount) {
2793                 case 0:
2794                         hg = drbd_asb_recover_0p(mdev);
2795                         break;
2796                 case 1:
2797                         hg = drbd_asb_recover_1p(mdev);
2798                         break;
2799                 case 2:
2800                         hg = drbd_asb_recover_2p(mdev);
2801                         break;
2802                 }
2803                 if (abs(hg) < 100) {
2804                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2805                              "automatically solved. Sync from %s node\n",
2806                              pcount, (hg < 0) ? "peer" : "this");
2807                         if (forced) {
2808                                 dev_warn(DEV, "Doing a full sync, since"
2809                                      " UUIDs were ambiguous.\n");
2810                                 hg = hg*2;
2811                         }
2812                 }
2813         }
2814
2815         if (hg == -100) {
2816                 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2817                         hg = -1;
2818                 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2819                         hg = 1;
2820
2821                 if (abs(hg) < 100)
2822                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2823                              "Sync from %s node\n",
2824                              (hg < 0) ? "peer" : "this");
2825         }
2826
2827         if (hg == -100) {
2828                 /* FIXME this log message is not correct if we end up here
2829                  * after an attempted attach on a diskless node.
2830                  * We just refuse to attach -- well, we drop the "connection"
2831                  * to that disk, in a way... */
2832                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2833                 drbd_khelper(mdev, "split-brain");
2834                 return C_MASK;
2835         }
2836
2837         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2838                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2839                 return C_MASK;
2840         }
2841
2842         if (hg < 0 && /* by intention we do not use mydisk here. */
2843             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2844                 switch (mdev->tconn->net_conf->rr_conflict) {
2845                 case ASB_CALL_HELPER:
2846                         drbd_khelper(mdev, "pri-lost");
2847                         /* fall through */
2848                 case ASB_DISCONNECT:
2849                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2850                         return C_MASK;
2851                 case ASB_VIOLENTLY:
2852                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2853                              "assumption\n");
2854                 }
2855         }
2856
2857         if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2858                 if (hg == 0)
2859                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2860                 else
2861                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2862                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2863                                  abs(hg) >= 2 ? "full" : "bit-map based");
2864                 return C_MASK;
2865         }
2866
2867         if (abs(hg) >= 2) {
2868                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2869                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2870                                         BM_LOCKED_SET_ALLOWED))
2871                         return C_MASK;
2872         }
2873
2874         if (hg > 0) { /* become sync source. */
2875                 rv = C_WF_BITMAP_S;
2876         } else if (hg < 0) { /* become sync target */
2877                 rv = C_WF_BITMAP_T;
2878         } else {
2879                 rv = C_CONNECTED;
2880                 if (drbd_bm_total_weight(mdev)) {
2881                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2882                              drbd_bm_total_weight(mdev));
2883                 }
2884         }
2885
2886         return rv;
2887 }
2888
2889 /* returns 1 if invalid */
2890 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2891 {
2892         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2893         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2894             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2895                 return 0;
2896
2897         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2898         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2899             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2900                 return 1;
2901
2902         /* everything else is valid if they are equal on both sides. */
2903         if (peer == self)
2904                 return 0;
2905
2906         /* everything else is invalid. */
2907         return 1;
2908 }
2909
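/* receive_protocol() - verify that the peer's P_PROTOCOL packet matches our
 * net_conf: wire protocol, after-split-brain policies, two-primaries setting
 * and, for protocol >= 87, the data-integrity-alg.  Having 'want_lose' set on
 * both sides is rejected as well.  Any mismatch leads to C_DISCONNECTING. */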
2910 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2911 {
2912         struct p_protocol *p = pi->data;
2913         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2914         int p_want_lose, p_two_primaries, cf;
2915         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2916
2917         p_proto         = be32_to_cpu(p->protocol);
2918         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2919         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2920         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2921         p_two_primaries = be32_to_cpu(p->two_primaries);
2922         cf              = be32_to_cpu(p->conn_flags);
2923         p_want_lose = cf & CF_WANT_LOSE;
2924
2925         clear_bit(CONN_DRY_RUN, &tconn->flags);
2926
2927         if (cf & CF_DRY_RUN)
2928                 set_bit(CONN_DRY_RUN, &tconn->flags);
2929
2930         if (p_proto != tconn->net_conf->wire_protocol) {
2931                 conn_err(tconn, "incompatible communication protocols\n");
2932                 goto disconnect;
2933         }
2934
2935         if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2936                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2937                 goto disconnect;
2938         }
2939
2940         if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2941                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2942                 goto disconnect;
2943         }
2944
2945         if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2946                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2947                 goto disconnect;
2948         }
2949
2950         if (p_want_lose && tconn->net_conf->want_lose) {
2951                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2952                 goto disconnect;
2953         }
2954
2955         if (p_two_primaries != tconn->net_conf->two_primaries) {
2956                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2957                 goto disconnect;
2958         }
2959
2960         if (tconn->agreed_pro_version >= 87) {
2961                 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2962                 int err;
2963
2964                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2965                 if (err)
2966                         return err;
2967
2968                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2969                 if (strcmp(p_integrity_alg, my_alg)) {
2970                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2971                         goto disconnect;
2972                 }
2973                 conn_info(tconn, "data-integrity-alg: %s\n",
2974                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2975         }
2976
2977         return 0;
2978
2979 disconnect:
2980         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2981         return -EIO;
2982 }
2983
2984 /* helper function
2985  * input: alg name, feature name
2986  * return: NULL (alg name was "")
2987  *         ERR_PTR(error) if something goes wrong
2988  *         or the crypto hash ptr, if it worked out ok. */
2989 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2990                 const char *alg, const char *name)
2991 {
2992         struct crypto_hash *tfm;
2993
2994         if (!alg[0])
2995                 return NULL;
2996
2997         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2998         if (IS_ERR(tfm)) {
2999                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3000                         alg, name, PTR_ERR(tfm));
3001                 return tfm;
3002         }
3003         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3004                 crypto_free_hash(tfm);
3005                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3006                 return ERR_PTR(-EINVAL);
3007         }
3008         return tfm;
3009 }
3010
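/* Drain and discard the remaining pi->size bytes of payload from the data
 * socket, e.g. for packets that refer to a volume we do not know about. */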
3011 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3012 {
3013         void *buffer = tconn->data.rbuf;
3014         int size = pi->size;
3015
3016         while (size) {
3017                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3018                 s = drbd_recv(tconn, buffer, s);
3019                 if (s <= 0) {
3020                         if (s < 0)
3021                                 return s;
3022                         break;
3023                 }
3024                 size -= s;
3025         }
3026         if (size)
3027                 return -EIO;
3028         return 0;
3029 }
3030
3031 /*
3032  * config_unknown_volume  -  device configuration command for unknown volume
3033  *
3034  * When a device is added to an existing connection, the node on which the
3035  * device is added first will send configuration commands to its peer but the
3036  * peer will not know about the device yet.  It will warn and ignore these
3037  * commands.  Once the device is added on the second node, the second node will
3038  * send the same device configuration commands, but in the other direction.
3039  *
3040  * (We can also end up here if drbd is misconfigured.)
3041  */
3042 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3043 {
3044         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3045                   pi->vnr, cmdname(pi->cmd));
3046         return ignore_remaining_packet(tconn, pi);
3047 }
3048
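/* receive_SyncParam() - handle P_SYNC_PARAM / P_SYNC_PARAM89.
 * Depending on the agreed protocol version this updates the resync rate,
 * the verify-alg and csums-alg digests, and the dynamic resync controller
 * parameters (c_plan_ahead, c_delay_target, c_fill_target, c_max_rate). */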
3049 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3050 {
3051         struct drbd_conf *mdev;
3052         struct p_rs_param_95 *p;
3053         unsigned int header_size, data_size, exp_max_sz;
3054         struct crypto_hash *verify_tfm = NULL;
3055         struct crypto_hash *csums_tfm = NULL;
3056         const int apv = tconn->agreed_pro_version;
3057         int *rs_plan_s = NULL;
3058         int fifo_size = 0;
3059         int err;
3060
3061         mdev = vnr_to_mdev(tconn, pi->vnr);
3062         if (!mdev)
3063                 return config_unknown_volume(tconn, pi);
3064
3065         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3066                     : apv == 88 ? sizeof(struct p_rs_param)
3067                                         + SHARED_SECRET_MAX
3068                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3069                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3070
3071         if (pi->size > exp_max_sz) {
3072                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3073                     pi->size, exp_max_sz);
3074                 return -EIO;
3075         }
3076
3077         if (apv <= 88) {
3078                 header_size = sizeof(struct p_rs_param);
3079                 data_size = pi->size - header_size;
3080         } else if (apv <= 94) {
3081                 header_size = sizeof(struct p_rs_param_89);
3082                 data_size = pi->size - header_size;
3083                 D_ASSERT(data_size == 0);
3084         } else {
3085                 header_size = sizeof(struct p_rs_param_95);
3086                 data_size = pi->size - header_size;
3087                 D_ASSERT(data_size == 0);
3088         }
3089
3090         /* initialize verify_alg and csums_alg */
3091         p = pi->data;
3092         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3093
3094         err = drbd_recv_all(mdev->tconn, p, header_size);
3095         if (err)
3096                 return err;
3097
3098         if (get_ldev(mdev)) {
3099                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3100                 put_ldev(mdev);
3101         }
3102
3103         if (apv >= 88) {
3104                 if (apv == 88) {
3105                         if (data_size > SHARED_SECRET_MAX) {
3106                                 dev_err(DEV, "verify-alg too long, "
3107                                     "peer wants %u, accepting only %u bytes\n",
3108                                                 data_size, SHARED_SECRET_MAX);
3109                                 return -EIO;
3110                         }
3111
3112                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3113                         if (err)
3114                                 return err;
3115
3116                         /* we expect NUL terminated string */
3117                         /* but just in case someone tries to be evil */
3118                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3119                         p->verify_alg[data_size-1] = 0;
3120
3121                 } else /* apv >= 89 */ {
3122                         /* we still expect NUL terminated strings */
3123                         /* but just in case someone tries to be evil */
3124                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3125                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3126                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3127                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3128                 }
3129
3130                 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3131                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3132                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3133                                     mdev->tconn->net_conf->verify_alg, p->verify_alg);
3134                                 goto disconnect;
3135                         }
3136                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3137                                         p->verify_alg, "verify-alg");
3138                         if (IS_ERR(verify_tfm)) {
3139                                 verify_tfm = NULL;
3140                                 goto disconnect;
3141                         }
3142                 }
3143
3144                 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3145                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3146                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3147                                     mdev->tconn->net_conf->csums_alg, p->csums_alg);
3148                                 goto disconnect;
3149                         }
3150                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3151                                         p->csums_alg, "csums-alg");
3152                         if (IS_ERR(csums_tfm)) {
3153                                 csums_tfm = NULL;
3154                                 goto disconnect;
3155                         }
3156                 }
3157
3158                 if (apv > 94 && get_ldev(mdev)) {
3159                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3160                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3161                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3162                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3163                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3164
3165                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3166                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3167                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3168                                 if (!rs_plan_s) {
3169                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3170                                         put_ldev(mdev);
3171                                         goto disconnect;
3172                                 }
3173                         }
3174                         put_ldev(mdev);
3175                 }
3176
3177                 spin_lock(&mdev->peer_seq_lock);
3178                 /* lock against drbd_nl_syncer_conf() */
3179                 if (verify_tfm) {
3180                         strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3181                         mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3182                         crypto_free_hash(mdev->tconn->verify_tfm);
3183                         mdev->tconn->verify_tfm = verify_tfm;
3184                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3185                 }
3186                 if (csums_tfm) {
3187                         strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3188                         mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3189                         crypto_free_hash(mdev->tconn->csums_tfm);
3190                         mdev->tconn->csums_tfm = csums_tfm;
3191                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3192                 }
3193                 if (fifo_size != mdev->rs_plan_s.size) {
3194                         kfree(mdev->rs_plan_s.values);
3195                         mdev->rs_plan_s.values = rs_plan_s;
3196                         mdev->rs_plan_s.size   = fifo_size;
3197                         mdev->rs_planed = 0;
3198                 }
3199                 spin_unlock(&mdev->peer_seq_lock);
3200         }
3201         return 0;
3202
3203 disconnect:
3204         /* just for completeness: actually not needed,
3205          * as this is not reached if csums_tfm was ok. */
3206         crypto_free_hash(csums_tfm);
3207         /* but free the verify_tfm again, if csums_tfm did not work out */
3208         crypto_free_hash(verify_tfm);
3209         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3210         return -EIO;
3211 }
3212
3213 /* warn if the arguments differ by more than 12.5% */
3214 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3215         const char *s, sector_t a, sector_t b)
3216 {
3217         sector_t d;
3218         if (a == 0 || b == 0)
3219                 return;
3220         d = (a > b) ? (a - b) : (b - a);
3221         if (d > (a>>3) || d > (b>>3))
3222                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3223                      (unsigned long long)a, (unsigned long long)b);
3224 }
3225
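/* receive_sizes() - handle P_SIZES: store the peer's sizes, agree on a common
 * usable device size, refuse to shrink a device holding data during connect,
 * and kick off a resync after online growth if necessary. */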
3226 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3227 {
3228         struct drbd_conf *mdev;
3229         struct p_sizes *p = pi->data;
3230         enum determine_dev_size dd = unchanged;
3231         sector_t p_size, p_usize, my_usize;
3232         int ldsc = 0; /* local disk size changed */
3233         enum dds_flags ddsf;
3234
3235         mdev = vnr_to_mdev(tconn, pi->vnr);
3236         if (!mdev)
3237                 return config_unknown_volume(tconn, pi);
3238
3239         p_size = be64_to_cpu(p->d_size);
3240         p_usize = be64_to_cpu(p->u_size);
3241
3242         /* just store the peer's disk size for now.
3243          * we still need to figure out whether we accept that. */
3244         mdev->p_size = p_size;
3245
3246         if (get_ldev(mdev)) {
3247                 warn_if_differ_considerably(mdev, "lower level device sizes",
3248                            p_size, drbd_get_max_capacity(mdev->ldev));
3249                 warn_if_differ_considerably(mdev, "user requested size",
3250                                             p_usize, mdev->ldev->dc.disk_size);
3251
3252                 /* if this is the first connect, or an otherwise expected
3253                  * param exchange, choose the minimum */
3254                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3255                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3256                                              p_usize);
3257
3258                 my_usize = mdev->ldev->dc.disk_size;
3259
3260                 if (mdev->ldev->dc.disk_size != p_usize) {
3261                         mdev->ldev->dc.disk_size = p_usize;
3262                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3263                              (unsigned long)mdev->ldev->dc.disk_size);
3264                 }
3265
3266                 /* Never shrink a device with usable data during connect.
3267                    But allow online shrinking if we are connected. */
3268                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3269                    drbd_get_capacity(mdev->this_bdev) &&
3270                    mdev->state.disk >= D_OUTDATED &&
3271                    mdev->state.conn < C_CONNECTED) {
3272                         dev_err(DEV, "The peer's disk size is too small!\n");
3273                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3274                         mdev->ldev->dc.disk_size = my_usize;
3275                         put_ldev(mdev);
3276                         return -EIO;
3277                 }
3278                 put_ldev(mdev);
3279         }
3280
3281         ddsf = be16_to_cpu(p->dds_flags);
3282         if (get_ldev(mdev)) {
3283                 dd = drbd_determine_dev_size(mdev, ddsf);
3284                 put_ldev(mdev);
3285                 if (dd == dev_size_error)
3286                         return -EIO;
3287                 drbd_md_sync(mdev);
3288         } else {
3289                 /* I am diskless, need to accept the peer's size. */
3290                 drbd_set_my_capacity(mdev, p_size);
3291         }
3292
3293         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3294         drbd_reconsider_max_bio_size(mdev);
3295
3296         if (get_ldev(mdev)) {
3297                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3298                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3299                         ldsc = 1;
3300                 }
3301
3302                 put_ldev(mdev);
3303         }
3304
3305         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3306                 if (be64_to_cpu(p->c_size) !=
3307                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3308                         /* we have different sizes, probably peer
3309                          * needs to know my new size... */
3310                         drbd_send_sizes(mdev, 0, ddsf);
3311                 }
3312                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3313                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3314                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3315                             mdev->state.disk >= D_INCONSISTENT) {
3316                                 if (ddsf & DDSF_NO_RESYNC)
3317                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3318                                 else
3319                                         resync_after_online_grow(mdev);
3320                         } else
3321                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3322                 }
3323         }
3324
3325         return 0;
3326 }
3327
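/* receive_uuids() - handle P_UUIDS: remember the peer's UUIDs, and skip the
 * initial full sync when our current UUID is still UUID_JUST_CREATED and the
 * peer's flags indicate that this is allowed. */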
3328 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3329 {
3330         struct drbd_conf *mdev;
3331         struct p_uuids *p = pi->data;
3332         u64 *p_uuid;
3333         int i, updated_uuids = 0;
3334
3335         mdev = vnr_to_mdev(tconn, pi->vnr);
3336         if (!mdev)
3337                 return config_unknown_volume(tconn, pi);
3338
3339         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return -ENOMEM;
             }
3340
3341         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3342                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3343
3344         kfree(mdev->p_uuid);
3345         mdev->p_uuid = p_uuid;
3346
3347         if (mdev->state.conn < C_CONNECTED &&
3348             mdev->state.disk < D_INCONSISTENT &&
3349             mdev->state.role == R_PRIMARY &&
3350             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3351                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3352                     (unsigned long long)mdev->ed_uuid);
3353                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3354                 return -EIO;
3355         }
3356
3357         if (get_ldev(mdev)) {
3358                 int skip_initial_sync =
3359                         mdev->state.conn == C_CONNECTED &&
3360                         mdev->tconn->agreed_pro_version >= 90 &&
3361                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3362                         (p_uuid[UI_FLAGS] & 8);
3363                 if (skip_initial_sync) {
3364                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3365                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3366                                         "clear_n_write from receive_uuids",
3367                                         BM_LOCKED_TEST_ALLOWED);
3368                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3369                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3370                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3371                                         CS_VERBOSE, NULL);
3372                         drbd_md_sync(mdev);
3373                         updated_uuids = 1;
3374                 }
3375                 put_ldev(mdev);
3376         } else if (mdev->state.disk < D_INCONSISTENT &&
3377                    mdev->state.role == R_PRIMARY) {
3378                 /* I am a diskless primary, the peer just created a new current UUID
3379                    for me. */
3380                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3381         }
3382
3383         /* Before we test for the disk state, we should wait until any ongoing
3384            cluster wide state change has finished. That is important if
3385            we are primary and are detaching from our disk. We need to see the
3386            new disk state... */
3387         mutex_lock(mdev->state_mutex);
3388         mutex_unlock(mdev->state_mutex);
3389         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3390                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3391
3392         if (updated_uuids)
3393                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3394
3395         return 0;
3396 }
3397
3398 /**
3399  * convert_state() - Converts the peer's view of the cluster state to our point of view
3400  * @ps:         The state as seen by the peer.
3401  */
3402 static union drbd_state convert_state(union drbd_state ps)
3403 {
3404         union drbd_state ms;
3405
3406         static enum drbd_conns c_tab[] = {
3407                 [C_CONNECTED] = C_CONNECTED,
3408
3409                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3410                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3411                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3412                 [C_VERIFY_S]       = C_VERIFY_T,
3413                 [C_MASK]   = C_MASK,
3414         };
3415
3416         ms.i = ps.i;
3417
3418         ms.conn = c_tab[ps.conn];
3419         ms.peer = ps.role;
3420         ms.role = ps.peer;
3421         ms.pdsk = ps.disk;
3422         ms.disk = ps.pdsk;
3423         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3424
3425         return ms;
3426 }
3427
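/* receive_req_state() - handle P_STATE_CHG_REQ: the peer asks us to perform a
 * state change on its behalf; apply it and send the result back.
 * receive_req_conn_state() below does the same for connection-wide requests. */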
3428 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3429 {
3430         struct drbd_conf *mdev;
3431         struct p_req_state *p = pi->data;
3432         union drbd_state mask, val;
3433         enum drbd_state_rv rv;
3434
3435         mdev = vnr_to_mdev(tconn, pi->vnr);
3436         if (!mdev)
3437                 return -EIO;
3438
3439         mask.i = be32_to_cpu(p->mask);
3440         val.i = be32_to_cpu(p->val);
3441
3442         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3443             mutex_is_locked(mdev->state_mutex)) {
3444                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3445                 return 0;
3446         }
3447
3448         mask = convert_state(mask);
3449         val = convert_state(val);
3450
3451         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3452         drbd_send_sr_reply(mdev, rv);
3453
3454         drbd_md_sync(mdev);
3455
3456         return 0;
3457 }
3458
3459 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3460 {
3461         struct p_req_state *p = pi->data;
3462         union drbd_state mask, val;
3463         enum drbd_state_rv rv;
3464
3465         mask.i = be32_to_cpu(p->mask);
3466         val.i = be32_to_cpu(p->val);
3467
3468         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3469             mutex_is_locked(&tconn->cstate_mutex)) {
3470                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3471                 return 0;
3472         }
3473
3474         mask = convert_state(mask);
3475         val = convert_state(val);
3476
3477         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3478         conn_send_sr_reply(tconn, rv);
3479
3480         return 0;
3481 }
3482
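/* receive_state() - handle P_STATE: merge the peer's state into our own view
 * (role, disk, susp flags) and, if a disk was newly attached or the connection
 * was just established, run drbd_sync_handshake() to decide about a resync. */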
3483 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3484 {
3485         struct drbd_conf *mdev;
3486         struct p_state *p = pi->data;
3487         union drbd_state os, ns, peer_state;
3488         enum drbd_disk_state real_peer_disk;
3489         enum chg_state_flags cs_flags;
3490         int rv;
3491
3492         mdev = vnr_to_mdev(tconn, pi->vnr);
3493         if (!mdev)
3494                 return config_unknown_volume(tconn, pi);
3495
3496         peer_state.i = be32_to_cpu(p->state);
3497
3498         real_peer_disk = peer_state.disk;
3499         if (peer_state.disk == D_NEGOTIATING) {
3500                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3501                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3502         }
3503
3504         spin_lock_irq(&mdev->tconn->req_lock);
3505  retry:
3506         os = ns = drbd_read_state(mdev);
3507         spin_unlock_irq(&mdev->tconn->req_lock);
3508
3509         /* peer says his disk is uptodate, while we think it is inconsistent,
3510          * and this happens while we think we have a sync going on. */
3511         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3512             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3513                 /* If we are (becoming) SyncSource, but peer is still in sync
3514                  * preparation, ignore its uptodate-ness to avoid flapping, it
3515                  * will change to inconsistent once the peer reaches active
3516                  * syncing states.
3517                  * It may have changed syncer-paused flags, however, so we
3518                  * cannot ignore this completely. */
3519                 if (peer_state.conn > C_CONNECTED &&
3520                     peer_state.conn < C_SYNC_SOURCE)
3521                         real_peer_disk = D_INCONSISTENT;
3522
3523                 /* if peer_state changes to connected at the same time,
3524                  * it explicitly notifies us that it finished resync.
3525                  * Maybe we should finish it up, too? */
3526                 else if (os.conn >= C_SYNC_SOURCE &&
3527                          peer_state.conn == C_CONNECTED) {
3528                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3529                                 drbd_resync_finished(mdev);
3530                         return 0;
3531                 }
3532         }
3533
3534         /* peer says his disk is inconsistent, while we think it is uptodate,
3535          * and this happens while the peer still thinks we have a sync going on,
3536          * but we think we are already done with the sync.
3537          * We ignore this to avoid flapping pdsk.
3538          * This should not happen, if the peer is a recent version of drbd. */
3539         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3540             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3541                 real_peer_disk = D_UP_TO_DATE;
3542
3543         if (ns.conn == C_WF_REPORT_PARAMS)
3544                 ns.conn = C_CONNECTED;
3545
3546         if (peer_state.conn == C_AHEAD)
3547                 ns.conn = C_BEHIND;
3548
3549         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3550             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3551                 int cr; /* consider resync */
3552
3553                 /* if we established a new connection */
3554                 cr  = (os.conn < C_CONNECTED);
3555                 /* if we had an established connection
3556                  * and one of the nodes newly attaches a disk */
3557                 cr |= (os.conn == C_CONNECTED &&
3558                        (peer_state.disk == D_NEGOTIATING ||
3559                         os.disk == D_NEGOTIATING));
3560                 /* if we have both been inconsistent, and the peer has been
3561                  * forced to be UpToDate with --overwrite-data */
3562                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3563                 /* if we had been plain connected, and the admin requested to
3564                  * start a sync by "invalidate" or "invalidate-remote" */
3565                 cr |= (os.conn == C_CONNECTED &&
3566                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3567                                  peer_state.conn <= C_WF_BITMAP_T));
3568
3569                 if (cr)
3570                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3571
3572                 put_ldev(mdev);
3573                 if (ns.conn == C_MASK) {
3574                         ns.conn = C_CONNECTED;
3575                         if (mdev->state.disk == D_NEGOTIATING) {
3576                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3577                         } else if (peer_state.disk == D_NEGOTIATING) {
3578                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3579                                 peer_state.disk = D_DISKLESS;
3580                                 real_peer_disk = D_DISKLESS;
3581                         } else {
3582                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3583                                         return -EIO;
3584                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3585                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3586                                 return -EIO;
3587                         }
3588                 }
3589         }
3590
3591         spin_lock_irq(&mdev->tconn->req_lock);
3592         if (os.i != drbd_read_state(mdev).i)
3593                 goto retry;
3594         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3595         ns.peer = peer_state.role;
3596         ns.pdsk = real_peer_disk;
3597         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3598         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3599                 ns.disk = mdev->new_state_tmp.disk;
3600         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3601         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3602             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3603                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3604                    for temporary network outages! */
3605                 spin_unlock_irq(&mdev->tconn->req_lock);
3606                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3607                 tl_clear(mdev->tconn);
3608                 drbd_uuid_new_current(mdev);
3609                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3610                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3611                 return -EIO;
3612         }
3613         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3614         ns = drbd_read_state(mdev);
3615         spin_unlock_irq(&mdev->tconn->req_lock);
3616
3617         if (rv < SS_SUCCESS) {
3618                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3619                 return -EIO;
3620         }
3621
3622         if (os.conn > C_WF_REPORT_PARAMS) {
3623                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3624                     peer_state.disk != D_NEGOTIATING ) {
3625                         /* we want resync, peer has not yet decided to sync... */
3626                         /* Nowadays only used when forcing a node into primary role and
3627                            setting its disk to UpToDate with that */
3628                         drbd_send_uuids(mdev);
3629                         drbd_send_state(mdev);
3630                 }
3631         }
3632
3633         mdev->tconn->net_conf->want_lose = 0;
3634
3635         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3636
3637         return 0;
3638 }
3639
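/* receive_sync_uuid() - handle P_SYNC_UUID: adopt the new current UUID sent by
 * the SyncSource and start the resync as SyncTarget. */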
3640 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3641 {
3642         struct drbd_conf *mdev;
3643         struct p_rs_uuid *p = pi->data;
3644
3645         mdev = vnr_to_mdev(tconn, pi->vnr);
3646         if (!mdev)
3647                 return -EIO;
3648
3649         wait_event(mdev->misc_wait,
3650                    mdev->state.conn == C_WF_SYNC_UUID ||
3651                    mdev->state.conn == C_BEHIND ||
3652                    mdev->state.conn < C_CONNECTED ||
3653                    mdev->state.disk < D_NEGOTIATING);
3654
3655         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3656
3657         /* Here the _drbd_uuid_ functions are right, current should
3658            _not_ be rotated into the history */
3659         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3660                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3661                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3662
3663                 drbd_print_uuids(mdev, "updated sync uuid");
3664                 drbd_start_resync(mdev, C_SYNC_TARGET);
3665
3666                 put_ldev(mdev);
3667         } else
3668                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3669
3670         return 0;
3671 }
3672
3673 /**
3674  * receive_bitmap_plain
3675  *
3676  * Return 0 when done, 1 when another iteration is needed, and a negative error
3677  * code upon failure.
3678  */
3679 static int
3680 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3681                      unsigned long *p, struct bm_xfer_ctx *c)
3682 {
3683         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3684                                  drbd_header_size(mdev->tconn);
3685         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3686                                        c->bm_words - c->word_offset);
3687         unsigned int want = num_words * sizeof(*p);
3688         int err;
3689
3690         if (want != size) {
3691                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3692                 return -EIO;
3693         }
3694         if (want == 0)
3695                 return 0;
3696         err = drbd_recv_all(mdev->tconn, p, want);
3697         if (err)
3698                 return err;
3699
3700         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3701
3702         c->word_offset += num_words;
3703         c->bit_offset = c->word_offset * BITS_PER_LONG;
3704         if (c->bit_offset > c->bm_bits)
3705                 c->bit_offset = c->bm_bits;
3706
3707         return 1;
3708 }
3709
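/* Accessors for the "encoding" byte of a compressed bitmap packet:
 * bits 0-3 hold the bitmap code, bits 4-6 the number of pad bits at the end
 * of the bit stream, and bit 7 whether the first run describes set bits. */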
3710 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3711 {
3712         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3713 }
3714
3715 static int dcbp_get_start(struct p_compressed_bm *p)
3716 {
3717         return (p->encoding & 0x80) != 0;
3718 }
3719
3720 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3721 {
3722         return (p->encoding >> 4) & 0x7;
3723 }
3724
3725 /**
3726  * recv_bm_rle_bits
3727  *
3728  * Return 0 when done, 1 when another iteration is needed, and a negative error
3729  * code upon failure.
3730  */
3731 static int
3732 recv_bm_rle_bits(struct drbd_conf *mdev,
3733                 struct p_compressed_bm *p,
3734                  struct bm_xfer_ctx *c,
3735                  unsigned int len)
3736 {
3737         struct bitstream bs;
3738         u64 look_ahead;
3739         u64 rl;
3740         u64 tmp;
3741         unsigned long s = c->bit_offset;
3742         unsigned long e;
3743         int toggle = dcbp_get_start(p);
3744         int have;
3745         int bits;
3746
3747         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3748
3749         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3750         if (bits < 0)
3751                 return -EIO;
3752
3753         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3754                 bits = vli_decode_bits(&rl, look_ahead);
3755                 if (bits <= 0)
3756                         return -EIO;
3757
3758                 if (toggle) {
3759                         e = s + rl -1;
3760                         if (e >= c->bm_bits) {
3761                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3762                                 return -EIO;
3763                         }
3764                         _drbd_bm_set_bits(mdev, s, e);
3765                 }
3766
3767                 if (have < bits) {
3768                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3769                                 have, bits, look_ahead,
3770                                 (unsigned int)(bs.cur.b - p->code),
3771                                 (unsigned int)bs.buf_len);
3772                         return -EIO;
3773                 }
3774                 look_ahead >>= bits;
3775                 have -= bits;
3776
3777                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3778                 if (bits < 0)
3779                         return -EIO;
3780                 look_ahead |= tmp << have;
3781                 have += bits;
3782         }
3783
3784         c->bit_offset = s;
3785         bm_xfer_ctx_bit_to_word_offset(c);
3786
3787         return (s != c->bm_bits);
3788 }
3789
3790 /**
3791  * decode_bitmap_c
3792  *
3793  * Return 0 when done, 1 when another iteration is needed, and a negative error
3794  * code upon failure.
3795  */
3796 static int
3797 decode_bitmap_c(struct drbd_conf *mdev,
3798                 struct p_compressed_bm *p,
3799                 struct bm_xfer_ctx *c,
3800                 unsigned int len)
3801 {
3802         if (dcbp_get_code(p) == RLE_VLI_Bits)
3803                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3804
3805         /* other variants had been implemented for evaluation,
3806          * but have been dropped as this one turned out to be "best"
3807          * during all our tests. */
3808
3809         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3810         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3811         return -EIO;
3812 }
3813
3814 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3815                 const char *direction, struct bm_xfer_ctx *c)
3816 {
3817         /* what would it take to transfer it "plaintext" */
3818         unsigned int header_size = drbd_header_size(mdev->tconn);
3819         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3820         unsigned int plain =
3821                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3822                 c->bm_words * sizeof(unsigned long);
3823         unsigned int total = c->bytes[0] + c->bytes[1];
3824         unsigned int r;
3825
3826         /* total can not be zero. but just in case: */
3827         if (total == 0)
3828                 return;
3829
3830         /* don't report if not compressed */
3831         if (total >= plain)
3832                 return;
3833
3834         /* total < plain. check for overflow, still */
3835         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3836                                     : (1000 * total / plain);
3837
3838         if (r > 1000)
3839                 r = 1000;
3840
3841         r = 1000 - r;
3842         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3843              "total %u; compression: %u.%u%%\n",
3844                         direction,
3845                         c->bytes[1], c->packets[1],
3846                         c->bytes[0], c->packets[0],
3847                         total, r/10, r % 10);
3848 }
3849
3850 /* Since we are processing the bitfield from lower addresses to higher,
3851    it does not matter whether we process it in 32 bit chunks or 64 bit
3852    chunks as long as it is little endian. (Understand it as byte stream,
3853    beginning with the lowest byte...) If we used big endian
3854    we would need to process it from the highest address to the lowest,
3855    in order to be agnostic to the 32 vs 64 bits issue.
3856
3857    Returns 0 on success, a negative error code otherwise. */
3858 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3859 {
3860         struct drbd_conf *mdev;
3861         struct bm_xfer_ctx c;
3862         int err;
3863
3864         mdev = vnr_to_mdev(tconn, pi->vnr);
3865         if (!mdev)
3866                 return -EIO;
3867
3868         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3869         /* you are supposed to send additional out-of-sync information
3870          * if you actually set bits during this phase */
3871
3872         c = (struct bm_xfer_ctx) {
3873                 .bm_bits = drbd_bm_bits(mdev),
3874                 .bm_words = drbd_bm_words(mdev),
3875         };
3876
3877         for(;;) {
3878                 if (pi->cmd == P_BITMAP)
3879                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3880                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3881                         /* MAYBE: sanity check that we speak proto >= 90,
3882                          * and the feature is enabled! */
3883                         struct p_compressed_bm *p = pi->data;
3884
3885                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
3886                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3887                                 err = -EIO;
3888                                 goto out;
3889                         }
3890                         if (pi->size <= sizeof(*p)) {
3891                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3892                                 err = -EIO;
3893                                 goto out;
3894                         }
3895                         err = drbd_recv_all(mdev->tconn, p, pi->size);
3896                         if (err)
3897                                goto out;
3898                         err = decode_bitmap_c(mdev, p, &c, pi->size);
3899                 } else {
3900                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3901                         err = -EIO;
3902                         goto out;
3903                 }
3904
3905                 c.packets[pi->cmd == P_BITMAP]++;
3906                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
3907
3908                 if (err <= 0) {
3909                         if (err < 0)
3910                                 goto out;
3911                         break;
3912                 }
3913                 err = drbd_recv_header(mdev->tconn, pi);
3914                 if (err)
3915                         goto out;
3916         }
3917
3918         INFO_bm_xfer_stats(mdev, "receive", &c);
3919
3920         if (mdev->state.conn == C_WF_BITMAP_T) {
3921                 enum drbd_state_rv rv;
3922
3923                 err = drbd_send_bitmap(mdev);
3924                 if (err)
3925                         goto out;
3926                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3927                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3928                 D_ASSERT(rv == SS_SUCCESS);
3929         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3930                 /* admin may have requested C_DISCONNECTING,
3931                  * other threads may have noticed network errors */
3932                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3933                     drbd_conn_str(mdev->state.conn));
3934         }
3935         err = 0;
3936
3937  out:
3938         drbd_bm_unlock(mdev);
3939         if (!err && mdev->state.conn == C_WF_BITMAP_S)
3940                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3941         return err;
3942 }
3943
3944 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3945 {
3946         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3947                  pi->cmd, pi->size);
3948
3949         return ignore_remaining_packet(tconn, pi);
3950 }
3951
3952 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3953 {
3954         /* Make sure we've acked all the TCP data associated
3955          * with the data requests being unplugged */
3956         drbd_tcp_quickack(tconn->data.socket);
3957
3958         return 0;
3959 }
3960
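/* receive_out_of_sync() - handle P_OUT_OF_SYNC: mark the described block as
 * out of sync in our bitmap; expected only while WFSyncUUID, WFBitMapT or
 * Behind. */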
3961 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3962 {
3963         struct drbd_conf *mdev;
3964         struct p_block_desc *p = pi->data;
3965
3966         mdev = vnr_to_mdev(tconn, pi->vnr);
3967         if (!mdev)
3968                 return -EIO;
3969
3970         switch (mdev->state.conn) {
3971         case C_WF_SYNC_UUID:
3972         case C_WF_BITMAP_T:
3973         case C_BEHIND:
3974                         break;
3975         default:
3976                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3977                                 drbd_conn_str(mdev->state.conn));
3978         }
3979
3980         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3981
3982         return 0;
3983 }
3984
3985 struct data_cmd {
3986         int expect_payload;
3987         size_t pkt_size;
3988         int (*fn)(struct drbd_tconn *, struct packet_info *);
3989 };
3990
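/* Dispatch table for the data socket: per packet type, whether a payload
 * beyond the fixed-size (sub-)header is allowed, the size of that header,
 * and the handler function. */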
3991 static struct data_cmd drbd_cmd_handler[] = {
3992         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3993         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3994         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3995         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3996         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
3997         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
3998         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
3999         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4000         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4001         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4002         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4003         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4004         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4005         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4006         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4007         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4008         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4009         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4010         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4011         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4012         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4013         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4014         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4015 };
4016
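/* drbdd() - main loop of the receiver thread: read packet headers from the
 * data socket and dispatch them via drbd_cmd_handler[]; on any error force
 * the connection into C_PROTOCOL_ERROR. */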
4017 static void drbdd(struct drbd_tconn *tconn)
4018 {
4019         struct packet_info pi;
4020         size_t shs; /* sub header size */
4021         int err;
4022
4023         while (get_t_state(&tconn->receiver) == RUNNING) {
4024                 struct data_cmd *cmd;
4025
4026                 drbd_thread_current_set_cpu(&tconn->receiver);
4027                 if (drbd_recv_header(tconn, &pi))
4028                         goto err_out;
4029
4030                 cmd = &drbd_cmd_handler[pi.cmd];
4031                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4032                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4033                         goto err_out;
4034                 }
4035
4036                 shs = cmd->pkt_size;
4037                 if (pi.size > shs && !cmd->expect_payload) {
4038                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4039                         goto err_out;
4040                 }
4041
4042                 if (shs) {
4043                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4044                         if (err)
4045                                 goto err_out;
4046                         pi.size -= shs;
4047                 }
4048
4049                 err = cmd->fn(tconn, &pi);
4050                 if (err) {
4051                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4052                                  cmdname(pi.cmd), err, pi.size);
4053                         goto err_out;
4054                 }
4055         }
4056         return;
4057
4058     err_out:
4059         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4060 }
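
/*
 * Illustration only (not compiled into the driver): the receive loop above
 * in miniature.  A command id indexes a table of { fixed size, handler };
 * out-of-range ids and table holes (NULL handlers) are rejected before
 * anything is dereferenced.  All names below are invented for this sketch.
 */
#if 0
#include <stdio.h>
#include <stddef.h>

struct demo_cmd {
        size_t pkt_size;
        int (*fn)(const char *payload, size_t len);
};

static int demo_hello(const char *payload, size_t len)
{
        printf("demo_hello: %zu byte(s) of payload\n", len);
        return 0;
}

static const struct demo_cmd demo_tbl[] = {
        [0] = { 4, demo_hello },
        [1] = { 0, NULL },      /* hole: no handler registered */
};

static int demo_dispatch(unsigned int cmd, const char *payload, size_t len)
{
        if (cmd >= sizeof(demo_tbl) / sizeof(demo_tbl[0]) || !demo_tbl[cmd].fn)
                return -1;      /* unknown packet type */
        if (len < demo_tbl[cmd].pkt_size)
                return -1;      /* shorter than the fixed part */
        return demo_tbl[cmd].fn(payload, len);
}

int main(void)
{
        printf("%d\n", demo_dispatch(0, "ping", 4));    /* dispatched */
        printf("%d\n", demo_dispatch(7, "ping", 4));    /* rejected: unknown */
        return 0;
}
#endif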
4061
4062 void conn_flush_workqueue(struct drbd_tconn *tconn)
4063 {
4064         struct drbd_wq_barrier barr;
4065
4066         barr.w.cb = w_prev_work_done;
4067         barr.w.tconn = tconn;
4068         init_completion(&barr.done);
4069         drbd_queue_work(&tconn->data.work, &barr.w);
4070         wait_for_completion(&barr.done);
4071 }
4072
4073 static void drbd_disconnect(struct drbd_tconn *tconn)
4074 {
4075         enum drbd_conns oc;
4076         int rv = SS_UNKNOWN_ERROR;
4077
4078         if (tconn->cstate == C_STANDALONE)
4079                 return;
4080
4081         /* asender does not clean up anything. it must not interfere, either */
4082         drbd_thread_stop(&tconn->asender);
4083         drbd_free_sock(tconn);
4084
4085         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4086         conn_info(tconn, "Connection closed\n");
4087
4088         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4089                 conn_try_outdate_peer_async(tconn);
4090
4091         spin_lock_irq(&tconn->req_lock);
4092         oc = tconn->cstate;
4093         if (oc >= C_UNCONNECTED)
4094                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4095
4096         spin_unlock_irq(&tconn->req_lock);
4097
4098         if (oc == C_DISCONNECTING) {
4099                 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4100
4101                 crypto_free_hash(tconn->cram_hmac_tfm);
4102                 tconn->cram_hmac_tfm = NULL;
4103
4104                 kfree(tconn->net_conf);
4105                 tconn->net_conf = NULL;
4106                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4107         }
4108 }
4109
4110 static int drbd_disconnected(int vnr, void *p, void *data)
4111 {
4112         struct drbd_conf *mdev = (struct drbd_conf *)p;
4113         enum drbd_fencing_p fp;
4114         unsigned int i;
4115
4116         /* wait for current activity to cease. */
4117         spin_lock_irq(&mdev->tconn->req_lock);
4118         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4119         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4120         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4121         spin_unlock_irq(&mdev->tconn->req_lock);
4122
4123         /* We do not have data structures that would allow us to
4124          * get the rs_pending_cnt down to 0 again.
4125          *  * On C_SYNC_TARGET we do not have any data structures describing
4126          *    the pending RSDataRequest's we have sent.
4127          *  * On C_SYNC_SOURCE there is no data structure that tracks
4128          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4129          *  And no, it is not the sum of the reference counts in the
4130          *  resync_LRU. The resync_LRU tracks the whole operation including
4131          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4132          *  on the fly. */
4133         drbd_rs_cancel_all(mdev);
4134         mdev->rs_total = 0;
4135         mdev->rs_failed = 0;
4136         atomic_set(&mdev->rs_pending_cnt, 0);
4137         wake_up(&mdev->misc_wait);
4138
4139         del_timer(&mdev->request_timer);
4140
4141         del_timer_sync(&mdev->resync_timer);
4142         resync_timer_fn((unsigned long)mdev);
4143
4144         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4145          * w_make_resync_request etc. which may still be on the worker queue
4146          * to be "canceled" */
4147         drbd_flush_workqueue(mdev);
4148
4149         /* This also does reclaim_net_ee().  If we do this too early, we might
4150          * miss some resync ee and pages. */
4151         drbd_process_done_ee(mdev);
4152
4153         kfree(mdev->p_uuid);
4154         mdev->p_uuid = NULL;
4155
4156         if (!drbd_suspended(mdev))
4157                 tl_clear(mdev->tconn);
4158
4159         drbd_md_sync(mdev);
4160
4161         fp = FP_DONT_CARE;
4162         if (get_ldev(mdev)) {
4163                 fp = mdev->ldev->dc.fencing;
4164                 put_ldev(mdev);
4165         }
4166
4167         /* serialize with bitmap writeout triggered by the state change,
4168          * if any. */
4169         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4170
4171         /* tcp_close and release of sendpage pages can be deferred.  I don't
4172          * want to use SO_LINGER, because apparently it can be deferred for
4173          * more than 20 seconds (longest time I checked).
4174          *
4175          * Actually we don't care for exactly when the network stack does its
4176          * put_page(), but release our reference on these pages right here.
4177          */
4178         i = drbd_release_ee(mdev, &mdev->net_ee);
4179         if (i)
4180                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4181         i = atomic_read(&mdev->pp_in_use_by_net);
4182         if (i)
4183                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4184         i = atomic_read(&mdev->pp_in_use);
4185         if (i)
4186                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4187
4188         D_ASSERT(list_empty(&mdev->read_ee));
4189         D_ASSERT(list_empty(&mdev->active_ee));
4190         D_ASSERT(list_empty(&mdev->sync_ee));
4191         D_ASSERT(list_empty(&mdev->done_ee));
4192
4193         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4194         atomic_set(&mdev->current_epoch->epoch_size, 0);
4195         D_ASSERT(list_empty(&mdev->current_epoch->list));
4196
4197         return 0;
4198 }
4199
4200 /*
4201  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4202  * we can agree on is stored in agreed_pro_version.
4203  *
4204  * feature flags and the reserved array should be enough room for future
4205  * enhancements of the handshake protocol, and possible plugins...
4206  *
4207  * For now they are expected to be zero, and are ignored on receive.
4208  */
4209 static int drbd_send_features(struct drbd_tconn *tconn)
4210 {
4211         struct drbd_socket *sock;
4212         struct p_connection_features *p;
4213
4214         sock = &tconn->data;
4215         p = conn_prepare_command(tconn, sock);
4216         if (!p)
4217                 return -EIO;
4218         memset(p, 0, sizeof(*p));
4219         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4220         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4221         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4222 }
4223
4224 /*
4225  * return values:
4226  *   1 yes, we have a valid connection
4227  *   0 oops, did not work out, please try again
4228  *  -1 peer talks different language,
4229  *     no point in trying again, please go standalone.
4230  */
4231 static int drbd_do_features(struct drbd_tconn *tconn)
4232 {
4233         /* ASSERT current == tconn->receiver ... */
4234         struct p_connection_features *p;
4235         const int expect = sizeof(struct p_connection_features);
4236         struct packet_info pi;
4237         int err;
4238
4239         err = drbd_send_features(tconn);
4240         if (err)
4241                 return 0;
4242
4243         err = drbd_recv_header(tconn, &pi);
4244         if (err)
4245                 return 0;
4246
4247         if (pi.cmd != P_CONNECTION_FEATURES) {
4248                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4249                      cmdname(pi.cmd), pi.cmd);
4250                 return -1;
4251         }
4252
4253         if (pi.size != expect) {
4254                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4255                      expect, pi.size);
4256                 return -1;
4257         }
4258
4259         p = pi.data;
4260         err = drbd_recv_all_warn(tconn, p, expect);
4261         if (err)
4262                 return 0;
4263
4264         p->protocol_min = be32_to_cpu(p->protocol_min);
4265         p->protocol_max = be32_to_cpu(p->protocol_max);
4266         if (p->protocol_max == 0)
4267                 p->protocol_max = p->protocol_min;
4268
4269         if (PRO_VERSION_MAX < p->protocol_min ||
4270             PRO_VERSION_MIN > p->protocol_max)
4271                 goto incompat;
4272
4273         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4274
4275         conn_info(tconn, "Handshake successful: "
4276              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4277
4278         return 1;
4279
4280  incompat:
4281         conn_err(tconn, "incompatible DRBD dialects: "
4282             "I support %d-%d, peer supports %d-%d\n",
4283             PRO_VERSION_MIN, PRO_VERSION_MAX,
4284             p->protocol_min, p->protocol_max);
4285         return -1;
4286 }
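
/*
 * Illustration only (not compiled into the driver): the agreement rule used
 * above, in isolation.  The two advertised ranges must overlap, and the
 * agreed version is the smaller of the two maxima.  The MY_* constants are
 * made up for this sketch.
 */
#if 0
#include <stdio.h>

#define MY_PRO_VERSION_MIN 86   /* hypothetical values, for the sketch only */
#define MY_PRO_VERSION_MAX 100

static int agree_version(int peer_min, int peer_max)
{
        if (peer_max == 0)      /* very old peers advertised a single version */
                peer_max = peer_min;
        if (MY_PRO_VERSION_MAX < peer_min || MY_PRO_VERSION_MIN > peer_max)
                return -1;      /* ranges do not overlap: incompatible dialects */
        return peer_max < MY_PRO_VERSION_MAX ? peer_max : MY_PRO_VERSION_MAX;
}

int main(void)
{
        printf("agreed: %d\n", agree_version(90, 96));   /* -> 96 */
        printf("agreed: %d\n", agree_version(101, 110)); /* -> -1 */
        return 0;
}
#endif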
4287
4288 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4289 static int drbd_do_auth(struct drbd_tconn *tconn)
4290 {
4291         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4292         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4293         return -1;
4294 }
4295 #else
4296 #define CHALLENGE_LEN 64
4297
4298 /* Return value:
4299         1 - auth succeeded,
4300         0 - failed, try again (network error),
4301         -1 - auth failed, don't try again.
4302 */
4303
4304 static int drbd_do_auth(struct drbd_tconn *tconn)
4305 {
4306         struct drbd_socket *sock;
4307         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4308         struct scatterlist sg;
4309         char *response = NULL;
4310         char *right_response = NULL;
4311         char *peers_ch = NULL;
4312         unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4313         unsigned int resp_size;
4314         struct hash_desc desc;
4315         struct packet_info pi;
4316         int err, rv;
4317
4318         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4319
4320         desc.tfm = tconn->cram_hmac_tfm;
4321         desc.flags = 0;
4322
4323         rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4324                                 (u8 *)tconn->net_conf->shared_secret, key_len);
4325         if (rv) {
4326                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4327                 rv = -1;
4328                 goto fail;
4329         }
4330
4331         get_random_bytes(my_challenge, CHALLENGE_LEN);
4332
4333         sock = &tconn->data;
4334         if (!conn_prepare_command(tconn, sock)) {
4335                 rv = 0;
4336                 goto fail;
4337         }
4338         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4339                                 my_challenge, CHALLENGE_LEN);
4340         if (!rv)
4341                 goto fail;
4342
4343         err = drbd_recv_header(tconn, &pi);
4344         if (err) {
4345                 rv = 0;
4346                 goto fail;
4347         }
4348
4349         if (pi.cmd != P_AUTH_CHALLENGE) {
4350                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4351                     cmdname(pi.cmd), pi.cmd);
4352                 rv = 0;
4353                 goto fail;
4354         }
4355
4356         if (pi.size > CHALLENGE_LEN * 2) {
4357                 conn_err(tconn, "AuthChallenge payload too big.\n");
4358                 rv = -1;
4359                 goto fail;
4360         }
4361
4362         peers_ch = kmalloc(pi.size, GFP_NOIO);
4363         if (peers_ch == NULL) {
4364                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4365                 rv = -1;
4366                 goto fail;
4367         }
4368
4369         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4370         if (err) {
4371                 rv = 0;
4372                 goto fail;
4373         }
4374
4375         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4376         response = kmalloc(resp_size, GFP_NOIO);
4377         if (response == NULL) {
4378                 conn_err(tconn, "kmalloc of response failed\n");
4379                 rv = -1;
4380                 goto fail;
4381         }
4382
4383         sg_init_table(&sg, 1);
4384         sg_set_buf(&sg, peers_ch, pi.size);
4385
4386         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4387         if (rv) {
4388                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4389                 rv = -1;
4390                 goto fail;
4391         }
4392
4393         if (!conn_prepare_command(tconn, sock)) {
4394                 rv = 0;
4395                 goto fail;
4396         }
4397         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4398                                 response, resp_size);
4399         if (!rv)
4400                 goto fail;
4401
4402         err = drbd_recv_header(tconn, &pi);
4403         if (err) {
4404                 rv = 0;
4405                 goto fail;
4406         }
4407
4408         if (pi.cmd != P_AUTH_RESPONSE) {
4409                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4410                         cmdname(pi.cmd), pi.cmd);
4411                 rv = 0;
4412                 goto fail;
4413         }
4414
4415         if (pi.size != resp_size) {
4416                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4417                 rv = 0;
4418                 goto fail;
4419         }
4420
4421         err = drbd_recv_all_warn(tconn, response, resp_size);
4422         if (err) {
4423                 rv = 0;
4424                 goto fail;
4425         }
4426
4427         right_response = kmalloc(resp_size, GFP_NOIO);
4428         if (right_response == NULL) {
4429                 conn_err(tconn, "kmalloc of right_response failed\n");
4430                 rv = -1;
4431                 goto fail;
4432         }
4433
4434         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4435
4436         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4437         if (rv) {
4438                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4439                 rv = -1;
4440                 goto fail;
4441         }
4442
4443         rv = !memcmp(response, right_response, resp_size);
4444
4445         if (rv)
4446                 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4447                      resp_size, tconn->net_conf->cram_hmac_alg);
4448         else
4449                 rv = -1;
4450
4451  fail:
4452         kfree(peers_ch);
4453         kfree(response);
4454         kfree(right_response);
4455
4456         return rv;
4457 }
4458 #endif
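
/*
 * Illustration only (not compiled into the driver): the shape of the
 * challenge/response exchange above, collapsed into a single process.
 * toy_mac() is NOT a real HMAC, just a keyed checksum so the sketch stays
 * self-contained; in the driver the digest comes from the configured
 * cram-hmac-alg transform, and the exchange runs in both directions.
 */
#if 0
#include <stdio.h>
#include <string.h>

#define TOY_CHALLENGE_LEN 8
#define TOY_MAC_LEN 4

/* toy stand-in for HMAC(secret, challenge) */
static void toy_mac(const char *secret, const unsigned char *ch, size_t len,
                    unsigned char *out)
{
        unsigned int acc = 2166136261u;
        size_t i;

        for (i = 0; i < strlen(secret); i++)
                acc = (acc ^ (unsigned char)secret[i]) * 16777619u;
        for (i = 0; i < len; i++)
                acc = (acc ^ ch[i]) * 16777619u;
        for (i = 0; i < TOY_MAC_LEN; i++)
                out[i] = (acc >> (8 * i)) & 0xff;
}

int main(void)
{
        const char *our_secret  = "shared-secret";
        const char *peer_secret = "shared-secret";      /* change this and auth fails */
        unsigned char my_challenge[TOY_CHALLENGE_LEN] = "A1B2C3D"; /* would be random */
        unsigned char peers_answer[TOY_MAC_LEN], right_response[TOY_MAC_LEN];

        /* the peer answers *our* challenge with its copy of the secret ... */
        toy_mac(peer_secret, my_challenge, sizeof(my_challenge), peers_answer);
        /* ... and we recompute what that answer must look like ourselves */
        toy_mac(our_secret, my_challenge, sizeof(my_challenge), right_response);

        printf("peer %s\n",
               memcmp(peers_answer, right_response, TOY_MAC_LEN) == 0 ?
               "authenticated (secrets match)" : "rejected");
        return 0;
}
#endif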
4459
4460 int drbdd_init(struct drbd_thread *thi)
4461 {
4462         struct drbd_tconn *tconn = thi->tconn;
4463         int h;
4464
4465         conn_info(tconn, "receiver (re)started\n");
4466
4467         do {
4468                 h = drbd_connect(tconn);
4469                 if (h == 0) {
4470                         drbd_disconnect(tconn);
4471                         schedule_timeout_interruptible(HZ);
4472                 }
4473                 if (h == -1) {
4474                         conn_warn(tconn, "Discarding network configuration.\n");
4475                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4476                 }
4477         } while (h == 0);
4478
4479         if (h > 0) {
4480                 if (get_net_conf(tconn)) {
4481                         drbdd(tconn);
4482                         put_net_conf(tconn);
4483                 }
4484         }
4485
4486         drbd_disconnect(tconn);
4487
4488         conn_info(tconn, "receiver terminated\n");
4489         return 0;
4490 }
4491
4492 /* ********* acknowledge sender ******** */
4493
4494 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4495 {
4496         struct p_req_state_reply *p = pi->data;
4497         int retcode = be32_to_cpu(p->retcode);
4498
4499         if (retcode >= SS_SUCCESS) {
4500                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4501         } else {
4502                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4503                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4504                          drbd_set_st_err_str(retcode), retcode);
4505         }
4506         wake_up(&tconn->ping_wait);
4507
4508         return 0;
4509 }
4510
4511 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4512 {
4513         struct drbd_conf *mdev;
4514         struct p_req_state_reply *p = pi->data;
4515         int retcode = be32_to_cpu(p->retcode);
4516
4517         mdev = vnr_to_mdev(tconn, pi->vnr);
4518         if (!mdev)
4519                 return -EIO;
4520
4521         if (retcode >= SS_SUCCESS) {
4522                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4523         } else {
4524                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4525                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4526                         drbd_set_st_err_str(retcode), retcode);
4527         }
4528         wake_up(&mdev->state_wait);
4529
4530         return 0;
4531 }
4532
4533 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4534 {
4535         return drbd_send_ping_ack(tconn);
4536
4537 }
4538
4539 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4540 {
4541         /* restore idle timeout */
4542         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4543         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4544                 wake_up(&tconn->ping_wait);
4545
4546         return 0;
4547 }
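
/* drbd_asender() arms a short receive timeout (ping-timeo) on the meta
 * socket whenever it sends P_PING; the P_PING_ACK handled above restores
 * the longer idle timeout (ping-int).  If the short timeout expires before
 * the ack arrives (and nothing else was received meanwhile), the asender
 * gives up and forces a reconnect. */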
4548
4549 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4550 {
4551         struct drbd_conf *mdev;
4552         struct p_block_ack *p = pi->data;
4553         sector_t sector = be64_to_cpu(p->sector);
4554         int blksize = be32_to_cpu(p->blksize);
4555
4556         mdev = vnr_to_mdev(tconn, pi->vnr);
4557         if (!mdev)
4558                 return -EIO;
4559
4560         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4561
4562         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4563
4564         if (get_ldev(mdev)) {
4565                 drbd_rs_complete_io(mdev, sector);
4566                 drbd_set_in_sync(mdev, sector, blksize);
4567                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4568                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4569                 put_ldev(mdev);
4570         }
4571         dec_rs_pending(mdev);
4572         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4573
4574         return 0;
4575 }
4576
4577 static int
4578 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4579                               struct rb_root *root, const char *func,
4580                               enum drbd_req_event what, bool missing_ok)
4581 {
4582         struct drbd_request *req;
4583         struct bio_and_error m;
4584
4585         spin_lock_irq(&mdev->tconn->req_lock);
4586         req = find_request(mdev, root, id, sector, missing_ok, func);
4587         if (unlikely(!req)) {
4588                 spin_unlock_irq(&mdev->tconn->req_lock);
4589                 return -EIO;
4590         }
4591         __req_mod(req, what, &m);
4592         spin_unlock_irq(&mdev->tconn->req_lock);
4593
4594         if (m.bio)
4595                 complete_master_bio(mdev, &m);
4596         return 0;
4597 }
4598
4599 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4600 {
4601         struct drbd_conf *mdev;
4602         struct p_block_ack *p = pi->data;
4603         sector_t sector = be64_to_cpu(p->sector);
4604         int blksize = be32_to_cpu(p->blksize);
4605         enum drbd_req_event what;
4606
4607         mdev = vnr_to_mdev(tconn, pi->vnr);
4608         if (!mdev)
4609                 return -EIO;
4610
4611         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4612
4613         if (p->block_id == ID_SYNCER) {
4614                 drbd_set_in_sync(mdev, sector, blksize);
4615                 dec_rs_pending(mdev);
4616                 return 0;
4617         }
4618         switch (pi->cmd) {
4619         case P_RS_WRITE_ACK:
4620                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4621                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4622                 break;
4623         case P_WRITE_ACK:
4624                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4625                 what = WRITE_ACKED_BY_PEER;
4626                 break;
4627         case P_RECV_ACK:
4628                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4629                 what = RECV_ACKED_BY_PEER;
4630                 break;
4631         case P_DISCARD_WRITE:
4632                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4633                 what = DISCARD_WRITE;
4634                 break;
4635         case P_RETRY_WRITE:
4636                 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4637                 what = POSTPONE_WRITE;
4638                 break;
4639         default:
4640                 BUG();
4641         }
4642
4643         return validate_req_change_req_state(mdev, p->block_id, sector,
4644                                              &mdev->write_requests, __func__,
4645                                              what, false);
4646 }
4647
4648 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4649 {
4650         struct drbd_conf *mdev;
4651         struct p_block_ack *p = pi->data;
4652         sector_t sector = be64_to_cpu(p->sector);
4653         int size = be32_to_cpu(p->blksize);
4654         bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4655                           tconn->net_conf->wire_protocol == DRBD_PROT_B;
4656         int err;
4657
4658         mdev = vnr_to_mdev(tconn, pi->vnr);
4659         if (!mdev)
4660                 return -EIO;
4661
4662         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4663
4664         if (p->block_id == ID_SYNCER) {
4665                 dec_rs_pending(mdev);
4666                 drbd_rs_failed_io(mdev, sector, size);
4667                 return 0;
4668         }
4669
4670         err = validate_req_change_req_state(mdev, p->block_id, sector,
4671                                             &mdev->write_requests, __func__,
4672                                             NEG_ACKED, missing_ok);
4673         if (err) {
4674                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4675                    The master bio might already be completed, therefore the
4676                    request is no longer in the collision hash. */
4677                 /* In Protocol B we might already have got a P_RECV_ACK
4678                    but then get a P_NEG_ACK afterwards. */
4679                 if (!missing_ok)
4680                         return err;
4681                 drbd_set_out_of_sync(mdev, sector, size);
4682         }
4683         return 0;
4684 }
4685
4686 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4687 {
4688         struct drbd_conf *mdev;
4689         struct p_block_ack *p = pi->data;
4690         sector_t sector = be64_to_cpu(p->sector);
4691
4692         mdev = vnr_to_mdev(tconn, pi->vnr);
4693         if (!mdev)
4694                 return -EIO;
4695
4696         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4697
4698         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4699             (unsigned long long)sector, be32_to_cpu(p->blksize));
4700
4701         return validate_req_change_req_state(mdev, p->block_id, sector,
4702                                              &mdev->read_requests, __func__,
4703                                              NEG_ACKED, false);
4704 }
4705
4706 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4707 {
4708         struct drbd_conf *mdev;
4709         sector_t sector;
4710         int size;
4711         struct p_block_ack *p = pi->data;
4712
4713         mdev = vnr_to_mdev(tconn, pi->vnr);
4714         if (!mdev)
4715                 return -EIO;
4716
4717         sector = be64_to_cpu(p->sector);
4718         size = be32_to_cpu(p->blksize);
4719
4720         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4721
4722         dec_rs_pending(mdev);
4723
4724         if (get_ldev_if_state(mdev, D_FAILED)) {
4725                 drbd_rs_complete_io(mdev, sector);
4726                 switch (pi->cmd) {
4727                 case P_NEG_RS_DREPLY:
4728                         drbd_rs_failed_io(mdev, sector, size); /* fall through */
4729                 case P_RS_CANCEL:
4730                         break;
4731                 default:
4732                         BUG();
4733                 }
4734                 put_ldev(mdev);
4735         }
4736
4737         return 0;
4738 }
4739
4740 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4741 {
4742         struct drbd_conf *mdev;
4743         struct p_barrier_ack *p = pi->data;
4744
4745         mdev = vnr_to_mdev(tconn, pi->vnr);
4746         if (!mdev)
4747                 return -EIO;
4748
4749         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4750
4751         if (mdev->state.conn == C_AHEAD &&
4752             atomic_read(&mdev->ap_in_flight) == 0 &&
4753             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4754                 mdev->start_resync_timer.expires = jiffies + HZ;
4755                 add_timer(&mdev->start_resync_timer);
4756         }
4757
4758         return 0;
4759 }
4760
4761 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4762 {
4763         struct drbd_conf *mdev;
4764         struct p_block_ack *p = pi->data;
4765         struct drbd_work *w;
4766         sector_t sector;
4767         int size;
4768
4769         mdev = vnr_to_mdev(tconn, pi->vnr);
4770         if (!mdev)
4771                 return -EIO;
4772
4773         sector = be64_to_cpu(p->sector);
4774         size = be32_to_cpu(p->blksize);
4775
4776         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4777
4778         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4779                 drbd_ov_out_of_sync_found(mdev, sector, size);
4780         else
4781                 ov_out_of_sync_print(mdev);
4782
4783         if (!get_ldev(mdev))
4784                 return 0;
4785
4786         drbd_rs_complete_io(mdev, sector);
4787         dec_rs_pending(mdev);
4788
4789         --mdev->ov_left;
4790
4791         /* let's advance progress step marks only for every other megabyte */
4792         if ((mdev->ov_left & 0x200) == 0x200)
4793                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4794
4795         if (mdev->ov_left == 0) {
4796                 w = kmalloc(sizeof(*w), GFP_NOIO);
4797                 if (w) {
4798                         w->cb = w_ov_finished;
4799                         w->mdev = mdev;
4800                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4801                 } else {
4802                         dev_err(DEV, "kmalloc(w) failed.\n");
4803                         ov_out_of_sync_print(mdev);
4804                         drbd_resync_finished(mdev);
4805                 }
4806         }
4807         put_ldev(mdev);
4808         return 0;
4809 }
4810
4811 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4812 {
4813         return 0;
4814 }
4815
4816 static int tconn_process_done_ee(struct drbd_tconn *tconn)
4817 {
4818         struct drbd_conf *mdev;
4819         int i, not_empty = 0;
4820
4821         do {
4822                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4823                 flush_signals(current);
4824                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4825                         if (drbd_process_done_ee(mdev))
4826                                 return 1; /* error */
4827                 }
4828                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4829
4830                 spin_lock_irq(&tconn->req_lock);
4831                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4832                         not_empty = !list_empty(&mdev->done_ee);
4833                         if (not_empty)
4834                                 break;
4835                 }
4836                 spin_unlock_irq(&tconn->req_lock);
4837         } while (not_empty);
4838
4839         return 0;
4840 }
4841
4842 struct asender_cmd {
4843         size_t pkt_size;
4844         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4845 };
4846
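/*
 * Dispatch table for the meta socket (asender): pkt_size is the exact
 * payload size expected after the header.  Unknown commands drop the
 * connection (disconnect); a size mismatch forces a reconnect.
 */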
4847 static struct asender_cmd asender_tbl[] = {
4848         [P_PING]            = { 0, got_Ping },
4849         [P_PING_ACK]        = { 0, got_PingAck },
4850         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4851         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4852         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4853         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4854         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4855         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4856         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4857         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4858         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4859         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4860         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4861         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4862         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4863         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
4864         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4865 };
4866
4867 int drbd_asender(struct drbd_thread *thi)
4868 {
4869         struct drbd_tconn *tconn = thi->tconn;
4870         struct asender_cmd *cmd = NULL;
4871         struct packet_info pi;
4872         int rv;
4873         void *buf    = tconn->meta.rbuf;
4874         int received = 0;
4875         unsigned int header_size = drbd_header_size(tconn);
4876         int expect   = header_size;
4877         int ping_timeout_active = 0;
4878
4879         current->policy = SCHED_RR;  /* Make this a realtime task! */
4880         current->rt_priority = 2;    /* more important than all other tasks */
4881
4882         while (get_t_state(thi) == RUNNING) {
4883                 drbd_thread_current_set_cpu(thi);
4884                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4885                         if (drbd_send_ping(tconn)) {
4886                                 conn_err(tconn, "drbd_send_ping has failed\n");
4887                                 goto reconnect;
4888                         }
4889                         tconn->meta.socket->sk->sk_rcvtimeo =
4890                                 tconn->net_conf->ping_timeo*HZ/10;
4891                         ping_timeout_active = 1;
4892                 }
4893
4894                 /* TODO: conditionally cork; it may hurt latency if we cork without
4895                    much to send */
4896                 if (!tconn->net_conf->no_cork)
4897                         drbd_tcp_cork(tconn->meta.socket);
4898                 if (tconn_process_done_ee(tconn)) {
4899                         conn_err(tconn, "tconn_process_done_ee() failed\n");
4900                         goto reconnect;
4901                 }
4902                 /* but unconditionally uncork unless disabled */
4903                 if (!tconn->net_conf->no_cork)
4904                         drbd_tcp_uncork(tconn->meta.socket);
4905
4906                 /* short circuit, recv_msg would return EINTR anyway. */
4907                 if (signal_pending(current))
4908                         continue;
4909
4910                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4911                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4912
4913                 flush_signals(current);
4914
4915                 /* Note:
4916                  * -EINTR        (on meta) we got a signal
4917                  * -EAGAIN       (on meta) rcvtimeo expired
4918                  * -ECONNRESET   other side closed the connection
4919                  * -ERESTARTSYS  (on data) we got a signal
4920                  * rv <  0       other than above: unexpected error!
4921                  * rv == expected: full header or command
4922                  * rv <  expected: "woken" by signal during receive
4923                  * rv == 0       : "connection shut down by peer"
4924                  */
4925                 if (likely(rv > 0)) {
4926                         received += rv;
4927                         buf      += rv;
4928                 } else if (rv == 0) {
4929                         conn_err(tconn, "meta connection shut down by peer.\n");
4930                         goto reconnect;
4931                 } else if (rv == -EAGAIN) {
4932                         /* If the data socket received something meanwhile,
4933                          * that is good enough: peer is still alive. */
4934                         if (time_after(tconn->last_received,
4935                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
4936                                 continue;
4937                         if (ping_timeout_active) {
4938                                 conn_err(tconn, "PingAck did not arrive in time.\n");
4939                                 goto reconnect;
4940                         }
4941                         set_bit(SEND_PING, &tconn->flags);
4942                         continue;
4943                 } else if (rv == -EINTR) {
4944                         continue;
4945                 } else {
4946                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
4947                         goto reconnect;
4948                 }
4949
4950                 if (received == expect && cmd == NULL) {
4951                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
4952                                 goto reconnect;
4953                         cmd = &asender_tbl[pi.cmd];
4954                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
4955                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
4956                                         pi.cmd, pi.size);
4957                                 goto disconnect;
4958                         }
4959                         expect = header_size + cmd->pkt_size;
4960                         if (pi.size != expect - header_size) {
4961                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
4962                                         pi.cmd, pi.size);
4963                                 goto reconnect;
4964                         }
4965                 }
4966                 if (received == expect) {
4967                         bool err;
4968
4969                         err = cmd->fn(tconn, &pi);
4970                         if (err) {
4971                                 conn_err(tconn, "%pf failed\n", cmd->fn);
4972                                 goto reconnect;
4973                         }
4974
4975                         tconn->last_received = jiffies;
4976
4977                         /* the idle_timeout (ping-int)
4978                          * has been restored in got_PingAck() */
4979                         if (cmd == &asender_tbl[P_PING_ACK])
4980                                 ping_timeout_active = 0;
4981
4982                         buf      = tconn->meta.rbuf;
4983                         received = 0;
4984                         expect   = header_size;
4985                         cmd      = NULL;
4986                 }
4987         }
4988
4989         if (0) {
4990 reconnect:
4991                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4992         }
4993         if (0) {
4994 disconnect:
4995                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4996         }
4997         clear_bit(SIGNAL_ASENDER, &tconn->flags);
4998
4999         conn_info(tconn, "asender terminated\n");
5000
5001         return 0;
5002 }