]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/drbd/drbd_receiver.c
Merge branch 'work.memdup_user' of git://git.kernel.org/pub/scm/linux/kernel/git...
[karo-tx-linux.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52
53 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
54
55 struct packet_info {
56         enum drbd_packet cmd;
57         unsigned int size;
58         unsigned int vnr;
59         void *data;
60 };
61
62 enum finish_epoch {
63         FE_STILL_LIVE,
64         FE_DESTROYED,
65         FE_RECYCLED,
66 };
67
68 static int drbd_do_features(struct drbd_connection *connection);
69 static int drbd_do_auth(struct drbd_connection *connection);
70 static int drbd_disconnected(struct drbd_peer_device *);
71 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
72 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
73 static int e_end_block(struct drbd_work *, int);
74
75
76 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
78 /*
79  * some helper functions to deal with single linked page lists,
80  * page->private being our "next" pointer.
81  */
82
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89         struct page *page;
90         struct page *tmp;
91
92         BUG_ON(!n);
93         BUG_ON(!head);
94
95         page = *head;
96
97         if (!page)
98                 return NULL;
99
100         while (page) {
101                 tmp = page_chain_next(page);
102                 if (--n == 0)
103                         break; /* found sufficient pages */
104                 if (tmp == NULL)
105                         /* insufficient pages, don't use any of them. */
106                         return NULL;
107                 page = tmp;
108         }
109
110         /* add end of list marker for the returned list */
111         set_page_private(page, 0);
112         /* actual return value, and adjustment of head */
113         page = *head;
114         *head = tmp;
115         return page;
116 }
117
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123         struct page *tmp;
124         int i = 1;
125         while ((tmp = page_chain_next(page)))
126                 ++i, page = tmp;
127         if (len)
128                 *len = i;
129         return page;
130 }
131
132 static int page_chain_free(struct page *page)
133 {
134         struct page *tmp;
135         int i = 0;
136         page_chain_for_each_safe(page, tmp) {
137                 put_page(page);
138                 ++i;
139         }
140         return i;
141 }
142
143 static void page_chain_add(struct page **head,
144                 struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147         struct page *tmp;
148         tmp = page_chain_tail(chain_first, NULL);
149         BUG_ON(tmp != chain_last);
150 #endif
151
152         /* add chain to head */
153         set_page_private(chain_last, (unsigned long)*head);
154         *head = chain_first;
155 }
156
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158                                        unsigned int number)
159 {
160         struct page *page = NULL;
161         struct page *tmp = NULL;
162         unsigned int i = 0;
163
164         /* Yes, testing drbd_pp_vacant outside the lock is racy.
165          * So what. It saves a spin_lock. */
166         if (drbd_pp_vacant >= number) {
167                 spin_lock(&drbd_pp_lock);
168                 page = page_chain_del(&drbd_pp_pool, number);
169                 if (page)
170                         drbd_pp_vacant -= number;
171                 spin_unlock(&drbd_pp_lock);
172                 if (page)
173                         return page;
174         }
175
176         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177          * "criss-cross" setup, that might cause write-out on some other DRBD,
178          * which in turn might block on the other node at this very place.  */
179         for (i = 0; i < number; i++) {
180                 tmp = alloc_page(GFP_TRY);
181                 if (!tmp)
182                         break;
183                 set_page_private(tmp, (unsigned long)page);
184                 page = tmp;
185         }
186
187         if (i == number)
188                 return page;
189
190         /* Not enough pages immediately available this time.
191          * No need to jump around here, drbd_alloc_pages will retry this
192          * function "soon". */
193         if (page) {
194                 tmp = page_chain_tail(page, NULL);
195                 spin_lock(&drbd_pp_lock);
196                 page_chain_add(&drbd_pp_pool, page, tmp);
197                 drbd_pp_vacant += i;
198                 spin_unlock(&drbd_pp_lock);
199         }
200         return NULL;
201 }
202
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204                                            struct list_head *to_be_freed)
205 {
206         struct drbd_peer_request *peer_req, *tmp;
207
208         /* The EEs are always appended to the end of the list. Since
209            they are sent in order over the wire, they have to finish
210            in order. As soon as we see the first not finished we can
211            stop to examine the list... */
212
213         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214                 if (drbd_peer_req_has_active_page(peer_req))
215                         break;
216                 list_move(&peer_req->w.list, to_be_freed);
217         }
218 }
219
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222         LIST_HEAD(reclaimed);
223         struct drbd_peer_request *peer_req, *t;
224
225         spin_lock_irq(&device->resource->req_lock);
226         reclaim_finished_net_peer_reqs(device, &reclaimed);
227         spin_unlock_irq(&device->resource->req_lock);
228         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229                 drbd_free_net_peer_req(device, peer_req);
230 }
231
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234         struct drbd_peer_device *peer_device;
235         int vnr;
236
237         rcu_read_lock();
238         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239                 struct drbd_device *device = peer_device->device;
240                 if (!atomic_read(&device->pp_in_use_by_net))
241                         continue;
242
243                 kref_get(&device->kref);
244                 rcu_read_unlock();
245                 drbd_reclaim_net_peer_reqs(device);
246                 kref_put(&device->kref, drbd_destroy_device);
247                 rcu_read_lock();
248         }
249         rcu_read_unlock();
250 }
251
252 /**
253  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254  * @device:     DRBD device.
255  * @number:     number of pages requested
256  * @retry:      whether to retry, if not enough pages are available right now
257  *
258  * Tries to allocate number pages, first from our own page pool, then from
259  * the kernel.
260  * Possibly retry until DRBD frees sufficient pages somewhere else.
261  *
262  * If this allocation would exceed the max_buffers setting, we throttle
263  * allocation (schedule_timeout) to give the system some room to breathe.
264  *
265  * We do not use max-buffers as hard limit, because it could lead to
266  * congestion and further to a distributed deadlock during online-verify or
267  * (checksum based) resync, if the max-buffers, socket buffer sizes and
268  * resync-rate settings are mis-configured.
269  *
270  * Returns a page chain linked via page->private.
271  */
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273                               bool retry)
274 {
275         struct drbd_device *device = peer_device->device;
276         struct page *page = NULL;
277         struct net_conf *nc;
278         DEFINE_WAIT(wait);
279         unsigned int mxb;
280
281         rcu_read_lock();
282         nc = rcu_dereference(peer_device->connection->net_conf);
283         mxb = nc ? nc->max_buffers : 1000000;
284         rcu_read_unlock();
285
286         if (atomic_read(&device->pp_in_use) < mxb)
287                 page = __drbd_alloc_pages(device, number);
288
289         /* Try to keep the fast path fast, but occasionally we need
290          * to reclaim the pages we lended to the network stack. */
291         if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292                 drbd_reclaim_net_peer_reqs(device);
293
294         while (page == NULL) {
295                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
296
297                 drbd_reclaim_net_peer_reqs(device);
298
299                 if (atomic_read(&device->pp_in_use) < mxb) {
300                         page = __drbd_alloc_pages(device, number);
301                         if (page)
302                                 break;
303                 }
304
305                 if (!retry)
306                         break;
307
308                 if (signal_pending(current)) {
309                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310                         break;
311                 }
312
313                 if (schedule_timeout(HZ/10) == 0)
314                         mxb = UINT_MAX;
315         }
316         finish_wait(&drbd_pp_wait, &wait);
317
318         if (page)
319                 atomic_add(number, &device->pp_in_use);
320         return page;
321 }
322
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
325  * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330         int i;
331
332         if (page == NULL)
333                 return;
334
335         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
336                 i = page_chain_free(page);
337         else {
338                 struct page *tmp;
339                 tmp = page_chain_tail(page, &i);
340                 spin_lock(&drbd_pp_lock);
341                 page_chain_add(&drbd_pp_pool, page, tmp);
342                 drbd_pp_vacant += i;
343                 spin_unlock(&drbd_pp_lock);
344         }
345         i = atomic_sub_return(i, a);
346         if (i < 0)
347                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349         wake_up(&drbd_pp_wait);
350 }
351
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
365
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371                     unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373         struct drbd_device *device = peer_device->device;
374         struct drbd_peer_request *peer_req;
375         struct page *page = NULL;
376         unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377
378         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379                 return NULL;
380
381         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382         if (!peer_req) {
383                 if (!(gfp_mask & __GFP_NOWARN))
384                         drbd_err(device, "%s: allocation failed\n", __func__);
385                 return NULL;
386         }
387
388         if (nr_pages) {
389                 page = drbd_alloc_pages(peer_device, nr_pages,
390                                         gfpflags_allow_blocking(gfp_mask));
391                 if (!page)
392                         goto fail;
393         }
394
395         memset(peer_req, 0, sizeof(*peer_req));
396         INIT_LIST_HEAD(&peer_req->w.list);
397         drbd_clear_interval(&peer_req->i);
398         peer_req->i.size = request_size;
399         peer_req->i.sector = sector;
400         peer_req->submit_jif = jiffies;
401         peer_req->peer_device = peer_device;
402         peer_req->pages = page;
403         /*
404          * The block_id is opaque to the receiver.  It is not endianness
405          * converted, and sent back to the sender unchanged.
406          */
407         peer_req->block_id = id;
408
409         return peer_req;
410
411  fail:
412         mempool_free(peer_req, drbd_ee_mempool);
413         return NULL;
414 }
415
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417                        int is_net)
418 {
419         might_sleep();
420         if (peer_req->flags & EE_HAS_DIGEST)
421                 kfree(peer_req->digest);
422         drbd_free_pages(device, peer_req->pages, is_net);
423         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427                 drbd_al_complete_io(device, &peer_req->i);
428         }
429         mempool_free(peer_req, drbd_ee_mempool);
430 }
431
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434         LIST_HEAD(work_list);
435         struct drbd_peer_request *peer_req, *t;
436         int count = 0;
437         int is_net = list == &device->net_ee;
438
439         spin_lock_irq(&device->resource->req_lock);
440         list_splice_init(list, &work_list);
441         spin_unlock_irq(&device->resource->req_lock);
442
443         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444                 __drbd_free_peer_req(device, peer_req, is_net);
445                 count++;
446         }
447         return count;
448 }
449
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455         LIST_HEAD(work_list);
456         LIST_HEAD(reclaimed);
457         struct drbd_peer_request *peer_req, *t;
458         int err = 0;
459
460         spin_lock_irq(&device->resource->req_lock);
461         reclaim_finished_net_peer_reqs(device, &reclaimed);
462         list_splice_init(&device->done_ee, &work_list);
463         spin_unlock_irq(&device->resource->req_lock);
464
465         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466                 drbd_free_net_peer_req(device, peer_req);
467
468         /* possible callbacks here:
469          * e_end_block, and e_end_resync_block, e_send_superseded.
470          * all ignore the last argument.
471          */
472         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473                 int err2;
474
475                 /* list_del not necessary, next/prev members not touched */
476                 err2 = peer_req->w.cb(&peer_req->w, !!err);
477                 if (!err)
478                         err = err2;
479                 drbd_free_peer_req(device, peer_req);
480         }
481         wake_up(&device->ee_wait);
482
483         return err;
484 }
485
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487                                      struct list_head *head)
488 {
489         DEFINE_WAIT(wait);
490
491         /* avoids spin_lock/unlock
492          * and calling prepare_to_wait in the fast path */
493         while (!list_empty(head)) {
494                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495                 spin_unlock_irq(&device->resource->req_lock);
496                 io_schedule();
497                 finish_wait(&device->ee_wait, &wait);
498                 spin_lock_irq(&device->resource->req_lock);
499         }
500 }
501
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503                                     struct list_head *head)
504 {
505         spin_lock_irq(&device->resource->req_lock);
506         _drbd_wait_ee_list_empty(device, head);
507         spin_unlock_irq(&device->resource->req_lock);
508 }
509
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512         struct kvec iov = {
513                 .iov_base = buf,
514                 .iov_len = size,
515         };
516         struct msghdr msg = {
517                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518         };
519         return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
520 }
521
522 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
523 {
524         int rv;
525
526         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
527
528         if (rv < 0) {
529                 if (rv == -ECONNRESET)
530                         drbd_info(connection, "sock was reset by peer\n");
531                 else if (rv != -ERESTARTSYS)
532                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
533         } else if (rv == 0) {
534                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
535                         long t;
536                         rcu_read_lock();
537                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
538                         rcu_read_unlock();
539
540                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
541
542                         if (t)
543                                 goto out;
544                 }
545                 drbd_info(connection, "sock was shut down by peer\n");
546         }
547
548         if (rv != size)
549                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
550
551 out:
552         return rv;
553 }
554
555 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
556 {
557         int err;
558
559         err = drbd_recv(connection, buf, size);
560         if (err != size) {
561                 if (err >= 0)
562                         err = -EIO;
563         } else
564                 err = 0;
565         return err;
566 }
567
568 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
569 {
570         int err;
571
572         err = drbd_recv_all(connection, buf, size);
573         if (err && !signal_pending(current))
574                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
575         return err;
576 }
577
578 /* quoting tcp(7):
579  *   On individual connections, the socket buffer size must be set prior to the
580  *   listen(2) or connect(2) calls in order to have it take effect.
581  * This is our wrapper to do so.
582  */
583 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
584                 unsigned int rcv)
585 {
586         /* open coded SO_SNDBUF, SO_RCVBUF */
587         if (snd) {
588                 sock->sk->sk_sndbuf = snd;
589                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
590         }
591         if (rcv) {
592                 sock->sk->sk_rcvbuf = rcv;
593                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
594         }
595 }
596
597 static struct socket *drbd_try_connect(struct drbd_connection *connection)
598 {
599         const char *what;
600         struct socket *sock;
601         struct sockaddr_in6 src_in6;
602         struct sockaddr_in6 peer_in6;
603         struct net_conf *nc;
604         int err, peer_addr_len, my_addr_len;
605         int sndbuf_size, rcvbuf_size, connect_int;
606         int disconnect_on_error = 1;
607
608         rcu_read_lock();
609         nc = rcu_dereference(connection->net_conf);
610         if (!nc) {
611                 rcu_read_unlock();
612                 return NULL;
613         }
614         sndbuf_size = nc->sndbuf_size;
615         rcvbuf_size = nc->rcvbuf_size;
616         connect_int = nc->connect_int;
617         rcu_read_unlock();
618
619         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
620         memcpy(&src_in6, &connection->my_addr, my_addr_len);
621
622         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
623                 src_in6.sin6_port = 0;
624         else
625                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
626
627         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
628         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
629
630         what = "sock_create_kern";
631         err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
632                                SOCK_STREAM, IPPROTO_TCP, &sock);
633         if (err < 0) {
634                 sock = NULL;
635                 goto out;
636         }
637
638         sock->sk->sk_rcvtimeo =
639         sock->sk->sk_sndtimeo = connect_int * HZ;
640         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
641
642        /* explicitly bind to the configured IP as source IP
643         *  for the outgoing connections.
644         *  This is needed for multihomed hosts and to be
645         *  able to use lo: interfaces for drbd.
646         * Make sure to use 0 as port number, so linux selects
647         *  a free one dynamically.
648         */
649         what = "bind before connect";
650         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
651         if (err < 0)
652                 goto out;
653
654         /* connect may fail, peer not yet available.
655          * stay C_WF_CONNECTION, don't go Disconnecting! */
656         disconnect_on_error = 0;
657         what = "connect";
658         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
659
660 out:
661         if (err < 0) {
662                 if (sock) {
663                         sock_release(sock);
664                         sock = NULL;
665                 }
666                 switch (-err) {
667                         /* timeout, busy, signal pending */
668                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
669                 case EINTR: case ERESTARTSYS:
670                         /* peer not (yet) available, network problem */
671                 case ECONNREFUSED: case ENETUNREACH:
672                 case EHOSTDOWN:    case EHOSTUNREACH:
673                         disconnect_on_error = 0;
674                         break;
675                 default:
676                         drbd_err(connection, "%s failed, err = %d\n", what, err);
677                 }
678                 if (disconnect_on_error)
679                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
680         }
681
682         return sock;
683 }
684
685 struct accept_wait_data {
686         struct drbd_connection *connection;
687         struct socket *s_listen;
688         struct completion door_bell;
689         void (*original_sk_state_change)(struct sock *sk);
690
691 };
692
693 static void drbd_incoming_connection(struct sock *sk)
694 {
695         struct accept_wait_data *ad = sk->sk_user_data;
696         void (*state_change)(struct sock *sk);
697
698         state_change = ad->original_sk_state_change;
699         if (sk->sk_state == TCP_ESTABLISHED)
700                 complete(&ad->door_bell);
701         state_change(sk);
702 }
703
704 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
705 {
706         int err, sndbuf_size, rcvbuf_size, my_addr_len;
707         struct sockaddr_in6 my_addr;
708         struct socket *s_listen;
709         struct net_conf *nc;
710         const char *what;
711
712         rcu_read_lock();
713         nc = rcu_dereference(connection->net_conf);
714         if (!nc) {
715                 rcu_read_unlock();
716                 return -EIO;
717         }
718         sndbuf_size = nc->sndbuf_size;
719         rcvbuf_size = nc->rcvbuf_size;
720         rcu_read_unlock();
721
722         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
723         memcpy(&my_addr, &connection->my_addr, my_addr_len);
724
725         what = "sock_create_kern";
726         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
727                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
728         if (err) {
729                 s_listen = NULL;
730                 goto out;
731         }
732
733         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
734         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
735
736         what = "bind before listen";
737         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
738         if (err < 0)
739                 goto out;
740
741         ad->s_listen = s_listen;
742         write_lock_bh(&s_listen->sk->sk_callback_lock);
743         ad->original_sk_state_change = s_listen->sk->sk_state_change;
744         s_listen->sk->sk_state_change = drbd_incoming_connection;
745         s_listen->sk->sk_user_data = ad;
746         write_unlock_bh(&s_listen->sk->sk_callback_lock);
747
748         what = "listen";
749         err = s_listen->ops->listen(s_listen, 5);
750         if (err < 0)
751                 goto out;
752
753         return 0;
754 out:
755         if (s_listen)
756                 sock_release(s_listen);
757         if (err < 0) {
758                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
759                         drbd_err(connection, "%s failed, err = %d\n", what, err);
760                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
761                 }
762         }
763
764         return -EIO;
765 }
766
767 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
768 {
769         write_lock_bh(&sk->sk_callback_lock);
770         sk->sk_state_change = ad->original_sk_state_change;
771         sk->sk_user_data = NULL;
772         write_unlock_bh(&sk->sk_callback_lock);
773 }
774
775 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
776 {
777         int timeo, connect_int, err = 0;
778         struct socket *s_estab = NULL;
779         struct net_conf *nc;
780
781         rcu_read_lock();
782         nc = rcu_dereference(connection->net_conf);
783         if (!nc) {
784                 rcu_read_unlock();
785                 return NULL;
786         }
787         connect_int = nc->connect_int;
788         rcu_read_unlock();
789
790         timeo = connect_int * HZ;
791         /* 28.5% random jitter */
792         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
793
794         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
795         if (err <= 0)
796                 return NULL;
797
798         err = kernel_accept(ad->s_listen, &s_estab, 0);
799         if (err < 0) {
800                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
801                         drbd_err(connection, "accept failed, err = %d\n", err);
802                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
803                 }
804         }
805
806         if (s_estab)
807                 unregister_state_change(s_estab->sk, ad);
808
809         return s_estab;
810 }
811
812 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
813
814 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
815                              enum drbd_packet cmd)
816 {
817         if (!conn_prepare_command(connection, sock))
818                 return -EIO;
819         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
820 }
821
822 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
823 {
824         unsigned int header_size = drbd_header_size(connection);
825         struct packet_info pi;
826         struct net_conf *nc;
827         int err;
828
829         rcu_read_lock();
830         nc = rcu_dereference(connection->net_conf);
831         if (!nc) {
832                 rcu_read_unlock();
833                 return -EIO;
834         }
835         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
836         rcu_read_unlock();
837
838         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
839         if (err != header_size) {
840                 if (err >= 0)
841                         err = -EIO;
842                 return err;
843         }
844         err = decode_header(connection, connection->data.rbuf, &pi);
845         if (err)
846                 return err;
847         return pi.cmd;
848 }
849
850 /**
851  * drbd_socket_okay() - Free the socket if its connection is not okay
852  * @sock:       pointer to the pointer to the socket.
853  */
854 static bool drbd_socket_okay(struct socket **sock)
855 {
856         int rr;
857         char tb[4];
858
859         if (!*sock)
860                 return false;
861
862         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
863
864         if (rr > 0 || rr == -EAGAIN) {
865                 return true;
866         } else {
867                 sock_release(*sock);
868                 *sock = NULL;
869                 return false;
870         }
871 }
872
873 static bool connection_established(struct drbd_connection *connection,
874                                    struct socket **sock1,
875                                    struct socket **sock2)
876 {
877         struct net_conf *nc;
878         int timeout;
879         bool ok;
880
881         if (!*sock1 || !*sock2)
882                 return false;
883
884         rcu_read_lock();
885         nc = rcu_dereference(connection->net_conf);
886         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
887         rcu_read_unlock();
888         schedule_timeout_interruptible(timeout);
889
890         ok = drbd_socket_okay(sock1);
891         ok = drbd_socket_okay(sock2) && ok;
892
893         return ok;
894 }
895
896 /* Gets called if a connection is established, or if a new minor gets created
897    in a connection */
898 int drbd_connected(struct drbd_peer_device *peer_device)
899 {
900         struct drbd_device *device = peer_device->device;
901         int err;
902
903         atomic_set(&device->packet_seq, 0);
904         device->peer_seq = 0;
905
906         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
907                 &peer_device->connection->cstate_mutex :
908                 &device->own_state_mutex;
909
910         err = drbd_send_sync_param(peer_device);
911         if (!err)
912                 err = drbd_send_sizes(peer_device, 0, 0);
913         if (!err)
914                 err = drbd_send_uuids(peer_device);
915         if (!err)
916                 err = drbd_send_current_state(peer_device);
917         clear_bit(USE_DEGR_WFC_T, &device->flags);
918         clear_bit(RESIZE_PENDING, &device->flags);
919         atomic_set(&device->ap_in_flight, 0);
920         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
921         return err;
922 }
923
924 /*
925  * return values:
926  *   1 yes, we have a valid connection
927  *   0 oops, did not work out, please try again
928  *  -1 peer talks different language,
929  *     no point in trying again, please go standalone.
930  *  -2 We do not have a network config...
931  */
932 static int conn_connect(struct drbd_connection *connection)
933 {
934         struct drbd_socket sock, msock;
935         struct drbd_peer_device *peer_device;
936         struct net_conf *nc;
937         int vnr, timeout, h;
938         bool discard_my_data, ok;
939         enum drbd_state_rv rv;
940         struct accept_wait_data ad = {
941                 .connection = connection,
942                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
943         };
944
945         clear_bit(DISCONNECT_SENT, &connection->flags);
946         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
947                 return -2;
948
949         mutex_init(&sock.mutex);
950         sock.sbuf = connection->data.sbuf;
951         sock.rbuf = connection->data.rbuf;
952         sock.socket = NULL;
953         mutex_init(&msock.mutex);
954         msock.sbuf = connection->meta.sbuf;
955         msock.rbuf = connection->meta.rbuf;
956         msock.socket = NULL;
957
958         /* Assume that the peer only understands protocol 80 until we know better.  */
959         connection->agreed_pro_version = 80;
960
961         if (prepare_listen_socket(connection, &ad))
962                 return 0;
963
964         do {
965                 struct socket *s;
966
967                 s = drbd_try_connect(connection);
968                 if (s) {
969                         if (!sock.socket) {
970                                 sock.socket = s;
971                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
972                         } else if (!msock.socket) {
973                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
974                                 msock.socket = s;
975                                 send_first_packet(connection, &msock, P_INITIAL_META);
976                         } else {
977                                 drbd_err(connection, "Logic error in conn_connect()\n");
978                                 goto out_release_sockets;
979                         }
980                 }
981
982                 if (connection_established(connection, &sock.socket, &msock.socket))
983                         break;
984
985 retry:
986                 s = drbd_wait_for_connect(connection, &ad);
987                 if (s) {
988                         int fp = receive_first_packet(connection, s);
989                         drbd_socket_okay(&sock.socket);
990                         drbd_socket_okay(&msock.socket);
991                         switch (fp) {
992                         case P_INITIAL_DATA:
993                                 if (sock.socket) {
994                                         drbd_warn(connection, "initial packet S crossed\n");
995                                         sock_release(sock.socket);
996                                         sock.socket = s;
997                                         goto randomize;
998                                 }
999                                 sock.socket = s;
1000                                 break;
1001                         case P_INITIAL_META:
1002                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
1003                                 if (msock.socket) {
1004                                         drbd_warn(connection, "initial packet M crossed\n");
1005                                         sock_release(msock.socket);
1006                                         msock.socket = s;
1007                                         goto randomize;
1008                                 }
1009                                 msock.socket = s;
1010                                 break;
1011                         default:
1012                                 drbd_warn(connection, "Error receiving initial packet\n");
1013                                 sock_release(s);
1014 randomize:
1015                                 if (prandom_u32() & 1)
1016                                         goto retry;
1017                         }
1018                 }
1019
1020                 if (connection->cstate <= C_DISCONNECTING)
1021                         goto out_release_sockets;
1022                 if (signal_pending(current)) {
1023                         flush_signals(current);
1024                         smp_rmb();
1025                         if (get_t_state(&connection->receiver) == EXITING)
1026                                 goto out_release_sockets;
1027                 }
1028
1029                 ok = connection_established(connection, &sock.socket, &msock.socket);
1030         } while (!ok);
1031
1032         if (ad.s_listen)
1033                 sock_release(ad.s_listen);
1034
1035         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1036         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037
1038         sock.socket->sk->sk_allocation = GFP_NOIO;
1039         msock.socket->sk->sk_allocation = GFP_NOIO;
1040
1041         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1042         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1043
1044         /* NOT YET ...
1045          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1046          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1047          * first set it to the P_CONNECTION_FEATURES timeout,
1048          * which we set to 4x the configured ping_timeout. */
1049         rcu_read_lock();
1050         nc = rcu_dereference(connection->net_conf);
1051
1052         sock.socket->sk->sk_sndtimeo =
1053         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1054
1055         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1056         timeout = nc->timeout * HZ / 10;
1057         discard_my_data = nc->discard_my_data;
1058         rcu_read_unlock();
1059
1060         msock.socket->sk->sk_sndtimeo = timeout;
1061
1062         /* we don't want delays.
1063          * we use TCP_CORK where appropriate, though */
1064         drbd_tcp_nodelay(sock.socket);
1065         drbd_tcp_nodelay(msock.socket);
1066
1067         connection->data.socket = sock.socket;
1068         connection->meta.socket = msock.socket;
1069         connection->last_received = jiffies;
1070
1071         h = drbd_do_features(connection);
1072         if (h <= 0)
1073                 return h;
1074
1075         if (connection->cram_hmac_tfm) {
1076                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1077                 switch (drbd_do_auth(connection)) {
1078                 case -1:
1079                         drbd_err(connection, "Authentication of peer failed\n");
1080                         return -1;
1081                 case 0:
1082                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1083                         return 0;
1084                 }
1085         }
1086
1087         connection->data.socket->sk->sk_sndtimeo = timeout;
1088         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1089
1090         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1091                 return -1;
1092
1093         /* Prevent a race between resync-handshake and
1094          * being promoted to Primary.
1095          *
1096          * Grab and release the state mutex, so we know that any current
1097          * drbd_set_role() is finished, and any incoming drbd_set_role
1098          * will see the STATE_SENT flag, and wait for it to be cleared.
1099          */
1100         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101                 mutex_lock(peer_device->device->state_mutex);
1102
1103         set_bit(STATE_SENT, &connection->flags);
1104
1105         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1106                 mutex_unlock(peer_device->device->state_mutex);
1107
1108         rcu_read_lock();
1109         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1110                 struct drbd_device *device = peer_device->device;
1111                 kref_get(&device->kref);
1112                 rcu_read_unlock();
1113
1114                 if (discard_my_data)
1115                         set_bit(DISCARD_MY_DATA, &device->flags);
1116                 else
1117                         clear_bit(DISCARD_MY_DATA, &device->flags);
1118
1119                 drbd_connected(peer_device);
1120                 kref_put(&device->kref, drbd_destroy_device);
1121                 rcu_read_lock();
1122         }
1123         rcu_read_unlock();
1124
1125         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1126         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1127                 clear_bit(STATE_SENT, &connection->flags);
1128                 return 0;
1129         }
1130
1131         drbd_thread_start(&connection->ack_receiver);
1132         /* opencoded create_singlethread_workqueue(),
1133          * to be able to use format string arguments */
1134         connection->ack_sender =
1135                 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1136         if (!connection->ack_sender) {
1137                 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1138                 return 0;
1139         }
1140
1141         mutex_lock(&connection->resource->conf_update);
1142         /* The discard_my_data flag is a single-shot modifier to the next
1143          * connection attempt, the handshake of which is now well underway.
1144          * No need for rcu style copying of the whole struct
1145          * just to clear a single value. */
1146         connection->net_conf->discard_my_data = 0;
1147         mutex_unlock(&connection->resource->conf_update);
1148
1149         return h;
1150
1151 out_release_sockets:
1152         if (ad.s_listen)
1153                 sock_release(ad.s_listen);
1154         if (sock.socket)
1155                 sock_release(sock.socket);
1156         if (msock.socket)
1157                 sock_release(msock.socket);
1158         return -1;
1159 }
1160
1161 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1162 {
1163         unsigned int header_size = drbd_header_size(connection);
1164
1165         if (header_size == sizeof(struct p_header100) &&
1166             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1167                 struct p_header100 *h = header;
1168                 if (h->pad != 0) {
1169                         drbd_err(connection, "Header padding is not zero\n");
1170                         return -EINVAL;
1171                 }
1172                 pi->vnr = be16_to_cpu(h->volume);
1173                 pi->cmd = be16_to_cpu(h->command);
1174                 pi->size = be32_to_cpu(h->length);
1175         } else if (header_size == sizeof(struct p_header95) &&
1176                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1177                 struct p_header95 *h = header;
1178                 pi->cmd = be16_to_cpu(h->command);
1179                 pi->size = be32_to_cpu(h->length);
1180                 pi->vnr = 0;
1181         } else if (header_size == sizeof(struct p_header80) &&
1182                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1183                 struct p_header80 *h = header;
1184                 pi->cmd = be16_to_cpu(h->command);
1185                 pi->size = be16_to_cpu(h->length);
1186                 pi->vnr = 0;
1187         } else {
1188                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1189                          be32_to_cpu(*(__be32 *)header),
1190                          connection->agreed_pro_version);
1191                 return -EINVAL;
1192         }
1193         pi->data = header + header_size;
1194         return 0;
1195 }
1196
1197 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1198 {
1199         void *buffer = connection->data.rbuf;
1200         int err;
1201
1202         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1203         if (err)
1204                 return err;
1205
1206         err = decode_header(connection, buffer, pi);
1207         connection->last_received = jiffies;
1208
1209         return err;
1210 }
1211
1212 /* This is blkdev_issue_flush, but asynchronous.
1213  * We want to submit to all component volumes in parallel,
1214  * then wait for all completions.
1215  */
1216 struct issue_flush_context {
1217         atomic_t pending;
1218         int error;
1219         struct completion done;
1220 };
1221 struct one_flush_context {
1222         struct drbd_device *device;
1223         struct issue_flush_context *ctx;
1224 };
1225
1226 void one_flush_endio(struct bio *bio)
1227 {
1228         struct one_flush_context *octx = bio->bi_private;
1229         struct drbd_device *device = octx->device;
1230         struct issue_flush_context *ctx = octx->ctx;
1231
1232         if (bio->bi_status) {
1233                 ctx->error = blk_status_to_errno(bio->bi_status);
1234                 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1235         }
1236         kfree(octx);
1237         bio_put(bio);
1238
1239         clear_bit(FLUSH_PENDING, &device->flags);
1240         put_ldev(device);
1241         kref_put(&device->kref, drbd_destroy_device);
1242
1243         if (atomic_dec_and_test(&ctx->pending))
1244                 complete(&ctx->done);
1245 }
1246
1247 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1248 {
1249         struct bio *bio = bio_alloc(GFP_NOIO, 0);
1250         struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1251         if (!bio || !octx) {
1252                 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1253                 /* FIXME: what else can I do now?  disconnecting or detaching
1254                  * really does not help to improve the state of the world, either.
1255                  */
1256                 kfree(octx);
1257                 if (bio)
1258                         bio_put(bio);
1259
1260                 ctx->error = -ENOMEM;
1261                 put_ldev(device);
1262                 kref_put(&device->kref, drbd_destroy_device);
1263                 return;
1264         }
1265
1266         octx->device = device;
1267         octx->ctx = ctx;
1268         bio->bi_bdev = device->ldev->backing_bdev;
1269         bio->bi_private = octx;
1270         bio->bi_end_io = one_flush_endio;
1271         bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1272
1273         device->flush_jif = jiffies;
1274         set_bit(FLUSH_PENDING, &device->flags);
1275         atomic_inc(&ctx->pending);
1276         submit_bio(bio);
1277 }
1278
1279 static void drbd_flush(struct drbd_connection *connection)
1280 {
1281         if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1282                 struct drbd_peer_device *peer_device;
1283                 struct issue_flush_context ctx;
1284                 int vnr;
1285
1286                 atomic_set(&ctx.pending, 1);
1287                 ctx.error = 0;
1288                 init_completion(&ctx.done);
1289
1290                 rcu_read_lock();
1291                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1292                         struct drbd_device *device = peer_device->device;
1293
1294                         if (!get_ldev(device))
1295                                 continue;
1296                         kref_get(&device->kref);
1297                         rcu_read_unlock();
1298
1299                         submit_one_flush(device, &ctx);
1300
1301                         rcu_read_lock();
1302                 }
1303                 rcu_read_unlock();
1304
1305                 /* Do we want to add a timeout,
1306                  * if disk-timeout is set? */
1307                 if (!atomic_dec_and_test(&ctx.pending))
1308                         wait_for_completion(&ctx.done);
1309
1310                 if (ctx.error) {
1311                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1312                          * don't try again for ANY return value != 0
1313                          * if (rv == -EOPNOTSUPP) */
1314                         /* Any error is already reported by bio_endio callback. */
1315                         drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1316                 }
1317         }
1318 }
1319
1320 /**
1321  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1322  * @device:     DRBD device.
1323  * @epoch:      Epoch object.
1324  * @ev:         Epoch event.
1325  */
1326 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1327                                                struct drbd_epoch *epoch,
1328                                                enum epoch_event ev)
1329 {
1330         int epoch_size;
1331         struct drbd_epoch *next_epoch;
1332         enum finish_epoch rv = FE_STILL_LIVE;
1333
1334         spin_lock(&connection->epoch_lock);
1335         do {
1336                 next_epoch = NULL;
1337
1338                 epoch_size = atomic_read(&epoch->epoch_size);
1339
1340                 switch (ev & ~EV_CLEANUP) {
1341                 case EV_PUT:
1342                         atomic_dec(&epoch->active);
1343                         break;
1344                 case EV_GOT_BARRIER_NR:
1345                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1346                         break;
1347                 case EV_BECAME_LAST:
1348                         /* nothing to do*/
1349                         break;
1350                 }
1351
1352                 if (epoch_size != 0 &&
1353                     atomic_read(&epoch->active) == 0 &&
1354                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1355                         if (!(ev & EV_CLEANUP)) {
1356                                 spin_unlock(&connection->epoch_lock);
1357                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1358                                 spin_lock(&connection->epoch_lock);
1359                         }
1360 #if 0
1361                         /* FIXME: dec unacked on connection, once we have
1362                          * something to count pending connection packets in. */
1363                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1364                                 dec_unacked(epoch->connection);
1365 #endif
1366
1367                         if (connection->current_epoch != epoch) {
1368                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1369                                 list_del(&epoch->list);
1370                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1371                                 connection->epochs--;
1372                                 kfree(epoch);
1373
1374                                 if (rv == FE_STILL_LIVE)
1375                                         rv = FE_DESTROYED;
1376                         } else {
1377                                 epoch->flags = 0;
1378                                 atomic_set(&epoch->epoch_size, 0);
1379                                 /* atomic_set(&epoch->active, 0); is already zero */
1380                                 if (rv == FE_STILL_LIVE)
1381                                         rv = FE_RECYCLED;
1382                         }
1383                 }
1384
1385                 if (!next_epoch)
1386                         break;
1387
1388                 epoch = next_epoch;
1389         } while (1);
1390
1391         spin_unlock(&connection->epoch_lock);
1392
1393         return rv;
1394 }
1395
1396 static enum write_ordering_e
1397 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1398 {
1399         struct disk_conf *dc;
1400
1401         dc = rcu_dereference(bdev->disk_conf);
1402
1403         if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1404                 wo = WO_DRAIN_IO;
1405         if (wo == WO_DRAIN_IO && !dc->disk_drain)
1406                 wo = WO_NONE;
1407
1408         return wo;
1409 }
1410
1411 /**
1412  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1413  * @connection: DRBD connection.
1414  * @wo:         Write ordering method to try.
1415  */
1416 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1417                               enum write_ordering_e wo)
1418 {
1419         struct drbd_device *device;
1420         enum write_ordering_e pwo;
1421         int vnr;
1422         static char *write_ordering_str[] = {
1423                 [WO_NONE] = "none",
1424                 [WO_DRAIN_IO] = "drain",
1425                 [WO_BDEV_FLUSH] = "flush",
1426         };
1427
1428         pwo = resource->write_ordering;
1429         if (wo != WO_BDEV_FLUSH)
1430                 wo = min(pwo, wo);
1431         rcu_read_lock();
1432         idr_for_each_entry(&resource->devices, device, vnr) {
1433                 if (get_ldev(device)) {
1434                         wo = max_allowed_wo(device->ldev, wo);
1435                         if (device->ldev == bdev)
1436                                 bdev = NULL;
1437                         put_ldev(device);
1438                 }
1439         }
1440
1441         if (bdev)
1442                 wo = max_allowed_wo(bdev, wo);
1443
1444         rcu_read_unlock();
1445
1446         resource->write_ordering = wo;
1447         if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1448                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1449 }
1450
1451 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1452 {
1453         struct block_device *bdev = device->ldev->backing_bdev;
1454
1455         if (blkdev_issue_zeroout(bdev, peer_req->i.sector, peer_req->i.size >> 9,
1456                         GFP_NOIO, 0))
1457                 peer_req->flags |= EE_WAS_ERROR;
1458
1459         drbd_endio_write_sec_final(peer_req);
1460 }
1461
1462 static void drbd_issue_peer_wsame(struct drbd_device *device,
1463                                   struct drbd_peer_request *peer_req)
1464 {
1465         struct block_device *bdev = device->ldev->backing_bdev;
1466         sector_t s = peer_req->i.sector;
1467         sector_t nr = peer_req->i.size >> 9;
1468         if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1469                 peer_req->flags |= EE_WAS_ERROR;
1470         drbd_endio_write_sec_final(peer_req);
1471 }
1472
1473
1474 /**
1475  * drbd_submit_peer_request()
1476  * @device:     DRBD device.
1477  * @peer_req:   peer request
1478  * @rw:         flag field, see bio->bi_opf
1479  *
1480  * May spread the pages to multiple bios,
1481  * depending on bio_add_page restrictions.
1482  *
1483  * Returns 0 if all bios have been submitted,
1484  * -ENOMEM if we could not allocate enough bios,
1485  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1486  *  single page to an empty bio (which should never happen and likely indicates
1487  *  that the lower level IO stack is in some way broken). This has been observed
1488  *  on certain Xen deployments.
1489  */
1490 /* TODO allocate from our own bio_set. */
1491 int drbd_submit_peer_request(struct drbd_device *device,
1492                              struct drbd_peer_request *peer_req,
1493                              const unsigned op, const unsigned op_flags,
1494                              const int fault_type)
1495 {
1496         struct bio *bios = NULL;
1497         struct bio *bio;
1498         struct page *page = peer_req->pages;
1499         sector_t sector = peer_req->i.sector;
1500         unsigned data_size = peer_req->i.size;
1501         unsigned n_bios = 0;
1502         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1503         int err = -ENOMEM;
1504
1505         /* TRIM/DISCARD: for now, always use the helper function
1506          * blkdev_issue_zeroout(..., discard=true).
1507          * It's synchronous, but it does the right thing wrt. bio splitting.
1508          * Correctness first, performance later.  Next step is to code an
1509          * asynchronous variant of the same.
1510          */
1511         if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1512                 /* wait for all pending IO completions, before we start
1513                  * zeroing things out. */
1514                 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1515                 /* add it to the active list now,
1516                  * so we can find it to present it in debugfs */
1517                 peer_req->submit_jif = jiffies;
1518                 peer_req->flags |= EE_SUBMITTED;
1519
1520                 /* If this was a resync request from receive_rs_deallocated(),
1521                  * it is already on the sync_ee list */
1522                 if (list_empty(&peer_req->w.list)) {
1523                         spin_lock_irq(&device->resource->req_lock);
1524                         list_add_tail(&peer_req->w.list, &device->active_ee);
1525                         spin_unlock_irq(&device->resource->req_lock);
1526                 }
1527
1528                 if (peer_req->flags & EE_IS_TRIM)
1529                         drbd_issue_peer_discard(device, peer_req);
1530                 else /* EE_WRITE_SAME */
1531                         drbd_issue_peer_wsame(device, peer_req);
1532                 return 0;
1533         }
1534
1535         /* In most cases, we will only need one bio.  But in case the lower
1536          * level restrictions happen to be different at this offset on this
1537          * side than those of the sending peer, we may need to submit the
1538          * request in more than one bio.
1539          *
1540          * Plain bio_alloc is good enough here, this is no DRBD internally
1541          * generated bio, but a bio allocated on behalf of the peer.
1542          */
1543 next_bio:
1544         bio = bio_alloc(GFP_NOIO, nr_pages);
1545         if (!bio) {
1546                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1547                 goto fail;
1548         }
1549         /* > peer_req->i.sector, unless this is the first bio */
1550         bio->bi_iter.bi_sector = sector;
1551         bio->bi_bdev = device->ldev->backing_bdev;
1552         bio_set_op_attrs(bio, op, op_flags);
1553         bio->bi_private = peer_req;
1554         bio->bi_end_io = drbd_peer_request_endio;
1555
1556         bio->bi_next = bios;
1557         bios = bio;
1558         ++n_bios;
1559
1560         page_chain_for_each(page) {
1561                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1562                 if (!bio_add_page(bio, page, len, 0))
1563                         goto next_bio;
1564                 data_size -= len;
1565                 sector += len >> 9;
1566                 --nr_pages;
1567         }
1568         D_ASSERT(device, data_size == 0);
1569         D_ASSERT(device, page == NULL);
1570
1571         atomic_set(&peer_req->pending_bios, n_bios);
1572         /* for debugfs: update timestamp, mark as submitted */
1573         peer_req->submit_jif = jiffies;
1574         peer_req->flags |= EE_SUBMITTED;
1575         do {
1576                 bio = bios;
1577                 bios = bios->bi_next;
1578                 bio->bi_next = NULL;
1579
1580                 drbd_generic_make_request(device, fault_type, bio);
1581         } while (bios);
1582         return 0;
1583
1584 fail:
1585         while (bios) {
1586                 bio = bios;
1587                 bios = bios->bi_next;
1588                 bio_put(bio);
1589         }
1590         return err;
1591 }
1592
1593 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1594                                              struct drbd_peer_request *peer_req)
1595 {
1596         struct drbd_interval *i = &peer_req->i;
1597
1598         drbd_remove_interval(&device->write_requests, i);
1599         drbd_clear_interval(i);
1600
1601         /* Wake up any processes waiting for this peer request to complete.  */
1602         if (i->waiting)
1603                 wake_up(&device->misc_wait);
1604 }
1605
1606 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1607 {
1608         struct drbd_peer_device *peer_device;
1609         int vnr;
1610
1611         rcu_read_lock();
1612         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1613                 struct drbd_device *device = peer_device->device;
1614
1615                 kref_get(&device->kref);
1616                 rcu_read_unlock();
1617                 drbd_wait_ee_list_empty(device, &device->active_ee);
1618                 kref_put(&device->kref, drbd_destroy_device);
1619                 rcu_read_lock();
1620         }
1621         rcu_read_unlock();
1622 }
1623
1624 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1625 {
1626         int rv;
1627         struct p_barrier *p = pi->data;
1628         struct drbd_epoch *epoch;
1629
1630         /* FIXME these are unacked on connection,
1631          * not a specific (peer)device.
1632          */
1633         connection->current_epoch->barrier_nr = p->barrier;
1634         connection->current_epoch->connection = connection;
1635         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1636
1637         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1638          * the activity log, which means it would not be resynced in case the
1639          * R_PRIMARY crashes now.
1640          * Therefore we must send the barrier_ack after the barrier request was
1641          * completed. */
1642         switch (connection->resource->write_ordering) {
1643         case WO_NONE:
1644                 if (rv == FE_RECYCLED)
1645                         return 0;
1646
1647                 /* receiver context, in the writeout path of the other node.
1648                  * avoid potential distributed deadlock */
1649                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1650                 if (epoch)
1651                         break;
1652                 else
1653                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1654                         /* Fall through */
1655
1656         case WO_BDEV_FLUSH:
1657         case WO_DRAIN_IO:
1658                 conn_wait_active_ee_empty(connection);
1659                 drbd_flush(connection);
1660
1661                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1662                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1663                         if (epoch)
1664                                 break;
1665                 }
1666
1667                 return 0;
1668         default:
1669                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1670                          connection->resource->write_ordering);
1671                 return -EIO;
1672         }
1673
1674         epoch->flags = 0;
1675         atomic_set(&epoch->epoch_size, 0);
1676         atomic_set(&epoch->active, 0);
1677
1678         spin_lock(&connection->epoch_lock);
1679         if (atomic_read(&connection->current_epoch->epoch_size)) {
1680                 list_add(&epoch->list, &connection->current_epoch->list);
1681                 connection->current_epoch = epoch;
1682                 connection->epochs++;
1683         } else {
1684                 /* The current_epoch got recycled while we allocated this one... */
1685                 kfree(epoch);
1686         }
1687         spin_unlock(&connection->epoch_lock);
1688
1689         return 0;
1690 }
1691
1692 /* quick wrapper in case payload size != request_size (write same) */
1693 static void drbd_csum_ee_size(struct crypto_ahash *h,
1694                               struct drbd_peer_request *r, void *d,
1695                               unsigned int payload_size)
1696 {
1697         unsigned int tmp = r->i.size;
1698         r->i.size = payload_size;
1699         drbd_csum_ee(h, r, d);
1700         r->i.size = tmp;
1701 }
1702
1703 /* used from receive_RSDataReply (recv_resync_read)
1704  * and from receive_Data.
1705  * data_size: actual payload ("data in")
1706  *      for normal writes that is bi_size.
1707  *      for discards, that is zero.
1708  *      for write same, it is logical_block_size.
1709  * both trim and write same have the bi_size ("data len to be affected")
1710  * as extra argument in the packet header.
1711  */
1712 static struct drbd_peer_request *
1713 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1714               struct packet_info *pi) __must_hold(local)
1715 {
1716         struct drbd_device *device = peer_device->device;
1717         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1718         struct drbd_peer_request *peer_req;
1719         struct page *page;
1720         int digest_size, err;
1721         unsigned int data_size = pi->size, ds;
1722         void *dig_in = peer_device->connection->int_dig_in;
1723         void *dig_vv = peer_device->connection->int_dig_vv;
1724         unsigned long *data;
1725         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1726         struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1727
1728         digest_size = 0;
1729         if (!trim && peer_device->connection->peer_integrity_tfm) {
1730                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1731                 /*
1732                  * FIXME: Receive the incoming digest into the receive buffer
1733                  *        here, together with its struct p_data?
1734                  */
1735                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1736                 if (err)
1737                         return NULL;
1738                 data_size -= digest_size;
1739         }
1740
1741         /* assume request_size == data_size, but special case trim and wsame. */
1742         ds = data_size;
1743         if (trim) {
1744                 if (!expect(data_size == 0))
1745                         return NULL;
1746                 ds = be32_to_cpu(trim->size);
1747         } else if (wsame) {
1748                 if (data_size != queue_logical_block_size(device->rq_queue)) {
1749                         drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1750                                 data_size, queue_logical_block_size(device->rq_queue));
1751                         return NULL;
1752                 }
1753                 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1754                         drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1755                                 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1756                         return NULL;
1757                 }
1758                 ds = be32_to_cpu(wsame->size);
1759         }
1760
1761         if (!expect(IS_ALIGNED(ds, 512)))
1762                 return NULL;
1763         if (trim || wsame) {
1764                 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1765                         return NULL;
1766         } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1767                 return NULL;
1768
1769         /* even though we trust out peer,
1770          * we sometimes have to double check. */
1771         if (sector + (ds>>9) > capacity) {
1772                 drbd_err(device, "request from peer beyond end of local disk: "
1773                         "capacity: %llus < sector: %llus + size: %u\n",
1774                         (unsigned long long)capacity,
1775                         (unsigned long long)sector, ds);
1776                 return NULL;
1777         }
1778
1779         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1780          * "criss-cross" setup, that might cause write-out on some other DRBD,
1781          * which in turn might block on the other node at this very place.  */
1782         peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1783         if (!peer_req)
1784                 return NULL;
1785
1786         peer_req->flags |= EE_WRITE;
1787         if (trim) {
1788                 peer_req->flags |= EE_IS_TRIM;
1789                 return peer_req;
1790         }
1791         if (wsame)
1792                 peer_req->flags |= EE_WRITE_SAME;
1793
1794         /* receive payload size bytes into page chain */
1795         ds = data_size;
1796         page = peer_req->pages;
1797         page_chain_for_each(page) {
1798                 unsigned len = min_t(int, ds, PAGE_SIZE);
1799                 data = kmap(page);
1800                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1801                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1802                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1803                         data[0] = data[0] ^ (unsigned long)-1;
1804                 }
1805                 kunmap(page);
1806                 if (err) {
1807                         drbd_free_peer_req(device, peer_req);
1808                         return NULL;
1809                 }
1810                 ds -= len;
1811         }
1812
1813         if (digest_size) {
1814                 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1815                 if (memcmp(dig_in, dig_vv, digest_size)) {
1816                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1817                                 (unsigned long long)sector, data_size);
1818                         drbd_free_peer_req(device, peer_req);
1819                         return NULL;
1820                 }
1821         }
1822         device->recv_cnt += data_size >> 9;
1823         return peer_req;
1824 }
1825
1826 /* drbd_drain_block() just takes a data block
1827  * out of the socket input buffer, and discards it.
1828  */
1829 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1830 {
1831         struct page *page;
1832         int err = 0;
1833         void *data;
1834
1835         if (!data_size)
1836                 return 0;
1837
1838         page = drbd_alloc_pages(peer_device, 1, 1);
1839
1840         data = kmap(page);
1841         while (data_size) {
1842                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1843
1844                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1845                 if (err)
1846                         break;
1847                 data_size -= len;
1848         }
1849         kunmap(page);
1850         drbd_free_pages(peer_device->device, page, 0);
1851         return err;
1852 }
1853
1854 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1855                            sector_t sector, int data_size)
1856 {
1857         struct bio_vec bvec;
1858         struct bvec_iter iter;
1859         struct bio *bio;
1860         int digest_size, err, expect;
1861         void *dig_in = peer_device->connection->int_dig_in;
1862         void *dig_vv = peer_device->connection->int_dig_vv;
1863
1864         digest_size = 0;
1865         if (peer_device->connection->peer_integrity_tfm) {
1866                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1867                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1868                 if (err)
1869                         return err;
1870                 data_size -= digest_size;
1871         }
1872
1873         /* optimistically update recv_cnt.  if receiving fails below,
1874          * we disconnect anyways, and counters will be reset. */
1875         peer_device->device->recv_cnt += data_size>>9;
1876
1877         bio = req->master_bio;
1878         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1879
1880         bio_for_each_segment(bvec, bio, iter) {
1881                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1882                 expect = min_t(int, data_size, bvec.bv_len);
1883                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1884                 kunmap(bvec.bv_page);
1885                 if (err)
1886                         return err;
1887                 data_size -= expect;
1888         }
1889
1890         if (digest_size) {
1891                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1892                 if (memcmp(dig_in, dig_vv, digest_size)) {
1893                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1894                         return -EINVAL;
1895                 }
1896         }
1897
1898         D_ASSERT(peer_device->device, data_size == 0);
1899         return 0;
1900 }
1901
1902 /*
1903  * e_end_resync_block() is called in ack_sender context via
1904  * drbd_finish_peer_reqs().
1905  */
1906 static int e_end_resync_block(struct drbd_work *w, int unused)
1907 {
1908         struct drbd_peer_request *peer_req =
1909                 container_of(w, struct drbd_peer_request, w);
1910         struct drbd_peer_device *peer_device = peer_req->peer_device;
1911         struct drbd_device *device = peer_device->device;
1912         sector_t sector = peer_req->i.sector;
1913         int err;
1914
1915         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1916
1917         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1918                 drbd_set_in_sync(device, sector, peer_req->i.size);
1919                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1920         } else {
1921                 /* Record failure to sync */
1922                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1923
1924                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1925         }
1926         dec_unacked(device);
1927
1928         return err;
1929 }
1930
1931 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1932                             struct packet_info *pi) __releases(local)
1933 {
1934         struct drbd_device *device = peer_device->device;
1935         struct drbd_peer_request *peer_req;
1936
1937         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1938         if (!peer_req)
1939                 goto fail;
1940
1941         dec_rs_pending(device);
1942
1943         inc_unacked(device);
1944         /* corresponding dec_unacked() in e_end_resync_block()
1945          * respective _drbd_clear_done_ee */
1946
1947         peer_req->w.cb = e_end_resync_block;
1948         peer_req->submit_jif = jiffies;
1949
1950         spin_lock_irq(&device->resource->req_lock);
1951         list_add_tail(&peer_req->w.list, &device->sync_ee);
1952         spin_unlock_irq(&device->resource->req_lock);
1953
1954         atomic_add(pi->size >> 9, &device->rs_sect_ev);
1955         if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1956                                      DRBD_FAULT_RS_WR) == 0)
1957                 return 0;
1958
1959         /* don't care for the reason here */
1960         drbd_err(device, "submit failed, triggering re-connect\n");
1961         spin_lock_irq(&device->resource->req_lock);
1962         list_del(&peer_req->w.list);
1963         spin_unlock_irq(&device->resource->req_lock);
1964
1965         drbd_free_peer_req(device, peer_req);
1966 fail:
1967         put_ldev(device);
1968         return -EIO;
1969 }
1970
1971 static struct drbd_request *
1972 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1973              sector_t sector, bool missing_ok, const char *func)
1974 {
1975         struct drbd_request *req;
1976
1977         /* Request object according to our peer */
1978         req = (struct drbd_request *)(unsigned long)id;
1979         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1980                 return req;
1981         if (!missing_ok) {
1982                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1983                         (unsigned long)id, (unsigned long long)sector);
1984         }
1985         return NULL;
1986 }
1987
1988 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1989 {
1990         struct drbd_peer_device *peer_device;
1991         struct drbd_device *device;
1992         struct drbd_request *req;
1993         sector_t sector;
1994         int err;
1995         struct p_data *p = pi->data;
1996
1997         peer_device = conn_peer_device(connection, pi->vnr);
1998         if (!peer_device)
1999                 return -EIO;
2000         device = peer_device->device;
2001
2002         sector = be64_to_cpu(p->sector);
2003
2004         spin_lock_irq(&device->resource->req_lock);
2005         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2006         spin_unlock_irq(&device->resource->req_lock);
2007         if (unlikely(!req))
2008                 return -EIO;
2009
2010         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2011          * special casing it there for the various failure cases.
2012          * still no race with drbd_fail_pending_reads */
2013         err = recv_dless_read(peer_device, req, sector, pi->size);
2014         if (!err)
2015                 req_mod(req, DATA_RECEIVED);
2016         /* else: nothing. handled from drbd_disconnect...
2017          * I don't think we may complete this just yet
2018          * in case we are "on-disconnect: freeze" */
2019
2020         return err;
2021 }
2022
2023 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2024 {
2025         struct drbd_peer_device *peer_device;
2026         struct drbd_device *device;
2027         sector_t sector;
2028         int err;
2029         struct p_data *p = pi->data;
2030
2031         peer_device = conn_peer_device(connection, pi->vnr);
2032         if (!peer_device)
2033                 return -EIO;
2034         device = peer_device->device;
2035
2036         sector = be64_to_cpu(p->sector);
2037         D_ASSERT(device, p->block_id == ID_SYNCER);
2038
2039         if (get_ldev(device)) {
2040                 /* data is submitted to disk within recv_resync_read.
2041                  * corresponding put_ldev done below on error,
2042                  * or in drbd_peer_request_endio. */
2043                 err = recv_resync_read(peer_device, sector, pi);
2044         } else {
2045                 if (__ratelimit(&drbd_ratelimit_state))
2046                         drbd_err(device, "Can not write resync data to local disk.\n");
2047
2048                 err = drbd_drain_block(peer_device, pi->size);
2049
2050                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2051         }
2052
2053         atomic_add(pi->size >> 9, &device->rs_sect_in);
2054
2055         return err;
2056 }
2057
2058 static void restart_conflicting_writes(struct drbd_device *device,
2059                                        sector_t sector, int size)
2060 {
2061         struct drbd_interval *i;
2062         struct drbd_request *req;
2063
2064         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2065                 if (!i->local)
2066                         continue;
2067                 req = container_of(i, struct drbd_request, i);
2068                 if (req->rq_state & RQ_LOCAL_PENDING ||
2069                     !(req->rq_state & RQ_POSTPONED))
2070                         continue;
2071                 /* as it is RQ_POSTPONED, this will cause it to
2072                  * be queued on the retry workqueue. */
2073                 __req_mod(req, CONFLICT_RESOLVED, NULL);
2074         }
2075 }
2076
2077 /*
2078  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2079  */
2080 static int e_end_block(struct drbd_work *w, int cancel)
2081 {
2082         struct drbd_peer_request *peer_req =
2083                 container_of(w, struct drbd_peer_request, w);
2084         struct drbd_peer_device *peer_device = peer_req->peer_device;
2085         struct drbd_device *device = peer_device->device;
2086         sector_t sector = peer_req->i.sector;
2087         int err = 0, pcmd;
2088
2089         if (peer_req->flags & EE_SEND_WRITE_ACK) {
2090                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2091                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2092                                 device->state.conn <= C_PAUSED_SYNC_T &&
2093                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2094                                 P_RS_WRITE_ACK : P_WRITE_ACK;
2095                         err = drbd_send_ack(peer_device, pcmd, peer_req);
2096                         if (pcmd == P_RS_WRITE_ACK)
2097                                 drbd_set_in_sync(device, sector, peer_req->i.size);
2098                 } else {
2099                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2100                         /* we expect it to be marked out of sync anyways...
2101                          * maybe assert this?  */
2102                 }
2103                 dec_unacked(device);
2104         }
2105
2106         /* we delete from the conflict detection hash _after_ we sent out the
2107          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2108         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2109                 spin_lock_irq(&device->resource->req_lock);
2110                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2111                 drbd_remove_epoch_entry_interval(device, peer_req);
2112                 if (peer_req->flags & EE_RESTART_REQUESTS)
2113                         restart_conflicting_writes(device, sector, peer_req->i.size);
2114                 spin_unlock_irq(&device->resource->req_lock);
2115         } else
2116                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2117
2118         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2119
2120         return err;
2121 }
2122
2123 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2124 {
2125         struct drbd_peer_request *peer_req =
2126                 container_of(w, struct drbd_peer_request, w);
2127         struct drbd_peer_device *peer_device = peer_req->peer_device;
2128         int err;
2129
2130         err = drbd_send_ack(peer_device, ack, peer_req);
2131         dec_unacked(peer_device->device);
2132
2133         return err;
2134 }
2135
2136 static int e_send_superseded(struct drbd_work *w, int unused)
2137 {
2138         return e_send_ack(w, P_SUPERSEDED);
2139 }
2140
2141 static int e_send_retry_write(struct drbd_work *w, int unused)
2142 {
2143         struct drbd_peer_request *peer_req =
2144                 container_of(w, struct drbd_peer_request, w);
2145         struct drbd_connection *connection = peer_req->peer_device->connection;
2146
2147         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2148                              P_RETRY_WRITE : P_SUPERSEDED);
2149 }
2150
2151 static bool seq_greater(u32 a, u32 b)
2152 {
2153         /*
2154          * We assume 32-bit wrap-around here.
2155          * For 24-bit wrap-around, we would have to shift:
2156          *  a <<= 8; b <<= 8;
2157          */
2158         return (s32)a - (s32)b > 0;
2159 }
2160
2161 static u32 seq_max(u32 a, u32 b)
2162 {
2163         return seq_greater(a, b) ? a : b;
2164 }
2165
2166 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2167 {
2168         struct drbd_device *device = peer_device->device;
2169         unsigned int newest_peer_seq;
2170
2171         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2172                 spin_lock(&device->peer_seq_lock);
2173                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2174                 device->peer_seq = newest_peer_seq;
2175                 spin_unlock(&device->peer_seq_lock);
2176                 /* wake up only if we actually changed device->peer_seq */
2177                 if (peer_seq == newest_peer_seq)
2178                         wake_up(&device->seq_wait);
2179         }
2180 }
2181
2182 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2183 {
2184         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2185 }
2186
2187 /* maybe change sync_ee into interval trees as well? */
2188 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2189 {
2190         struct drbd_peer_request *rs_req;
2191         bool rv = false;
2192
2193         spin_lock_irq(&device->resource->req_lock);
2194         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2195                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2196                              rs_req->i.sector, rs_req->i.size)) {
2197                         rv = true;
2198                         break;
2199                 }
2200         }
2201         spin_unlock_irq(&device->resource->req_lock);
2202
2203         return rv;
2204 }
2205
2206 /* Called from receive_Data.
2207  * Synchronize packets on sock with packets on msock.
2208  *
2209  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2210  * packet traveling on msock, they are still processed in the order they have
2211  * been sent.
2212  *
2213  * Note: we don't care for Ack packets overtaking P_DATA packets.
2214  *
2215  * In case packet_seq is larger than device->peer_seq number, there are
2216  * outstanding packets on the msock. We wait for them to arrive.
2217  * In case we are the logically next packet, we update device->peer_seq
2218  * ourselves. Correctly handles 32bit wrap around.
2219  *
2220  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2221  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2222  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2223  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2224  *
2225  * returns 0 if we may process the packet,
2226  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2227 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2228 {
2229         struct drbd_device *device = peer_device->device;
2230         DEFINE_WAIT(wait);
2231         long timeout;
2232         int ret = 0, tp;
2233
2234         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2235                 return 0;
2236
2237         spin_lock(&device->peer_seq_lock);
2238         for (;;) {
2239                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2240                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2241                         break;
2242                 }
2243
2244                 if (signal_pending(current)) {
2245                         ret = -ERESTARTSYS;
2246                         break;
2247                 }
2248
2249                 rcu_read_lock();
2250                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2251                 rcu_read_unlock();
2252
2253                 if (!tp)
2254                         break;
2255
2256                 /* Only need to wait if two_primaries is enabled */
2257                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2258                 spin_unlock(&device->peer_seq_lock);
2259                 rcu_read_lock();
2260                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2261                 rcu_read_unlock();
2262                 timeout = schedule_timeout(timeout);
2263                 spin_lock(&device->peer_seq_lock);
2264                 if (!timeout) {
2265                         ret = -ETIMEDOUT;
2266                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2267                         break;
2268                 }
2269         }
2270         spin_unlock(&device->peer_seq_lock);
2271         finish_wait(&device->seq_wait, &wait);
2272         return ret;
2273 }
2274
2275 /* see also bio_flags_to_wire()
2276  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2277  * flags and back. We may replicate to other kernel versions. */
2278 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2279 {
2280         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2281                 (dpf & DP_FUA ? REQ_FUA : 0) |
2282                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2283 }
2284
2285 static unsigned long wire_flags_to_bio_op(u32 dpf)
2286 {
2287         if (dpf & DP_DISCARD)
2288                 return REQ_OP_WRITE_ZEROES;
2289         else
2290                 return REQ_OP_WRITE;
2291 }
2292
2293 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2294                                     unsigned int size)
2295 {
2296         struct drbd_interval *i;
2297
2298     repeat:
2299         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2300                 struct drbd_request *req;
2301                 struct bio_and_error m;
2302
2303                 if (!i->local)
2304                         continue;
2305                 req = container_of(i, struct drbd_request, i);
2306                 if (!(req->rq_state & RQ_POSTPONED))
2307                         continue;
2308                 req->rq_state &= ~RQ_POSTPONED;
2309                 __req_mod(req, NEG_ACKED, &m);
2310                 spin_unlock_irq(&device->resource->req_lock);
2311                 if (m.bio)
2312                         complete_master_bio(device, &m);
2313                 spin_lock_irq(&device->resource->req_lock);
2314                 goto repeat;
2315         }
2316 }
2317
2318 static int handle_write_conflicts(struct drbd_device *device,
2319                                   struct drbd_peer_request *peer_req)
2320 {
2321         struct drbd_connection *connection = peer_req->peer_device->connection;
2322         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2323         sector_t sector = peer_req->i.sector;
2324         const unsigned int size = peer_req->i.size;
2325         struct drbd_interval *i;
2326         bool equal;
2327         int err;
2328
2329         /*
2330          * Inserting the peer request into the write_requests tree will prevent
2331          * new conflicting local requests from being added.
2332          */
2333         drbd_insert_interval(&device->write_requests, &peer_req->i);
2334
2335     repeat:
2336         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2337                 if (i == &peer_req->i)
2338                         continue;
2339                 if (i->completed)
2340                         continue;
2341
2342                 if (!i->local) {
2343                         /*
2344                          * Our peer has sent a conflicting remote request; this
2345                          * should not happen in a two-node setup.  Wait for the
2346                          * earlier peer request to complete.
2347                          */
2348                         err = drbd_wait_misc(device, i);
2349                         if (err)
2350                                 goto out;
2351                         goto repeat;
2352                 }
2353
2354                 equal = i->sector == sector && i->size == size;
2355                 if (resolve_conflicts) {
2356                         /*
2357                          * If the peer request is fully contained within the
2358                          * overlapping request, it can be considered overwritten
2359                          * and thus superseded; otherwise, it will be retried
2360                          * once all overlapping requests have completed.
2361                          */
2362                         bool superseded = i->sector <= sector && i->sector +
2363                                        (i->size >> 9) >= sector + (size >> 9);
2364
2365                         if (!equal)
2366                                 drbd_alert(device, "Concurrent writes detected: "
2367                                                "local=%llus +%u, remote=%llus +%u, "
2368                                                "assuming %s came first\n",
2369                                           (unsigned long long)i->sector, i->size,
2370                                           (unsigned long long)sector, size,
2371                                           superseded ? "local" : "remote");
2372
2373                         peer_req->w.cb = superseded ? e_send_superseded :
2374                                                    e_send_retry_write;
2375                         list_add_tail(&peer_req->w.list, &device->done_ee);
2376                         queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2377
2378                         err = -ENOENT;
2379                         goto out;
2380                 } else {
2381                         struct drbd_request *req =
2382                                 container_of(i, struct drbd_request, i);
2383
2384                         if (!equal)
2385                                 drbd_alert(device, "Concurrent writes detected: "
2386                                                "local=%llus +%u, remote=%llus +%u\n",
2387                                           (unsigned long long)i->sector, i->size,
2388                                           (unsigned long long)sector, size);
2389
2390                         if (req->rq_state & RQ_LOCAL_PENDING ||
2391                             !(req->rq_state & RQ_POSTPONED)) {
2392                                 /*
2393                                  * Wait for the node with the discard flag to
2394                                  * decide if this request has been superseded
2395                                  * or needs to be retried.
2396                                  * Requests that have been superseded will
2397                                  * disappear from the write_requests tree.
2398                                  *
2399                                  * In addition, wait for the conflicting
2400                                  * request to finish locally before submitting
2401                                  * the conflicting peer request.
2402                                  */
2403                                 err = drbd_wait_misc(device, &req->i);
2404                                 if (err) {
2405                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2406                                         fail_postponed_requests(device, sector, size);
2407                                         goto out;
2408                                 }
2409                                 goto repeat;
2410                         }
2411                         /*
2412                          * Remember to restart the conflicting requests after
2413                          * the new peer request has completed.
2414                          */
2415                         peer_req->flags |= EE_RESTART_REQUESTS;
2416                 }
2417         }
2418         err = 0;
2419
2420     out:
2421         if (err)
2422                 drbd_remove_epoch_entry_interval(device, peer_req);
2423         return err;
2424 }
2425
2426 /* mirrored write */
2427 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2428 {
2429         struct drbd_peer_device *peer_device;
2430         struct drbd_device *device;
2431         struct net_conf *nc;
2432         sector_t sector;
2433         struct drbd_peer_request *peer_req;
2434         struct p_data *p = pi->data;
2435         u32 peer_seq = be32_to_cpu(p->seq_num);
2436         int op, op_flags;
2437         u32 dp_flags;
2438         int err, tp;
2439
2440         peer_device = conn_peer_device(connection, pi->vnr);
2441         if (!peer_device)
2442                 return -EIO;
2443         device = peer_device->device;
2444
2445         if (!get_ldev(device)) {
2446                 int err2;
2447
2448                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2449                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2450                 atomic_inc(&connection->current_epoch->epoch_size);
2451                 err2 = drbd_drain_block(peer_device, pi->size);
2452                 if (!err)
2453                         err = err2;
2454                 return err;
2455         }
2456
2457         /*
2458          * Corresponding put_ldev done either below (on various errors), or in
2459          * drbd_peer_request_endio, if we successfully submit the data at the
2460          * end of this function.
2461          */
2462
2463         sector = be64_to_cpu(p->sector);
2464         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2465         if (!peer_req) {
2466                 put_ldev(device);
2467                 return -EIO;
2468         }
2469
2470         peer_req->w.cb = e_end_block;
2471         peer_req->submit_jif = jiffies;
2472         peer_req->flags |= EE_APPLICATION;
2473
2474         dp_flags = be32_to_cpu(p->dp_flags);
2475         op = wire_flags_to_bio_op(dp_flags);
2476         op_flags = wire_flags_to_bio_flags(dp_flags);
2477         if (pi->cmd == P_TRIM) {
2478                 D_ASSERT(peer_device, peer_req->i.size > 0);
2479                 D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2480                 D_ASSERT(peer_device, peer_req->pages == NULL);
2481         } else if (peer_req->pages == NULL) {
2482                 D_ASSERT(device, peer_req->i.size == 0);
2483                 D_ASSERT(device, dp_flags & DP_FLUSH);
2484         }
2485
2486         if (dp_flags & DP_MAY_SET_IN_SYNC)
2487                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2488
2489         spin_lock(&connection->epoch_lock);
2490         peer_req->epoch = connection->current_epoch;
2491         atomic_inc(&peer_req->epoch->epoch_size);
2492         atomic_inc(&peer_req->epoch->active);
2493         spin_unlock(&connection->epoch_lock);
2494
2495         rcu_read_lock();
2496         nc = rcu_dereference(peer_device->connection->net_conf);
2497         tp = nc->two_primaries;
2498         if (peer_device->connection->agreed_pro_version < 100) {
2499                 switch (nc->wire_protocol) {
2500                 case DRBD_PROT_C:
2501                         dp_flags |= DP_SEND_WRITE_ACK;
2502                         break;
2503                 case DRBD_PROT_B:
2504                         dp_flags |= DP_SEND_RECEIVE_ACK;
2505                         break;
2506                 }
2507         }
2508         rcu_read_unlock();
2509
2510         if (dp_flags & DP_SEND_WRITE_ACK) {
2511                 peer_req->flags |= EE_SEND_WRITE_ACK;
2512                 inc_unacked(device);
2513                 /* corresponding dec_unacked() in e_end_block()
2514                  * respective _drbd_clear_done_ee */
2515         }
2516
2517         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2518                 /* I really don't like it that the receiver thread
2519                  * sends on the msock, but anyways */
2520                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2521         }
2522
2523         if (tp) {
2524                 /* two primaries implies protocol C */
2525                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2526                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2527                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2528                 if (err)
2529                         goto out_interrupted;
2530                 spin_lock_irq(&device->resource->req_lock);
2531                 err = handle_write_conflicts(device, peer_req);
2532                 if (err) {
2533                         spin_unlock_irq(&device->resource->req_lock);
2534                         if (err == -ENOENT) {
2535                                 put_ldev(device);
2536                                 return 0;
2537                         }
2538                         goto out_interrupted;
2539                 }
2540         } else {
2541                 update_peer_seq(peer_device, peer_seq);
2542                 spin_lock_irq(&device->resource->req_lock);
2543         }
2544         /* TRIM and WRITE_SAME are processed synchronously,
2545          * we wait for all pending requests, respectively wait for
2546          * active_ee to become empty in drbd_submit_peer_request();
2547          * better not add ourselves here. */
2548         if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2549                 list_add_tail(&peer_req->w.list, &device->active_ee);
2550         spin_unlock_irq(&device->resource->req_lock);
2551
2552         if (device->state.conn == C_SYNC_TARGET)
2553                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2554
2555         if (device->state.pdsk < D_INCONSISTENT) {
2556                 /* In case we have the only disk of the cluster, */
2557                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2558                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2559                 drbd_al_begin_io(device, &peer_req->i);
2560                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2561         }
2562
2563         err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2564                                        DRBD_FAULT_DT_WR);
2565         if (!err)
2566                 return 0;
2567
2568         /* don't care for the reason here */
2569         drbd_err(device, "submit failed, triggering re-connect\n");
2570         spin_lock_irq(&device->resource->req_lock);
2571         list_del(&peer_req->w.list);
2572         drbd_remove_epoch_entry_interval(device, peer_req);
2573         spin_unlock_irq(&device->resource->req_lock);
2574         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2575                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2576                 drbd_al_complete_io(device, &peer_req->i);
2577         }
2578
2579 out_interrupted:
2580         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2581         put_ldev(device);
2582         drbd_free_peer_req(device, peer_req);
2583         return err;
2584 }
2585
2586 /* We may throttle resync, if the lower device seems to be busy,
2587  * and current sync rate is above c_min_rate.
2588  *
2589  * To decide whether or not the lower device is busy, we use a scheme similar
2590  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2591  * (more than 64 sectors) of activity we cannot account for with our own resync
2592  * activity, it obviously is "busy".
2593  *
2594  * The current sync rate used here uses only the most recent two step marks,
2595  * to have a short time average so we can react faster.
2596  */
2597 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2598                 bool throttle_if_app_is_waiting)
2599 {
2600         struct lc_element *tmp;
2601         bool throttle = drbd_rs_c_min_rate_throttle(device);
2602
2603         if (!throttle || throttle_if_app_is_waiting)
2604                 return throttle;
2605
2606         spin_lock_irq(&device->al_lock);
2607         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2608         if (tmp) {
2609                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2610                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2611                         throttle = false;
2612                 /* Do not slow down if app IO is already waiting for this extent,
2613                  * and our progress is necessary for application IO to complete. */
2614         }
2615         spin_unlock_irq(&device->al_lock);
2616
2617         return throttle;
2618 }
2619
2620 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2621 {
2622         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2623         unsigned long db, dt, dbdt;
2624         unsigned int c_min_rate;
2625         int curr_events;
2626
2627         rcu_read_lock();
2628         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2629         rcu_read_unlock();
2630
2631         /* feature disabled? */
2632         if (c_min_rate == 0)
2633                 return false;
2634
2635         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2636                       (int)part_stat_read(&disk->part0, sectors[1]) -
2637                         atomic_read(&device->rs_sect_ev);
2638
2639         if (atomic_read(&device->ap_actlog_cnt)
2640             || curr_events - device->rs_last_events > 64) {
2641                 unsigned long rs_left;
2642                 int i;
2643
2644                 device->rs_last_events = curr_events;
2645
2646                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2647                  * approx. */
2648                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2649
2650                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2651                         rs_left = device->ov_left;
2652                 else
2653                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2654
2655                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2656                 if (!dt)
2657                         dt++;
2658                 db = device->rs_mark_left[i] - rs_left;
2659                 dbdt = Bit2KB(db/dt);
2660
2661                 if (dbdt > c_min_rate)
2662                         return true;
2663         }
2664         return false;
2665 }
2666
2667 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2668 {
2669         struct drbd_peer_device *peer_device;
2670         struct drbd_device *device;
2671         sector_t sector;
2672         sector_t capacity;
2673         struct drbd_peer_request *peer_req;
2674         struct digest_info *di = NULL;
2675         int size, verb;
2676         unsigned int fault_type;
2677         struct p_block_req *p = pi->data;
2678
2679         peer_device = conn_peer_device(connection, pi->vnr);
2680         if (!peer_device)
2681                 return -EIO;
2682         device = peer_device->device;
2683         capacity = drbd_get_capacity(device->this_bdev);
2684
2685         sector = be64_to_cpu(p->sector);
2686         size   = be32_to_cpu(p->blksize);
2687
2688         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2689                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2690                                 (unsigned long long)sector, size);
2691                 return -EINVAL;
2692         }
2693         if (sector + (size>>9) > capacity) {
2694                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2695                                 (unsigned long long)sector, size);
2696                 return -EINVAL;
2697         }
2698
2699         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2700                 verb = 1;
2701                 switch (pi->cmd) {
2702                 case P_DATA_REQUEST:
2703                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2704                         break;
2705                 case P_RS_THIN_REQ:
2706                 case P_RS_DATA_REQUEST:
2707                 case P_CSUM_RS_REQUEST:
2708                 case P_OV_REQUEST:
2709                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2710                         break;
2711                 case P_OV_REPLY:
2712                         verb = 0;
2713                         dec_rs_pending(device);
2714                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2715                         break;
2716                 default:
2717                         BUG();
2718                 }
2719                 if (verb && __ratelimit(&drbd_ratelimit_state))
2720                         drbd_err(device, "Can not satisfy peer's read request, "
2721                             "no local data.\n");
2722
2723                 /* drain possibly payload */
2724                 return drbd_drain_block(peer_device, pi->size);
2725         }
2726
2727         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2728          * "criss-cross" setup, that might cause write-out on some other DRBD,
2729          * which in turn might block on the other node at this very place.  */
2730         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2731                         size, GFP_NOIO);
2732         if (!peer_req) {
2733                 put_ldev(device);
2734                 return -ENOMEM;
2735         }
2736
2737         switch (pi->cmd) {
2738         case P_DATA_REQUEST:
2739                 peer_req->w.cb = w_e_end_data_req;
2740                 fault_type = DRBD_FAULT_DT_RD;
2741                 /* application IO, don't drbd_rs_begin_io */
2742                 peer_req->flags |= EE_APPLICATION;
2743                 goto submit;
2744
2745         case P_RS_THIN_REQ:
2746                 /* If at some point in the future we have a smart way to
2747                    find out if this data block is completely deallocated,
2748                    then we would do something smarter here than reading
2749                    the block... */
2750                 peer_req->flags |= EE_RS_THIN_REQ;
2751         case P_RS_DATA_REQUEST:
2752                 peer_req->w.cb = w_e_end_rsdata_req;
2753                 fault_type = DRBD_FAULT_RS_RD;
2754                 /* used in the sector offset progress display */
2755                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2756                 break;
2757
2758         case P_OV_REPLY:
2759         case P_CSUM_RS_REQUEST:
2760                 fault_type = DRBD_FAULT_RS_RD;
2761                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2762                 if (!di)
2763                         goto out_free_e;
2764
2765                 di->digest_size = pi->size;
2766                 di->digest = (((char *)di)+sizeof(struct digest_info));
2767
2768                 peer_req->digest = di;
2769                 peer_req->flags |= EE_HAS_DIGEST;
2770
2771                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2772                         goto out_free_e;
2773
2774                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2775                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2776                         peer_req->w.cb = w_e_end_csum_rs_req;
2777                         /* used in the sector offset progress display */
2778                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2779                         /* remember to report stats in drbd_resync_finished */
2780                         device->use_csums = true;
2781                 } else if (pi->cmd == P_OV_REPLY) {
2782                         /* track progress, we may need to throttle */
2783                         atomic_add(size >> 9, &device->rs_sect_in);
2784                         peer_req->w.cb = w_e_end_ov_reply;
2785                         dec_rs_pending(device);
2786                         /* drbd_rs_begin_io done when we sent this request,
2787                          * but accounting still needs to be done. */
2788                         goto submit_for_resync;
2789                 }
2790                 break;
2791
2792         case P_OV_REQUEST:
2793                 if (device->ov_start_sector == ~(sector_t)0 &&
2794                     peer_device->connection->agreed_pro_version >= 90) {
2795                         unsigned long now = jiffies;
2796                         int i;
2797                         device->ov_start_sector = sector;
2798                         device->ov_position = sector;
2799                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2800                         device->rs_total = device->ov_left;
2801                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2802                                 device->rs_mark_left[i] = device->ov_left;
2803                                 device->rs_mark_time[i] = now;
2804                         }
2805                         drbd_info(device, "Online Verify start sector: %llu\n",
2806                                         (unsigned long long)sector);
2807                 }
2808                 peer_req->w.cb = w_e_end_ov_req;
2809                 fault_type = DRBD_FAULT_RS_RD;
2810                 break;
2811
2812         default:
2813                 BUG();
2814         }
2815
2816         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2817          * wrt the receiver, but it is not as straightforward as it may seem.
2818          * Various places in the resync start and stop logic assume resync
2819          * requests are processed in order, requeuing this on the worker thread
2820          * introduces a bunch of new code for synchronization between threads.
2821          *
2822          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2823          * "forever", throttling after drbd_rs_begin_io will lock that extent
2824          * for application writes for the same time.  For now, just throttle
2825          * here, where the rest of the code expects the receiver to sleep for
2826          * a while, anyways.
2827          */
2828
2829         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2830          * this defers syncer requests for some time, before letting at least
2831          * on request through.  The resync controller on the receiving side
2832          * will adapt to the incoming rate accordingly.
2833          *
2834          * We cannot throttle here if remote is Primary/SyncTarget:
2835          * we would also throttle its application reads.
2836          * In that case, throttling is done on the SyncTarget only.
2837          */
2838
2839         /* Even though this may be a resync request, we do add to "read_ee";
2840          * "sync_ee" is only used for resync WRITEs.
2841          * Add to list early, so debugfs can find this request
2842          * even if we have to sleep below. */
2843         spin_lock_irq(&device->resource->req_lock);
2844         list_add_tail(&peer_req->w.list, &device->read_ee);
2845         spin_unlock_irq(&device->resource->req_lock);
2846
2847         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2848         if (device->state.peer != R_PRIMARY
2849         && drbd_rs_should_slow_down(device, sector, false))
2850                 schedule_timeout_uninterruptible(HZ/10);
2851         update_receiver_timing_details(connection, drbd_rs_begin_io);
2852         if (drbd_rs_begin_io(device, sector))
2853                 goto out_free_e;
2854
2855 submit_for_resync:
2856         atomic_add(size >> 9, &device->rs_sect_ev);
2857
2858 submit:
2859         update_receiver_timing_details(connection, drbd_submit_peer_request);
2860         inc_unacked(device);
2861         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2862                                      fault_type) == 0)
2863                 return 0;
2864
2865         /* don't care for the reason here */
2866         drbd_err(device, "submit failed, triggering re-connect\n");
2867
2868 out_free_e:
2869         spin_lock_irq(&device->resource->req_lock);
2870         list_del(&peer_req->w.list);
2871         spin_unlock_irq(&device->resource->req_lock);
2872         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2873
2874         put_ldev(device);
2875         drbd_free_peer_req(device, peer_req);
2876         return -EIO;
2877 }
2878
2879 /**
2880  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2881  */
2882 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2883 {
2884         struct drbd_device *device = peer_device->device;
2885         int self, peer, rv = -100;
2886         unsigned long ch_self, ch_peer;
2887         enum drbd_after_sb_p after_sb_0p;
2888
2889         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2890         peer = device->p_uuid[UI_BITMAP] & 1;
2891
2892         ch_peer = device->p_uuid[UI_SIZE];
2893         ch_self = device->comm_bm_set;
2894
2895         rcu_read_lock();
2896         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2897         rcu_read_unlock();
2898         switch (after_sb_0p) {
2899         case ASB_CONSENSUS:
2900         case ASB_DISCARD_SECONDARY:
2901         case ASB_CALL_HELPER:
2902         case ASB_VIOLENTLY:
2903                 drbd_err(device, "Configuration error.\n");
2904                 break;
2905         case ASB_DISCONNECT:
2906                 break;
2907         case ASB_DISCARD_YOUNGER_PRI:
2908                 if (self == 0 && peer == 1) {
2909                         rv = -1;
2910                         break;
2911                 }
2912                 if (self == 1 && peer == 0) {
2913                         rv =  1;
2914                         break;
2915                 }
2916                 /* Else fall through to one of the other strategies... */
2917         case ASB_DISCARD_OLDER_PRI:
2918                 if (self == 0 && peer == 1) {
2919                         rv = 1;
2920                         break;
2921                 }
2922                 if (self == 1 && peer == 0) {
2923                         rv = -1;
2924                         break;
2925                 }
2926                 /* Else fall through to one of the other strategies... */
2927                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2928                      "Using discard-least-changes instead\n");
2929         case ASB_DISCARD_ZERO_CHG:
2930                 if (ch_peer == 0 && ch_self == 0) {
2931                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2932                                 ? -1 : 1;
2933                         break;
2934                 } else {
2935                         if (ch_peer == 0) { rv =  1; break; }
2936                         if (ch_self == 0) { rv = -1; break; }
2937                 }
2938                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2939                         break;
2940         case ASB_DISCARD_LEAST_CHG:
2941                 if      (ch_self < ch_peer)
2942                         rv = -1;
2943                 else if (ch_self > ch_peer)
2944                         rv =  1;
2945                 else /* ( ch_self == ch_peer ) */
2946                      /* Well, then use something else. */
2947                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2948                                 ? -1 : 1;
2949                 break;
2950         case ASB_DISCARD_LOCAL:
2951                 rv = -1;
2952                 break;
2953         case ASB_DISCARD_REMOTE:
2954                 rv =  1;
2955         }
2956
2957         return rv;
2958 }
2959
2960 /**
2961  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2962  */
2963 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2964 {
2965         struct drbd_device *device = peer_device->device;
2966         int hg, rv = -100;
2967         enum drbd_after_sb_p after_sb_1p;
2968
2969         rcu_read_lock();
2970         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2971         rcu_read_unlock();
2972         switch (after_sb_1p) {
2973         case ASB_DISCARD_YOUNGER_PRI:
2974         case ASB_DISCARD_OLDER_PRI:
2975         case ASB_DISCARD_LEAST_CHG:
2976         case ASB_DISCARD_LOCAL:
2977         case ASB_DISCARD_REMOTE:
2978         case ASB_DISCARD_ZERO_CHG:
2979                 drbd_err(device, "Configuration error.\n");
2980                 break;
2981         case ASB_DISCONNECT:
2982                 break;
2983         case ASB_CONSENSUS:
2984                 hg = drbd_asb_recover_0p(peer_device);
2985                 if (hg == -1 && device->state.role == R_SECONDARY)
2986                         rv = hg;
2987                 if (hg == 1  && device->state.role == R_PRIMARY)
2988                         rv = hg;
2989                 break;
2990         case ASB_VIOLENTLY:
2991                 rv = drbd_asb_recover_0p(peer_device);
2992                 break;
2993         case ASB_DISCARD_SECONDARY:
2994                 return device->state.role == R_PRIMARY ? 1 : -1;
2995         case ASB_CALL_HELPER:
2996                 hg = drbd_asb_recover_0p(peer_device);
2997                 if (hg == -1 && device->state.role == R_PRIMARY) {
2998                         enum drbd_state_rv rv2;
2999
3000                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3001                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3002                           * we do not need to wait for the after state change work either. */
3003                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3004                         if (rv2 != SS_SUCCESS) {
3005                                 drbd_khelper(device, "pri-lost-after-sb");
3006                         } else {
3007                                 drbd_warn(device, "Successfully gave up primary role.\n");
3008                                 rv = hg;
3009                         }
3010                 } else
3011                         rv = hg;
3012         }
3013
3014         return rv;
3015 }
3016
3017 /**
3018  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3019  */
3020 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3021 {
3022         struct drbd_device *device = peer_device->device;
3023         int hg, rv = -100;
3024         enum drbd_after_sb_p after_sb_2p;
3025
3026         rcu_read_lock();
3027         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3028         rcu_read_unlock();
3029         switch (after_sb_2p) {
3030         case ASB_DISCARD_YOUNGER_PRI:
3031         case ASB_DISCARD_OLDER_PRI:
3032         case ASB_DISCARD_LEAST_CHG:
3033         case ASB_DISCARD_LOCAL:
3034         case ASB_DISCARD_REMOTE:
3035         case ASB_CONSENSUS:
3036         case ASB_DISCARD_SECONDARY:
3037         case ASB_DISCARD_ZERO_CHG:
3038                 drbd_err(device, "Configuration error.\n");
3039                 break;
3040         case ASB_VIOLENTLY:
3041                 rv = drbd_asb_recover_0p(peer_device);
3042                 break;
3043         case ASB_DISCONNECT:
3044                 break;
3045         case ASB_CALL_HELPER:
3046                 hg = drbd_asb_recover_0p(peer_device);
3047                 if (hg == -1) {
3048                         enum drbd_state_rv rv2;
3049
3050                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3051                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3052                           * we do not need to wait for the after state change work either. */
3053                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3054                         if (rv2 != SS_SUCCESS) {
3055                                 drbd_khelper(device, "pri-lost-after-sb");
3056                         } else {
3057                                 drbd_warn(device, "Successfully gave up primary role.\n");
3058                                 rv = hg;
3059                         }
3060                 } else
3061                         rv = hg;
3062         }
3063
3064         return rv;
3065 }
3066
3067 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3068                            u64 bits, u64 flags)
3069 {
3070         if (!uuid) {
3071                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3072                 return;
3073         }
3074         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3075              text,
3076              (unsigned long long)uuid[UI_CURRENT],
3077              (unsigned long long)uuid[UI_BITMAP],
3078              (unsigned long long)uuid[UI_HISTORY_START],
3079              (unsigned long long)uuid[UI_HISTORY_END],
3080              (unsigned long long)bits,
3081              (unsigned long long)flags);
3082 }
3083
3084 /*
3085   100   after split brain try auto recover
3086     2   C_SYNC_SOURCE set BitMap
3087     1   C_SYNC_SOURCE use BitMap
3088     0   no Sync
3089    -1   C_SYNC_TARGET use BitMap
3090    -2   C_SYNC_TARGET set BitMap
3091  -100   after split brain, disconnect
3092 -1000   unrelated data
3093 -1091   requires proto 91
3094 -1096   requires proto 96
3095  */
3096
3097 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3098 {
3099         struct drbd_peer_device *const peer_device = first_peer_device(device);
3100         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3101         u64 self, peer;
3102         int i, j;
3103
3104         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3105         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3106
3107         *rule_nr = 10;
3108         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3109                 return 0;
3110
3111         *rule_nr = 20;
3112         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3113              peer != UUID_JUST_CREATED)
3114                 return -2;
3115
3116         *rule_nr = 30;
3117         if (self != UUID_JUST_CREATED &&
3118             (peer == UUID_JUST_CREATED || peer == (u64)0))
3119                 return 2;
3120
3121         if (self == peer) {
3122                 int rct, dc; /* roles at crash time */
3123
3124                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3125
3126                         if (connection->agreed_pro_version < 91)
3127                                 return -1091;
3128
3129                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3130                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3131                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3132                                 drbd_uuid_move_history(device);
3133                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3134                                 device->ldev->md.uuid[UI_BITMAP] = 0;
3135
3136                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3137                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3138                                 *rule_nr = 34;
3139                         } else {
3140                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3141                                 *rule_nr = 36;
3142                         }
3143
3144                         return 1;
3145                 }
3146
3147                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3148
3149                         if (connection->agreed_pro_version < 91)
3150                                 return -1091;
3151
3152                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3153                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3154                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3155
3156                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3157                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3158                                 device->p_uuid[UI_BITMAP] = 0UL;
3159
3160                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3161                                 *rule_nr = 35;
3162                         } else {
3163                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3164                                 *rule_nr = 37;
3165                         }
3166
3167                         return -1;
3168                 }
3169
3170                 /* Common power [off|failure] */
3171                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3172                         (device->p_uuid[UI_FLAGS] & 2);
3173                 /* lowest bit is set when we were primary,
3174                  * next bit (weight 2) is set when peer was primary */
3175                 *rule_nr = 40;
3176
3177                 /* Neither has the "crashed primary" flag set,
3178                  * only a replication link hickup. */
3179                 if (rct == 0)
3180                         return 0;
3181
3182                 /* Current UUID equal and no bitmap uuid; does not necessarily
3183                  * mean this was a "simultaneous hard crash", maybe IO was
3184                  * frozen, so no UUID-bump happened.
3185                  * This is a protocol change, overload DRBD_FF_WSAME as flag
3186                  * for "new-enough" peer DRBD version. */
3187                 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3188                         *rule_nr = 41;
3189                         if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3190                                 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3191                                 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3192                         }
3193                         if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3194                                 /* At least one has the "crashed primary" bit set,
3195                                  * both are primary now, but neither has rotated its UUIDs?
3196                                  * "Can not happen." */
3197                                 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3198                                 return -100;
3199                         }
3200                         if (device->state.role == R_PRIMARY)
3201                                 return 1;
3202                         return -1;
3203                 }
3204
3205                 /* Both are secondary.
3206                  * Really looks like recovery from simultaneous hard crash.
3207                  * Check which had been primary before, and arbitrate. */
3208                 switch (rct) {
3209                 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3210                 case 1: /*  self_pri && !peer_pri */ return 1;
3211                 case 2: /* !self_pri &&  peer_pri */ return -1;
3212                 case 3: /*  self_pri &&  peer_pri */
3213                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3214                         return dc ? -1 : 1;
3215                 }
3216         }
3217
3218         *rule_nr = 50;
3219         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3220         if (self == peer)
3221                 return -1;
3222
3223         *rule_nr = 51;
3224         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3225         if (self == peer) {
3226                 if (connection->agreed_pro_version < 96 ?
3227                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3228                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3229                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3230                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3231                            resync as sync source modifications of the peer's UUIDs. */
3232
3233                         if (connection->agreed_pro_version < 91)
3234                                 return -1091;
3235
3236                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3237                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3238
3239                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3240                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3241
3242                         return -1;
3243                 }
3244         }
3245
3246         *rule_nr = 60;
3247         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3248         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3249                 peer = device->p_uuid[i] & ~((u64)1);
3250                 if (self == peer)
3251                         return -2;
3252         }
3253
3254         *rule_nr = 70;
3255         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3256         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3257         if (self == peer)
3258                 return 1;
3259
3260         *rule_nr = 71;
3261         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3262         if (self == peer) {
3263                 if (connection->agreed_pro_version < 96 ?
3264                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3265                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3266                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3267                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3268                            resync as sync source modifications of our UUIDs. */
3269
3270                         if (connection->agreed_pro_version < 91)
3271                                 return -1091;
3272
3273                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3274                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3275
3276                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3277                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3278                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3279
3280                         return 1;
3281                 }
3282         }
3283
3284
3285         *rule_nr = 80;
3286         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3287         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3288                 self = device->ldev->md.uuid[i] & ~((u64)1);
3289                 if (self == peer)
3290                         return 2;
3291         }
3292
3293         *rule_nr = 90;
3294         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3295         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3296         if (self == peer && self != ((u64)0))
3297                 return 100;
3298
3299         *rule_nr = 100;
3300         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3301                 self = device->ldev->md.uuid[i] & ~((u64)1);
3302                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3303                         peer = device->p_uuid[j] & ~((u64)1);
3304                         if (self == peer)
3305                                 return -100;
3306                 }
3307         }
3308
3309         return -1000;
3310 }
3311
3312 /* drbd_sync_handshake() returns the new conn state on success, or
3313    CONN_MASK (-1) on failure.
3314  */
3315 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3316                                            enum drbd_role peer_role,
3317                                            enum drbd_disk_state peer_disk) __must_hold(local)
3318 {
3319         struct drbd_device *device = peer_device->device;
3320         enum drbd_conns rv = C_MASK;
3321         enum drbd_disk_state mydisk;
3322         struct net_conf *nc;
3323         int hg, rule_nr, rr_conflict, tentative;
3324
3325         mydisk = device->state.disk;
3326         if (mydisk == D_NEGOTIATING)
3327                 mydisk = device->new_state_tmp.disk;
3328
3329         drbd_info(device, "drbd_sync_handshake:\n");
3330
3331         spin_lock_irq(&device->ldev->md.uuid_lock);
3332         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3333         drbd_uuid_dump(device, "peer", device->p_uuid,
3334                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3335
3336         hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3337         spin_unlock_irq(&device->ldev->md.uuid_lock);
3338
3339         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3340
3341         if (hg == -1000) {
3342                 drbd_alert(device, "Unrelated data, aborting!\n");
3343                 return C_MASK;
3344         }
3345         if (hg < -0x10000) {
3346                 int proto, fflags;
3347                 hg = -hg;
3348                 proto = hg & 0xff;
3349                 fflags = (hg >> 8) & 0xff;
3350                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3351                                         proto, fflags);
3352                 return C_MASK;
3353         }
3354         if (hg < -1000) {
3355                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3356                 return C_MASK;
3357         }
3358
3359         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3360             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3361                 int f = (hg == -100) || abs(hg) == 2;
3362                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3363                 if (f)
3364                         hg = hg*2;
3365                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3366                      hg > 0 ? "source" : "target");
3367         }
3368
3369         if (abs(hg) == 100)
3370                 drbd_khelper(device, "initial-split-brain");
3371
3372         rcu_read_lock();
3373         nc = rcu_dereference(peer_device->connection->net_conf);
3374
3375         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3376                 int pcount = (device->state.role == R_PRIMARY)
3377                            + (peer_role == R_PRIMARY);
3378                 int forced = (hg == -100);
3379
3380                 switch (pcount) {
3381                 case 0:
3382                         hg = drbd_asb_recover_0p(peer_device);
3383                         break;
3384                 case 1:
3385                         hg = drbd_asb_recover_1p(peer_device);
3386                         break;
3387                 case 2:
3388                         hg = drbd_asb_recover_2p(peer_device);
3389                         break;
3390                 }
3391                 if (abs(hg) < 100) {
3392                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3393                              "automatically solved. Sync from %s node\n",
3394                              pcount, (hg < 0) ? "peer" : "this");
3395                         if (forced) {
3396                                 drbd_warn(device, "Doing a full sync, since"
3397                                      " UUIDs where ambiguous.\n");
3398                                 hg = hg*2;
3399                         }
3400                 }
3401         }
3402
3403         if (hg == -100) {
3404                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3405                         hg = -1;
3406                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3407                         hg = 1;
3408
3409                 if (abs(hg) < 100)
3410                         drbd_warn(device, "Split-Brain detected, manually solved. "
3411                              "Sync from %s node\n",
3412                              (hg < 0) ? "peer" : "this");
3413         }
3414         rr_conflict = nc->rr_conflict;
3415         tentative = nc->tentative;
3416         rcu_read_unlock();
3417
3418         if (hg == -100) {
3419                 /* FIXME this log message is not correct if we end up here
3420                  * after an attempted attach on a diskless node.
3421                  * We just refuse to attach -- well, we drop the "connection"
3422                  * to that disk, in a way... */
3423                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3424                 drbd_khelper(device, "split-brain");
3425                 return C_MASK;
3426         }
3427
3428         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3429                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3430                 return C_MASK;
3431         }
3432
3433         if (hg < 0 && /* by intention we do not use mydisk here. */
3434             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3435                 switch (rr_conflict) {
3436                 case ASB_CALL_HELPER:
3437                         drbd_khelper(device, "pri-lost");
3438                         /* fall through */
3439                 case ASB_DISCONNECT:
3440                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3441                         return C_MASK;
3442                 case ASB_VIOLENTLY:
3443                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3444                              "assumption\n");
3445                 }
3446         }
3447
3448         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3449                 if (hg == 0)
3450                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3451                 else
3452                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3453                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3454                                  abs(hg) >= 2 ? "full" : "bit-map based");
3455                 return C_MASK;
3456         }
3457
3458         if (abs(hg) >= 2) {
3459                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3460                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3461                                         BM_LOCKED_SET_ALLOWED))
3462                         return C_MASK;
3463         }
3464
3465         if (hg > 0) { /* become sync source. */
3466                 rv = C_WF_BITMAP_S;
3467         } else if (hg < 0) { /* become sync target */
3468                 rv = C_WF_BITMAP_T;
3469         } else {
3470                 rv = C_CONNECTED;
3471                 if (drbd_bm_total_weight(device)) {
3472                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3473                              drbd_bm_total_weight(device));
3474                 }
3475         }
3476
3477         return rv;
3478 }
3479
3480 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3481 {
3482         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3483         if (peer == ASB_DISCARD_REMOTE)
3484                 return ASB_DISCARD_LOCAL;
3485
3486         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3487         if (peer == ASB_DISCARD_LOCAL)
3488                 return ASB_DISCARD_REMOTE;
3489
3490         /* everything else is valid if they are equal on both sides. */
3491         return peer;
3492 }
3493
3494 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3495 {
3496         struct p_protocol *p = pi->data;
3497         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3498         int p_proto, p_discard_my_data, p_two_primaries, cf;
3499         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3500         char integrity_alg[SHARED_SECRET_MAX] = "";
3501         struct crypto_ahash *peer_integrity_tfm = NULL;
3502         void *int_dig_in = NULL, *int_dig_vv = NULL;
3503
3504         p_proto         = be32_to_cpu(p->protocol);
3505         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3506         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3507         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3508         p_two_primaries = be32_to_cpu(p->two_primaries);
3509         cf              = be32_to_cpu(p->conn_flags);
3510         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3511
3512         if (connection->agreed_pro_version >= 87) {
3513                 int err;
3514
3515                 if (pi->size > sizeof(integrity_alg))
3516                         return -EIO;
3517                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3518                 if (err)
3519                         return err;
3520                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3521         }
3522
3523         if (pi->cmd != P_PROTOCOL_UPDATE) {
3524                 clear_bit(CONN_DRY_RUN, &connection->flags);
3525
3526                 if (cf & CF_DRY_RUN)
3527                         set_bit(CONN_DRY_RUN, &connection->flags);
3528
3529                 rcu_read_lock();
3530                 nc = rcu_dereference(connection->net_conf);
3531
3532                 if (p_proto != nc->wire_protocol) {
3533                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3534                         goto disconnect_rcu_unlock;
3535                 }
3536
3537                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3538                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3539                         goto disconnect_rcu_unlock;
3540                 }
3541
3542                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3543                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3544                         goto disconnect_rcu_unlock;
3545                 }
3546
3547                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3548                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3549                         goto disconnect_rcu_unlock;
3550                 }
3551
3552                 if (p_discard_my_data && nc->discard_my_data) {
3553                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3554                         goto disconnect_rcu_unlock;
3555                 }
3556
3557                 if (p_two_primaries != nc->two_primaries) {
3558                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3559                         goto disconnect_rcu_unlock;
3560                 }
3561
3562                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3563                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3564                         goto disconnect_rcu_unlock;
3565                 }
3566
3567                 rcu_read_unlock();
3568         }
3569
3570         if (integrity_alg[0]) {
3571                 int hash_size;
3572
3573                 /*
3574                  * We can only change the peer data integrity algorithm
3575                  * here.  Changing our own data integrity algorithm
3576                  * requires that we send a P_PROTOCOL_UPDATE packet at
3577                  * the same time; otherwise, the peer has no way to
3578                  * tell between which packets the algorithm should
3579                  * change.
3580                  */
3581
3582                 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3583                 if (IS_ERR(peer_integrity_tfm)) {
3584                         peer_integrity_tfm = NULL;
3585                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3586                                  integrity_alg);
3587                         goto disconnect;
3588                 }
3589
3590                 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3591                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3592                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3593                 if (!(int_dig_in && int_dig_vv)) {
3594                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3595                         goto disconnect;
3596                 }
3597         }
3598
3599         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3600         if (!new_net_conf) {
3601                 drbd_err(connection, "Allocation of new net_conf failed\n");
3602                 goto disconnect;
3603         }
3604
3605         mutex_lock(&connection->data.mutex);
3606         mutex_lock(&connection->resource->conf_update);
3607         old_net_conf = connection->net_conf;
3608         *new_net_conf = *old_net_conf;
3609
3610         new_net_conf->wire_protocol = p_proto;
3611         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3612         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3613         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3614         new_net_conf->two_primaries = p_two_primaries;
3615
3616         rcu_assign_pointer(connection->net_conf, new_net_conf);
3617         mutex_unlock(&connection->resource->conf_update);
3618         mutex_unlock(&connection->data.mutex);
3619
3620         crypto_free_ahash(connection->peer_integrity_tfm);
3621         kfree(connection->int_dig_in);
3622         kfree(connection->int_dig_vv);
3623         connection->peer_integrity_tfm = peer_integrity_tfm;
3624         connection->int_dig_in = int_dig_in;
3625         connection->int_dig_vv = int_dig_vv;
3626
3627         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3628                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3629                           integrity_alg[0] ? integrity_alg : "(none)");
3630
3631         synchronize_rcu();
3632         kfree(old_net_conf);
3633         return 0;
3634
3635 disconnect_rcu_unlock:
3636         rcu_read_unlock();
3637 disconnect:
3638         crypto_free_ahash(peer_integrity_tfm);
3639         kfree(int_dig_in);
3640         kfree(int_dig_vv);
3641         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3642         return -EIO;
3643 }
3644
3645 /* helper function
3646  * input: alg name, feature name
3647  * return: NULL (alg name was "")
3648  *         ERR_PTR(error) if something goes wrong
3649  *         or the crypto hash ptr, if it worked out ok. */
3650 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3651                 const char *alg, const char *name)
3652 {
3653         struct crypto_ahash *tfm;
3654
3655         if (!alg[0])
3656                 return NULL;
3657
3658         tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3659         if (IS_ERR(tfm)) {
3660                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3661                         alg, name, PTR_ERR(tfm));
3662                 return tfm;
3663         }
3664         return tfm;
3665 }
3666
3667 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3668 {
3669         void *buffer = connection->data.rbuf;
3670         int size = pi->size;
3671
3672         while (size) {
3673                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3674                 s = drbd_recv(connection, buffer, s);
3675                 if (s <= 0) {
3676                         if (s < 0)
3677                                 return s;
3678                         break;
3679                 }
3680                 size -= s;
3681         }
3682         if (size)
3683                 return -EIO;
3684         return 0;
3685 }
3686
3687 /*
3688  * config_unknown_volume  -  device configuration command for unknown volume
3689  *
3690  * When a device is added to an existing connection, the node on which the
3691  * device is added first will send configuration commands to its peer but the
3692  * peer will not know about the device yet.  It will warn and ignore these
3693  * commands.  Once the device is added on the second node, the second node will
3694  * send the same device configuration commands, but in the other direction.
3695  *
3696  * (We can also end up here if drbd is misconfigured.)
3697  */
3698 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3699 {
3700         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3701                   cmdname(pi->cmd), pi->vnr);
3702         return ignore_remaining_packet(connection, pi);
3703 }
3704
3705 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3706 {
3707         struct drbd_peer_device *peer_device;
3708         struct drbd_device *device;
3709         struct p_rs_param_95 *p;
3710         unsigned int header_size, data_size, exp_max_sz;
3711         struct crypto_ahash *verify_tfm = NULL;
3712         struct crypto_ahash *csums_tfm = NULL;
3713         struct net_conf *old_net_conf, *new_net_conf = NULL;
3714         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3715         const int apv = connection->agreed_pro_version;
3716         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3717         int fifo_size = 0;
3718         int err;
3719
3720         peer_device = conn_peer_device(connection, pi->vnr);
3721         if (!peer_device)
3722                 return config_unknown_volume(connection, pi);
3723         device = peer_device->device;
3724
3725         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3726                     : apv == 88 ? sizeof(struct p_rs_param)
3727                                         + SHARED_SECRET_MAX
3728                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3729                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3730
3731         if (pi->size > exp_max_sz) {
3732                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3733                     pi->size, exp_max_sz);
3734                 return -EIO;
3735         }
3736
3737         if (apv <= 88) {
3738                 header_size = sizeof(struct p_rs_param);
3739                 data_size = pi->size - header_size;
3740         } else if (apv <= 94) {
3741                 header_size = sizeof(struct p_rs_param_89);
3742                 data_size = pi->size - header_size;
3743                 D_ASSERT(device, data_size == 0);
3744         } else {
3745                 header_size = sizeof(struct p_rs_param_95);
3746                 data_size = pi->size - header_size;
3747                 D_ASSERT(device, data_size == 0);
3748         }
3749
3750         /* initialize verify_alg and csums_alg */
3751         p = pi->data;
3752         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3753
3754         err = drbd_recv_all(peer_device->connection, p, header_size);
3755         if (err)
3756                 return err;
3757
3758         mutex_lock(&connection->resource->conf_update);
3759         old_net_conf = peer_device->connection->net_conf;
3760         if (get_ldev(device)) {
3761                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3762                 if (!new_disk_conf) {
3763                         put_ldev(device);
3764                         mutex_unlock(&connection->resource->conf_update);
3765                         drbd_err(device, "Allocation of new disk_conf failed\n");
3766                         return -ENOMEM;
3767                 }
3768
3769                 old_disk_conf = device->ldev->disk_conf;
3770                 *new_disk_conf = *old_disk_conf;
3771
3772                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3773         }
3774
3775         if (apv >= 88) {
3776                 if (apv == 88) {
3777                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3778                                 drbd_err(device, "verify-alg of wrong size, "
3779                                         "peer wants %u, accepting only up to %u byte\n",
3780                                         data_size, SHARED_SECRET_MAX);
3781                                 err = -EIO;
3782                                 goto reconnect;
3783                         }
3784
3785                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3786                         if (err)
3787                                 goto reconnect;
3788                         /* we expect NUL terminated string */
3789                         /* but just in case someone tries to be evil */
3790                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3791                         p->verify_alg[data_size-1] = 0;
3792
3793                 } else /* apv >= 89 */ {
3794                         /* we still expect NUL terminated strings */
3795                         /* but just in case someone tries to be evil */
3796                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3797                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3798                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3799                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3800                 }
3801
3802                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3803                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3804                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3805                                     old_net_conf->verify_alg, p->verify_alg);
3806                                 goto disconnect;
3807                         }
3808                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3809                                         p->verify_alg, "verify-alg");
3810                         if (IS_ERR(verify_tfm)) {
3811                                 verify_tfm = NULL;
3812                                 goto disconnect;
3813                         }
3814                 }
3815
3816                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3817                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3818                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3819                                     old_net_conf->csums_alg, p->csums_alg);
3820                                 goto disconnect;
3821                         }
3822                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3823                                         p->csums_alg, "csums-alg");
3824                         if (IS_ERR(csums_tfm)) {
3825                                 csums_tfm = NULL;
3826                                 goto disconnect;
3827                         }
3828                 }
3829
3830                 if (apv > 94 && new_disk_conf) {
3831                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3832                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3833                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3834                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3835
3836                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3837                         if (fifo_size != device->rs_plan_s->size) {
3838                                 new_plan = fifo_alloc(fifo_size);
3839                                 if (!new_plan) {
3840                                         drbd_err(device, "kmalloc of fifo_buffer failed");
3841                                         put_ldev(device);
3842                                         goto disconnect;
3843                                 }
3844                         }
3845                 }
3846
3847                 if (verify_tfm || csums_tfm) {
3848                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3849                         if (!new_net_conf) {
3850                                 drbd_err(device, "Allocation of new net_conf failed\n");
3851                                 goto disconnect;
3852                         }
3853
3854                         *new_net_conf = *old_net_conf;
3855
3856                         if (verify_tfm) {
3857                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3858                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3859                                 crypto_free_ahash(peer_device->connection->verify_tfm);
3860                                 peer_device->connection->verify_tfm = verify_tfm;
3861                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3862                         }
3863                         if (csums_tfm) {
3864                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3865                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3866                                 crypto_free_ahash(peer_device->connection->csums_tfm);
3867                                 peer_device->connection->csums_tfm = csums_tfm;
3868                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3869                         }
3870                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3871                 }
3872         }
3873
3874         if (new_disk_conf) {
3875                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3876                 put_ldev(device);
3877         }
3878
3879         if (new_plan) {
3880                 old_plan = device->rs_plan_s;
3881                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3882         }
3883
3884         mutex_unlock(&connection->resource->conf_update);
3885         synchronize_rcu();
3886         if (new_net_conf)
3887                 kfree(old_net_conf);
3888         kfree(old_disk_conf);
3889         kfree(old_plan);
3890
3891         return 0;
3892
3893 reconnect:
3894         if (new_disk_conf) {
3895                 put_ldev(device);
3896                 kfree(new_disk_conf);
3897         }
3898         mutex_unlock(&connection->resource->conf_update);
3899         return -EIO;
3900
3901 disconnect:
3902         kfree(new_plan);
3903         if (new_disk_conf) {
3904                 put_ldev(device);
3905                 kfree(new_disk_conf);
3906         }
3907         mutex_unlock(&connection->resource->conf_update);
3908         /* just for completeness: actually not needed,
3909          * as this is not reached if csums_tfm was ok. */
3910         crypto_free_ahash(csums_tfm);
3911         /* but free the verify_tfm again, if csums_tfm did not work out */
3912         crypto_free_ahash(verify_tfm);
3913         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3914         return -EIO;
3915 }
3916
3917 /* warn if the arguments differ by more than 12.5% */
3918 static void warn_if_differ_considerably(struct drbd_device *device,
3919         const char *s, sector_t a, sector_t b)
3920 {
3921         sector_t d;
3922         if (a == 0 || b == 0)
3923                 return;
3924         d = (a > b) ? (a - b) : (b - a);
3925         if (d > (a>>3) || d > (b>>3))
3926                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3927                      (unsigned long long)a, (unsigned long long)b);
3928 }
3929
3930 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3931 {
3932         struct drbd_peer_device *peer_device;
3933         struct drbd_device *device;
3934         struct p_sizes *p = pi->data;
3935         struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3936         enum determine_dev_size dd = DS_UNCHANGED;
3937         sector_t p_size, p_usize, p_csize, my_usize;
3938         int ldsc = 0; /* local disk size changed */
3939         enum dds_flags ddsf;
3940
3941         peer_device = conn_peer_device(connection, pi->vnr);
3942         if (!peer_device)
3943                 return config_unknown_volume(connection, pi);
3944         device = peer_device->device;
3945
3946         p_size = be64_to_cpu(p->d_size);
3947         p_usize = be64_to_cpu(p->u_size);
3948         p_csize = be64_to_cpu(p->c_size);
3949
3950         /* just store the peer's disk size for now.
3951          * we still need to figure out whether we accept that. */
3952         device->p_size = p_size;
3953
3954         if (get_ldev(device)) {
3955                 sector_t new_size, cur_size;
3956                 rcu_read_lock();
3957                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3958                 rcu_read_unlock();
3959
3960                 warn_if_differ_considerably(device, "lower level device sizes",
3961                            p_size, drbd_get_max_capacity(device->ldev));
3962                 warn_if_differ_considerably(device, "user requested size",
3963                                             p_usize, my_usize);
3964
3965                 /* if this is the first connect, or an otherwise expected
3966                  * param exchange, choose the minimum */
3967                 if (device->state.conn == C_WF_REPORT_PARAMS)
3968                         p_usize = min_not_zero(my_usize, p_usize);
3969
3970                 /* Never shrink a device with usable data during connect.
3971                    But allow online shrinking if we are connected. */
3972                 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
3973                 cur_size = drbd_get_capacity(device->this_bdev);
3974                 if (new_size < cur_size &&
3975                     device->state.disk >= D_OUTDATED &&
3976                     device->state.conn < C_CONNECTED) {
3977                         drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
3978                                         (unsigned long long)new_size, (unsigned long long)cur_size);
3979                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3980                         put_ldev(device);
3981                         return -EIO;
3982                 }
3983
3984                 if (my_usize != p_usize) {
3985                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3986
3987                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3988                         if (!new_disk_conf) {
3989                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3990                                 put_ldev(device);
3991                                 return -ENOMEM;
3992                         }
3993
3994                         mutex_lock(&connection->resource->conf_update);
3995                         old_disk_conf = device->ldev->disk_conf;
3996                         *new_disk_conf = *old_disk_conf;
3997                         new_disk_conf->disk_size = p_usize;
3998
3999                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4000                         mutex_unlock(&connection->resource->conf_update);
4001                         synchronize_rcu();
4002                         kfree(old_disk_conf);
4003
4004                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
4005                                  (unsigned long)my_usize);
4006                 }
4007
4008                 put_ldev(device);
4009         }
4010
4011         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4012         /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4013            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4014            drbd_reconsider_queue_parameters(), we can be sure that after
4015            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4016
4017         ddsf = be16_to_cpu(p->dds_flags);
4018         if (get_ldev(device)) {
4019                 drbd_reconsider_queue_parameters(device, device->ldev, o);
4020                 dd = drbd_determine_dev_size(device, ddsf, NULL);
4021                 put_ldev(device);
4022                 if (dd == DS_ERROR)
4023                         return -EIO;
4024                 drbd_md_sync(device);
4025         } else {
4026                 /*
4027                  * I am diskless, need to accept the peer's *current* size.
4028                  * I must NOT accept the peers backing disk size,
4029                  * it may have been larger than mine all along...
4030                  *
4031                  * At this point, the peer knows more about my disk, or at
4032                  * least about what we last agreed upon, than myself.
4033                  * So if his c_size is less than his d_size, the most likely
4034                  * reason is that *my* d_size was smaller last time we checked.
4035                  *
4036                  * However, if he sends a zero current size,
4037                  * take his (user-capped or) backing disk size anyways.
4038                  */
4039                 drbd_reconsider_queue_parameters(device, NULL, o);
4040                 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
4041         }
4042
4043         if (get_ldev(device)) {
4044                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4045                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4046                         ldsc = 1;
4047                 }
4048
4049                 put_ldev(device);
4050         }
4051
4052         if (device->state.conn > C_WF_REPORT_PARAMS) {
4053                 if (be64_to_cpu(p->c_size) !=
4054                     drbd_get_capacity(device->this_bdev) || ldsc) {
4055                         /* we have different sizes, probably peer
4056                          * needs to know my new size... */
4057                         drbd_send_sizes(peer_device, 0, ddsf);
4058                 }
4059                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4060                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4061                         if (device->state.pdsk >= D_INCONSISTENT &&
4062                             device->state.disk >= D_INCONSISTENT) {
4063                                 if (ddsf & DDSF_NO_RESYNC)
4064                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4065                                 else
4066                                         resync_after_online_grow(device);
4067                         } else
4068                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
4069                 }
4070         }
4071
4072         return 0;
4073 }
4074
4075 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4076 {
4077         struct drbd_peer_device *peer_device;
4078         struct drbd_device *device;
4079         struct p_uuids *p = pi->data;
4080         u64 *p_uuid;
4081         int i, updated_uuids = 0;
4082
4083         peer_device = conn_peer_device(connection, pi->vnr);
4084         if (!peer_device)
4085                 return config_unknown_volume(connection, pi);
4086         device = peer_device->device;
4087
4088         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
4089         if (!p_uuid) {
4090                 drbd_err(device, "kmalloc of p_uuid failed\n");
4091                 return false;
4092         }
4093
4094         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4095                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4096
4097         kfree(device->p_uuid);
4098         device->p_uuid = p_uuid;
4099
4100         if (device->state.conn < C_CONNECTED &&
4101             device->state.disk < D_INCONSISTENT &&
4102             device->state.role == R_PRIMARY &&
4103             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4104                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4105                     (unsigned long long)device->ed_uuid);
4106                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4107                 return -EIO;
4108         }
4109
4110         if (get_ldev(device)) {
4111                 int skip_initial_sync =
4112                         device->state.conn == C_CONNECTED &&
4113                         peer_device->connection->agreed_pro_version >= 90 &&
4114                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4115                         (p_uuid[UI_FLAGS] & 8);
4116                 if (skip_initial_sync) {
4117                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4118                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4119                                         "clear_n_write from receive_uuids",
4120                                         BM_LOCKED_TEST_ALLOWED);
4121                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4122                         _drbd_uuid_set(device, UI_BITMAP, 0);
4123                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4124                                         CS_VERBOSE, NULL);
4125                         drbd_md_sync(device);
4126                         updated_uuids = 1;
4127                 }
4128                 put_ldev(device);
4129         } else if (device->state.disk < D_INCONSISTENT &&
4130                    device->state.role == R_PRIMARY) {
4131                 /* I am a diskless primary, the peer just created a new current UUID
4132                    for me. */
4133                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4134         }
4135
4136         /* Before we test for the disk state, we should wait until an eventually
4137            ongoing cluster wide state change is finished. That is important if
4138            we are primary and are detaching from our disk. We need to see the
4139            new disk state... */
4140         mutex_lock(device->state_mutex);
4141         mutex_unlock(device->state_mutex);
4142         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4143                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4144
4145         if (updated_uuids)
4146                 drbd_print_uuids(device, "receiver updated UUIDs to");
4147
4148         return 0;
4149 }
4150
4151 /**
4152  * convert_state() - Converts the peer's view of the cluster state to our point of view
4153  * @ps:         The state as seen by the peer.
4154  */
4155 static union drbd_state convert_state(union drbd_state ps)
4156 {
4157         union drbd_state ms;
4158
4159         static enum drbd_conns c_tab[] = {
4160                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4161                 [C_CONNECTED] = C_CONNECTED,
4162
4163                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4164                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4165                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4166                 [C_VERIFY_S]       = C_VERIFY_T,
4167                 [C_MASK]   = C_MASK,
4168         };
4169
4170         ms.i = ps.i;
4171
4172         ms.conn = c_tab[ps.conn];
4173         ms.peer = ps.role;
4174         ms.role = ps.peer;
4175         ms.pdsk = ps.disk;
4176         ms.disk = ps.pdsk;
4177         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4178
4179         return ms;
4180 }
4181
4182 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4183 {
4184         struct drbd_peer_device *peer_device;
4185         struct drbd_device *device;
4186         struct p_req_state *p = pi->data;
4187         union drbd_state mask, val;
4188         enum drbd_state_rv rv;
4189
4190         peer_device = conn_peer_device(connection, pi->vnr);
4191         if (!peer_device)
4192                 return -EIO;
4193         device = peer_device->device;
4194
4195         mask.i = be32_to_cpu(p->mask);
4196         val.i = be32_to_cpu(p->val);
4197
4198         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4199             mutex_is_locked(device->state_mutex)) {
4200                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4201                 return 0;
4202         }
4203
4204         mask = convert_state(mask);
4205         val = convert_state(val);
4206
4207         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4208         drbd_send_sr_reply(peer_device, rv);
4209
4210         drbd_md_sync(device);
4211
4212         return 0;
4213 }
4214
4215 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4216 {
4217         struct p_req_state *p = pi->data;
4218         union drbd_state mask, val;
4219         enum drbd_state_rv rv;
4220
4221         mask.i = be32_to_cpu(p->mask);
4222         val.i = be32_to_cpu(p->val);
4223
4224         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4225             mutex_is_locked(&connection->cstate_mutex)) {
4226                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4227                 return 0;
4228         }
4229
4230         mask = convert_state(mask);
4231         val = convert_state(val);
4232
4233         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4234         conn_send_sr_reply(connection, rv);
4235
4236         return 0;
4237 }
4238
4239 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4240 {
4241         struct drbd_peer_device *peer_device;
4242         struct drbd_device *device;
4243         struct p_state *p = pi->data;
4244         union drbd_state os, ns, peer_state;
4245         enum drbd_disk_state real_peer_disk;
4246         enum chg_state_flags cs_flags;
4247         int rv;
4248
4249         peer_device = conn_peer_device(connection, pi->vnr);
4250         if (!peer_device)
4251                 return config_unknown_volume(connection, pi);
4252         device = peer_device->device;
4253
4254         peer_state.i = be32_to_cpu(p->state);
4255
4256         real_peer_disk = peer_state.disk;
4257         if (peer_state.disk == D_NEGOTIATING) {
4258                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4259                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4260         }
4261
4262         spin_lock_irq(&device->resource->req_lock);
4263  retry:
4264         os = ns = drbd_read_state(device);
4265         spin_unlock_irq(&device->resource->req_lock);
4266
4267         /* If some other part of the code (ack_receiver thread, timeout)
4268          * already decided to close the connection again,
4269          * we must not "re-establish" it here. */
4270         if (os.conn <= C_TEAR_DOWN)
4271                 return -ECONNRESET;
4272
4273         /* If this is the "end of sync" confirmation, usually the peer disk
4274          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4275          * set) resync started in PausedSyncT, or if the timing of pause-/
4276          * unpause-sync events has been "just right", the peer disk may
4277          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4278          */
4279         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4280             real_peer_disk == D_UP_TO_DATE &&
4281             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4282                 /* If we are (becoming) SyncSource, but peer is still in sync
4283                  * preparation, ignore its uptodate-ness to avoid flapping, it
4284                  * will change to inconsistent once the peer reaches active
4285                  * syncing states.
4286                  * It may have changed syncer-paused flags, however, so we
4287                  * cannot ignore this completely. */
4288                 if (peer_state.conn > C_CONNECTED &&
4289                     peer_state.conn < C_SYNC_SOURCE)
4290                         real_peer_disk = D_INCONSISTENT;
4291
4292                 /* if peer_state changes to connected at the same time,
4293                  * it explicitly notifies us that it finished resync.
4294                  * Maybe we should finish it up, too? */
4295                 else if (os.conn >= C_SYNC_SOURCE &&
4296                          peer_state.conn == C_CONNECTED) {
4297                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4298                                 drbd_resync_finished(device);
4299                         return 0;
4300                 }
4301         }
4302
4303         /* explicit verify finished notification, stop sector reached. */
4304         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4305             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4306                 ov_out_of_sync_print(device);
4307                 drbd_resync_finished(device);
4308                 return 0;
4309         }
4310
4311         /* peer says his disk is inconsistent, while we think it is uptodate,
4312          * and this happens while the peer still thinks we have a sync going on,
4313          * but we think we are already done with the sync.
4314          * We ignore this to avoid flapping pdsk.
4315          * This should not happen, if the peer is a recent version of drbd. */
4316         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4317             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4318                 real_peer_disk = D_UP_TO_DATE;
4319
4320         if (ns.conn == C_WF_REPORT_PARAMS)
4321                 ns.conn = C_CONNECTED;
4322
4323         if (peer_state.conn == C_AHEAD)
4324                 ns.conn = C_BEHIND;
4325
4326         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4327             get_ldev_if_state(device, D_NEGOTIATING)) {
4328                 int cr; /* consider resync */
4329
4330                 /* if we established a new connection */
4331                 cr  = (os.conn < C_CONNECTED);
4332                 /* if we had an established connection
4333                  * and one of the nodes newly attaches a disk */
4334                 cr |= (os.conn == C_CONNECTED &&
4335                        (peer_state.disk == D_NEGOTIATING ||
4336                         os.disk == D_NEGOTIATING));
4337                 /* if we have both been inconsistent, and the peer has been
4338                  * forced to be UpToDate with --overwrite-data */
4339                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4340                 /* if we had been plain connected, and the admin requested to
4341                  * start a sync by "invalidate" or "invalidate-remote" */
4342                 cr |= (os.conn == C_CONNECTED &&
4343                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4344                                  peer_state.conn <= C_WF_BITMAP_T));
4345
4346                 if (cr)
4347                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4348
4349                 put_ldev(device);
4350                 if (ns.conn == C_MASK) {
4351                         ns.conn = C_CONNECTED;
4352                         if (device->state.disk == D_NEGOTIATING) {
4353                                 drbd_force_state(device, NS(disk, D_FAILED));
4354                         } else if (peer_state.disk == D_NEGOTIATING) {
4355                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4356                                 peer_state.disk = D_DISKLESS;
4357                                 real_peer_disk = D_DISKLESS;
4358                         } else {
4359                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4360                                         return -EIO;
4361                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4362                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4363                                 return -EIO;
4364                         }
4365                 }
4366         }
4367
4368         spin_lock_irq(&device->resource->req_lock);
4369         if (os.i != drbd_read_state(device).i)
4370                 goto retry;
4371         clear_bit(CONSIDER_RESYNC, &device->flags);
4372         ns.peer = peer_state.role;
4373         ns.pdsk = real_peer_disk;
4374         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4375         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4376                 ns.disk = device->new_state_tmp.disk;
4377         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4378         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4379             test_bit(NEW_CUR_UUID, &device->flags)) {
4380                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4381                    for temporal network outages! */
4382                 spin_unlock_irq(&device->resource->req_lock);
4383                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4384                 tl_clear(peer_device->connection);
4385                 drbd_uuid_new_current(device);
4386                 clear_bit(NEW_CUR_UUID, &device->flags);
4387                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4388                 return -EIO;
4389         }
4390         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4391         ns = drbd_read_state(device);
4392         spin_unlock_irq(&device->resource->req_lock);
4393
4394         if (rv < SS_SUCCESS) {
4395                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4396                 return -EIO;
4397         }
4398
4399         if (os.conn > C_WF_REPORT_PARAMS) {
4400                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4401                     peer_state.disk != D_NEGOTIATING ) {
4402                         /* we want resync, peer has not yet decided to sync... */
4403                         /* Nowadays only used when forcing a node into primary role and
4404                            setting its disk to UpToDate with that */
4405                         drbd_send_uuids(peer_device);
4406                         drbd_send_current_state(peer_device);
4407                 }
4408         }
4409
4410         clear_bit(DISCARD_MY_DATA, &device->flags);
4411
4412         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4413
4414         return 0;
4415 }
4416
4417 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4418 {
4419         struct drbd_peer_device *peer_device;
4420         struct drbd_device *device;
4421         struct p_rs_uuid *p = pi->data;
4422
4423         peer_device = conn_peer_device(connection, pi->vnr);
4424         if (!peer_device)
4425                 return -EIO;
4426         device = peer_device->device;
4427
4428         wait_event(device->misc_wait,
4429                    device->state.conn == C_WF_SYNC_UUID ||
4430                    device->state.conn == C_BEHIND ||
4431                    device->state.conn < C_CONNECTED ||
4432                    device->state.disk < D_NEGOTIATING);
4433
4434         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4435
4436         /* Here the _drbd_uuid_ functions are right, current should
4437            _not_ be rotated into the history */
4438         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4439                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4440                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4441
4442                 drbd_print_uuids(device, "updated sync uuid");
4443                 drbd_start_resync(device, C_SYNC_TARGET);
4444
4445                 put_ldev(device);
4446         } else
4447                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4448
4449         return 0;
4450 }
4451
4452 /**
4453  * receive_bitmap_plain
4454  *
4455  * Return 0 when done, 1 when another iteration is needed, and a negative error
4456  * code upon failure.
4457  */
4458 static int
4459 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4460                      unsigned long *p, struct bm_xfer_ctx *c)
4461 {
4462         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4463                                  drbd_header_size(peer_device->connection);
4464         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4465                                        c->bm_words - c->word_offset);
4466         unsigned int want = num_words * sizeof(*p);
4467         int err;
4468
4469         if (want != size) {
4470                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4471                 return -EIO;
4472         }
4473         if (want == 0)
4474                 return 0;
4475         err = drbd_recv_all(peer_device->connection, p, want);
4476         if (err)
4477                 return err;
4478
4479         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4480
4481         c->word_offset += num_words;
4482         c->bit_offset = c->word_offset * BITS_PER_LONG;
4483         if (c->bit_offset > c->bm_bits)
4484                 c->bit_offset = c->bm_bits;
4485
4486         return 1;
4487 }
4488
4489 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4490 {
4491         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4492 }
4493
4494 static int dcbp_get_start(struct p_compressed_bm *p)
4495 {
4496         return (p->encoding & 0x80) != 0;
4497 }
4498
4499 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4500 {
4501         return (p->encoding >> 4) & 0x7;
4502 }
4503
4504 /**
4505  * recv_bm_rle_bits
4506  *
4507  * Return 0 when done, 1 when another iteration is needed, and a negative error
4508  * code upon failure.
4509  */
4510 static int
4511 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4512                 struct p_compressed_bm *p,
4513                  struct bm_xfer_ctx *c,
4514                  unsigned int len)
4515 {
4516         struct bitstream bs;
4517         u64 look_ahead;
4518         u64 rl;
4519         u64 tmp;
4520         unsigned long s = c->bit_offset;
4521         unsigned long e;
4522         int toggle = dcbp_get_start(p);
4523         int have;
4524         int bits;
4525
4526         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4527
4528         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4529         if (bits < 0)
4530                 return -EIO;
4531
4532         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4533                 bits = vli_decode_bits(&rl, look_ahead);
4534                 if (bits <= 0)
4535                         return -EIO;
4536
4537                 if (toggle) {
4538                         e = s + rl -1;
4539                         if (e >= c->bm_bits) {
4540                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4541                                 return -EIO;
4542                         }
4543                         _drbd_bm_set_bits(peer_device->device, s, e);
4544                 }
4545
4546                 if (have < bits) {
4547                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4548                                 have, bits, look_ahead,
4549                                 (unsigned int)(bs.cur.b - p->code),
4550                                 (unsigned int)bs.buf_len);
4551                         return -EIO;
4552                 }
4553                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4554                 if (likely(bits < 64))
4555                         look_ahead >>= bits;
4556                 else
4557                         look_ahead = 0;
4558                 have -= bits;
4559
4560                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4561                 if (bits < 0)
4562                         return -EIO;
4563                 look_ahead |= tmp << have;
4564                 have += bits;
4565         }
4566
4567         c->bit_offset = s;
4568         bm_xfer_ctx_bit_to_word_offset(c);
4569
4570         return (s != c->bm_bits);
4571 }
4572
4573 /**
4574  * decode_bitmap_c
4575  *
4576  * Return 0 when done, 1 when another iteration is needed, and a negative error
4577  * code upon failure.
4578  */
4579 static int
4580 decode_bitmap_c(struct drbd_peer_device *peer_device,
4581                 struct p_compressed_bm *p,
4582                 struct bm_xfer_ctx *c,
4583                 unsigned int len)
4584 {
4585         if (dcbp_get_code(p) == RLE_VLI_Bits)
4586                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4587
4588         /* other variants had been implemented for evaluation,
4589          * but have been dropped as this one turned out to be "best"
4590          * during all our tests. */
4591
4592         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4593         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4594         return -EIO;
4595 }
4596
4597 void INFO_bm_xfer_stats(struct drbd_device *device,
4598                 const char *direction, struct bm_xfer_ctx *c)
4599 {
4600         /* what would it take to transfer it "plaintext" */
4601         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4602         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4603         unsigned int plain =
4604                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4605                 c->bm_words * sizeof(unsigned long);
4606         unsigned int total = c->bytes[0] + c->bytes[1];
4607         unsigned int r;
4608
4609         /* total can not be zero. but just in case: */
4610         if (total == 0)
4611                 return;
4612
4613         /* don't report if not compressed */
4614         if (total >= plain)
4615                 return;
4616
4617         /* total < plain. check for overflow, still */
4618         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4619                                     : (1000 * total / plain);
4620
4621         if (r > 1000)
4622                 r = 1000;
4623
4624         r = 1000 - r;
4625         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4626              "total %u; compression: %u.%u%%\n",
4627                         direction,
4628                         c->bytes[1], c->packets[1],
4629                         c->bytes[0], c->packets[0],
4630                         total, r/10, r % 10);
4631 }
4632
4633 /* Since we are processing the bitfield from lower addresses to higher,
4634    it does not matter if the process it in 32 bit chunks or 64 bit
4635    chunks as long as it is little endian. (Understand it as byte stream,
4636    beginning with the lowest byte...) If we would use big endian
4637    we would need to process it from the highest address to the lowest,
4638    in order to be agnostic to the 32 vs 64 bits issue.
4639
4640    returns 0 on failure, 1 if we successfully received it. */
4641 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4642 {
4643         struct drbd_peer_device *peer_device;
4644         struct drbd_device *device;
4645         struct bm_xfer_ctx c;
4646         int err;
4647
4648         peer_device = conn_peer_device(connection, pi->vnr);
4649         if (!peer_device)
4650                 return -EIO;
4651         device = peer_device->device;
4652
4653         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4654         /* you are supposed to send additional out-of-sync information
4655          * if you actually set bits during this phase */
4656
4657         c = (struct bm_xfer_ctx) {
4658                 .bm_bits = drbd_bm_bits(device),
4659                 .bm_words = drbd_bm_words(device),
4660         };
4661
4662         for(;;) {
4663                 if (pi->cmd == P_BITMAP)
4664                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4665                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4666                         /* MAYBE: sanity check that we speak proto >= 90,
4667                          * and the feature is enabled! */
4668                         struct p_compressed_bm *p = pi->data;
4669
4670                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4671                                 drbd_err(device, "ReportCBitmap packet too large\n");
4672                                 err = -EIO;
4673                                 goto out;
4674                         }
4675                         if (pi->size <= sizeof(*p)) {
4676                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4677                                 err = -EIO;
4678                                 goto out;
4679                         }
4680                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4681                         if (err)
4682                                goto out;
4683                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4684                 } else {
4685                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4686                         err = -EIO;
4687                         goto out;
4688                 }
4689
4690                 c.packets[pi->cmd == P_BITMAP]++;
4691                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4692
4693                 if (err <= 0) {
4694                         if (err < 0)
4695                                 goto out;
4696                         break;
4697                 }
4698                 err = drbd_recv_header(peer_device->connection, pi);
4699                 if (err)
4700                         goto out;
4701         }
4702
4703         INFO_bm_xfer_stats(device, "receive", &c);
4704
4705         if (device->state.conn == C_WF_BITMAP_T) {
4706                 enum drbd_state_rv rv;
4707
4708                 err = drbd_send_bitmap(device);
4709                 if (err)
4710                         goto out;
4711                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4712                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4713                 D_ASSERT(device, rv == SS_SUCCESS);
4714         } else if (device->state.conn != C_WF_BITMAP_S) {
4715                 /* admin may have requested C_DISCONNECTING,
4716                  * other threads may have noticed network errors */
4717                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4718                     drbd_conn_str(device->state.conn));
4719         }
4720         err = 0;
4721
4722  out:
4723         drbd_bm_unlock(device);
4724         if (!err && device->state.conn == C_WF_BITMAP_S)
4725                 drbd_start_resync(device, C_SYNC_SOURCE);
4726         return err;
4727 }
4728
4729 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4730 {
4731         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4732                  pi->cmd, pi->size);
4733
4734         return ignore_remaining_packet(connection, pi);
4735 }
4736
4737 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4738 {
4739         /* Make sure we've acked all the TCP data associated
4740          * with the data requests being unplugged */
4741         drbd_tcp_quickack(connection->data.socket);
4742
4743         return 0;
4744 }
4745
4746 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4747 {
4748         struct drbd_peer_device *peer_device;
4749         struct drbd_device *device;
4750         struct p_block_desc *p = pi->data;
4751
4752         peer_device = conn_peer_device(connection, pi->vnr);
4753         if (!peer_device)
4754                 return -EIO;
4755         device = peer_device->device;
4756
4757         switch (device->state.conn) {
4758         case C_WF_SYNC_UUID:
4759         case C_WF_BITMAP_T:
4760         case C_BEHIND:
4761                         break;
4762         default:
4763                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4764                                 drbd_conn_str(device->state.conn));
4765         }
4766
4767         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4768
4769         return 0;
4770 }
4771
4772 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4773 {
4774         struct drbd_peer_device *peer_device;
4775         struct p_block_desc *p = pi->data;
4776         struct drbd_device *device;
4777         sector_t sector;
4778         int size, err = 0;
4779
4780         peer_device = conn_peer_device(connection, pi->vnr);
4781         if (!peer_device)
4782                 return -EIO;
4783         device = peer_device->device;
4784
4785         sector = be64_to_cpu(p->sector);
4786         size = be32_to_cpu(p->blksize);
4787
4788         dec_rs_pending(device);
4789
4790         if (get_ldev(device)) {
4791                 struct drbd_peer_request *peer_req;
4792                 const int op = REQ_OP_WRITE_ZEROES;
4793
4794                 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4795                                                size, 0, GFP_NOIO);
4796                 if (!peer_req) {
4797                         put_ldev(device);
4798                         return -ENOMEM;
4799                 }
4800
4801                 peer_req->w.cb = e_end_resync_block;
4802                 peer_req->submit_jif = jiffies;
4803                 peer_req->flags |= EE_IS_TRIM;
4804
4805                 spin_lock_irq(&device->resource->req_lock);
4806                 list_add_tail(&peer_req->w.list, &device->sync_ee);
4807                 spin_unlock_irq(&device->resource->req_lock);
4808
4809                 atomic_add(pi->size >> 9, &device->rs_sect_ev);
4810                 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4811
4812                 if (err) {
4813                         spin_lock_irq(&device->resource->req_lock);
4814                         list_del(&peer_req->w.list);
4815                         spin_unlock_irq(&device->resource->req_lock);
4816
4817                         drbd_free_peer_req(device, peer_req);
4818                         put_ldev(device);
4819                         err = 0;
4820                         goto fail;
4821                 }
4822
4823                 inc_unacked(device);
4824
4825                 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4826                    as well as drbd_rs_complete_io() */
4827         } else {
4828         fail:
4829                 drbd_rs_complete_io(device, sector);
4830                 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4831         }
4832
4833         atomic_add(size >> 9, &device->rs_sect_in);
4834
4835         return err;
4836 }
4837
4838 struct data_cmd {
4839         int expect_payload;
4840         unsigned int pkt_size;
4841         int (*fn)(struct drbd_connection *, struct packet_info *);
4842 };
4843
4844 static struct data_cmd drbd_cmd_handler[] = {
4845         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4846         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4847         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4848         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4849         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4850         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4851         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4852         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4853         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4854         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4855         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4856         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4857         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4858         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4859         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4860         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4861         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4862         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4863         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4864         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4865         [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4866         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4867         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4868         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4869         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4870         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4871         [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4872         [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
4873 };
4874
4875 static void drbdd(struct drbd_connection *connection)
4876 {
4877         struct packet_info pi;
4878         size_t shs; /* sub header size */
4879         int err;
4880
4881         while (get_t_state(&connection->receiver) == RUNNING) {
4882                 struct data_cmd const *cmd;
4883
4884                 drbd_thread_current_set_cpu(&connection->receiver);
4885                 update_receiver_timing_details(connection, drbd_recv_header);
4886                 if (drbd_recv_header(connection, &pi))
4887                         goto err_out;
4888
4889                 cmd = &drbd_cmd_handler[pi.cmd];
4890                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4891                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4892                                  cmdname(pi.cmd), pi.cmd);
4893                         goto err_out;
4894                 }
4895
4896                 shs = cmd->pkt_size;
4897                 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4898                         shs += sizeof(struct o_qlim);
4899                 if (pi.size > shs && !cmd->expect_payload) {
4900                         drbd_err(connection, "No payload expected %s l:%d\n",
4901                                  cmdname(pi.cmd), pi.size);
4902                         goto err_out;
4903                 }
4904                 if (pi.size < shs) {
4905                         drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4906                                  cmdname(pi.cmd), (int)shs, pi.size);
4907                         goto err_out;
4908                 }
4909
4910                 if (shs) {
4911                         update_receiver_timing_details(connection, drbd_recv_all_warn);
4912                         err = drbd_recv_all_warn(connection, pi.data, shs);
4913                         if (err)
4914                                 goto err_out;
4915                         pi.size -= shs;
4916                 }
4917
4918                 update_receiver_timing_details(connection, cmd->fn);
4919                 err = cmd->fn(connection, &pi);
4920                 if (err) {
4921                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4922                                  cmdname(pi.cmd), err, pi.size);
4923                         goto err_out;
4924                 }
4925         }
4926         return;
4927
4928     err_out:
4929         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4930 }
4931
4932 static void conn_disconnect(struct drbd_connection *connection)
4933 {
4934         struct drbd_peer_device *peer_device;
4935         enum drbd_conns oc;
4936         int vnr;
4937
4938         if (connection->cstate == C_STANDALONE)
4939                 return;
4940
4941         /* We are about to start the cleanup after connection loss.
4942          * Make sure drbd_make_request knows about that.
4943          * Usually we should be in some network failure state already,
4944          * but just in case we are not, we fix it up here.
4945          */
4946         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4947
4948         /* ack_receiver does not clean up anything. it must not interfere, either */
4949         drbd_thread_stop(&connection->ack_receiver);
4950         if (connection->ack_sender) {
4951                 destroy_workqueue(connection->ack_sender);
4952                 connection->ack_sender = NULL;
4953         }
4954         drbd_free_sock(connection);
4955
4956         rcu_read_lock();
4957         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4958                 struct drbd_device *device = peer_device->device;
4959                 kref_get(&device->kref);
4960                 rcu_read_unlock();
4961                 drbd_disconnected(peer_device);
4962                 kref_put(&device->kref, drbd_destroy_device);
4963                 rcu_read_lock();
4964         }
4965         rcu_read_unlock();
4966
4967         if (!list_empty(&connection->current_epoch->list))
4968                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4969         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4970         atomic_set(&connection->current_epoch->epoch_size, 0);
4971         connection->send.seen_any_write_yet = false;
4972
4973         drbd_info(connection, "Connection closed\n");
4974
4975         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4976                 conn_try_outdate_peer_async(connection);
4977
4978         spin_lock_irq(&connection->resource->req_lock);
4979         oc = connection->cstate;
4980         if (oc >= C_UNCONNECTED)
4981                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4982
4983         spin_unlock_irq(&connection->resource->req_lock);
4984
4985         if (oc == C_DISCONNECTING)
4986                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4987 }
4988
4989 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4990 {
4991         struct drbd_device *device = peer_device->device;
4992         unsigned int i;
4993
4994         /* wait for current activity to cease. */
4995         spin_lock_irq(&device->resource->req_lock);
4996         _drbd_wait_ee_list_empty(device, &device->active_ee);
4997         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4998         _drbd_wait_ee_list_empty(device, &device->read_ee);
4999         spin_unlock_irq(&device->resource->req_lock);
5000
5001         /* We do not have data structures that would allow us to
5002          * get the rs_pending_cnt down to 0 again.
5003          *  * On C_SYNC_TARGET we do not have any data structures describing
5004          *    the pending RSDataRequest's we have sent.
5005          *  * On C_SYNC_SOURCE there is no data structure that tracks
5006          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5007          *  And no, it is not the sum of the reference counts in the
5008          *  resync_LRU. The resync_LRU tracks the whole operation including
5009          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5010          *  on the fly. */
5011         drbd_rs_cancel_all(device);
5012         device->rs_total = 0;
5013         device->rs_failed = 0;
5014         atomic_set(&device->rs_pending_cnt, 0);
5015         wake_up(&device->misc_wait);
5016
5017         del_timer_sync(&device->resync_timer);
5018         resync_timer_fn((unsigned long)device);
5019
5020         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5021          * w_make_resync_request etc. which may still be on the worker queue
5022          * to be "canceled" */
5023         drbd_flush_workqueue(&peer_device->connection->sender_work);
5024
5025         drbd_finish_peer_reqs(device);
5026
5027         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5028            might have issued a work again. The one before drbd_finish_peer_reqs() is
5029            necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5030         drbd_flush_workqueue(&peer_device->connection->sender_work);
5031
5032         /* need to do it again, drbd_finish_peer_reqs() may have populated it
5033          * again via drbd_try_clear_on_disk_bm(). */
5034         drbd_rs_cancel_all(device);
5035
5036         kfree(device->p_uuid);
5037         device->p_uuid = NULL;
5038
5039         if (!drbd_suspended(device))
5040                 tl_clear(peer_device->connection);
5041
5042         drbd_md_sync(device);
5043
5044         if (get_ldev(device)) {
5045                 drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5046                                 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5047                 put_ldev(device);
5048         }
5049
5050         /* tcp_close and release of sendpage pages can be deferred.  I don't
5051          * want to use SO_LINGER, because apparently it can be deferred for
5052          * more than 20 seconds (longest time I checked).
5053          *
5054          * Actually we don't care for exactly when the network stack does its
5055          * put_page(), but release our reference on these pages right here.
5056          */
5057         i = drbd_free_peer_reqs(device, &device->net_ee);
5058         if (i)
5059                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5060         i = atomic_read(&device->pp_in_use_by_net);
5061         if (i)
5062                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5063         i = atomic_read(&device->pp_in_use);
5064         if (i)
5065                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5066
5067         D_ASSERT(device, list_empty(&device->read_ee));
5068         D_ASSERT(device, list_empty(&device->active_ee));
5069         D_ASSERT(device, list_empty(&device->sync_ee));
5070         D_ASSERT(device, list_empty(&device->done_ee));
5071
5072         return 0;
5073 }
5074
5075 /*
5076  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5077  * we can agree on is stored in agreed_pro_version.
5078  *
5079  * feature flags and the reserved array should be enough room for future
5080  * enhancements of the handshake protocol, and possible plugins...
5081  *
5082  * for now, they are expected to be zero, but ignored.
5083  */
5084 static int drbd_send_features(struct drbd_connection *connection)
5085 {
5086         struct drbd_socket *sock;
5087         struct p_connection_features *p;
5088
5089         sock = &connection->data;
5090         p = conn_prepare_command(connection, sock);
5091         if (!p)
5092                 return -EIO;
5093         memset(p, 0, sizeof(*p));
5094         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5095         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5096         p->feature_flags = cpu_to_be32(PRO_FEATURES);
5097         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5098 }
5099
5100 /*
5101  * return values:
5102  *   1 yes, we have a valid connection
5103  *   0 oops, did not work out, please try again
5104  *  -1 peer talks different language,
5105  *     no point in trying again, please go standalone.
5106  */
5107 static int drbd_do_features(struct drbd_connection *connection)
5108 {
5109         /* ASSERT current == connection->receiver ... */
5110         struct p_connection_features *p;
5111         const int expect = sizeof(struct p_connection_features);
5112         struct packet_info pi;
5113         int err;
5114
5115         err = drbd_send_features(connection);
5116         if (err)
5117                 return 0;
5118
5119         err = drbd_recv_header(connection, &pi);
5120         if (err)
5121                 return 0;
5122
5123         if (pi.cmd != P_CONNECTION_FEATURES) {
5124                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5125                          cmdname(pi.cmd), pi.cmd);
5126                 return -1;
5127         }
5128
5129         if (pi.size != expect) {
5130                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5131                      expect, pi.size);
5132                 return -1;
5133         }
5134
5135         p = pi.data;
5136         err = drbd_recv_all_warn(connection, p, expect);
5137         if (err)
5138                 return 0;
5139
5140         p->protocol_min = be32_to_cpu(p->protocol_min);
5141         p->protocol_max = be32_to_cpu(p->protocol_max);
5142         if (p->protocol_max == 0)
5143                 p->protocol_max = p->protocol_min;
5144
5145         if (PRO_VERSION_MAX < p->protocol_min ||
5146             PRO_VERSION_MIN > p->protocol_max)
5147                 goto incompat;
5148
5149         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5150         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5151
5152         drbd_info(connection, "Handshake successful: "
5153              "Agreed network protocol version %d\n", connection->agreed_pro_version);
5154
5155         drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5156                   connection->agreed_features,
5157                   connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5158                   connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5159                   connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5160                   connection->agreed_features ? "" : " none");
5161
5162         return 1;
5163
5164  incompat:
5165         drbd_err(connection, "incompatible DRBD dialects: "
5166             "I support %d-%d, peer supports %d-%d\n",
5167             PRO_VERSION_MIN, PRO_VERSION_MAX,
5168             p->protocol_min, p->protocol_max);
5169         return -1;
5170 }
5171
5172 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5173 static int drbd_do_auth(struct drbd_connection *connection)
5174 {
5175         drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
5176         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5177         return -1;
5178 }
5179 #else
5180 #define CHALLENGE_LEN 64
5181
5182 /* Return value:
5183         1 - auth succeeded,
5184         0 - failed, try again (network error),
5185         -1 - auth failed, don't try again.
5186 */
5187
5188 static int drbd_do_auth(struct drbd_connection *connection)
5189 {
5190         struct drbd_socket *sock;
5191         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5192         char *response = NULL;
5193         char *right_response = NULL;
5194         char *peers_ch = NULL;
5195         unsigned int key_len;
5196         char secret[SHARED_SECRET_MAX]; /* 64 byte */
5197         unsigned int resp_size;
5198         SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5199         struct packet_info pi;
5200         struct net_conf *nc;
5201         int err, rv;
5202
5203         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5204
5205         rcu_read_lock();
5206         nc = rcu_dereference(connection->net_conf);
5207         key_len = strlen(nc->shared_secret);
5208         memcpy(secret, nc->shared_secret, key_len);
5209         rcu_read_unlock();
5210
5211         desc->tfm = connection->cram_hmac_tfm;
5212         desc->flags = 0;
5213
5214         rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5215         if (rv) {
5216                 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5217                 rv = -1;
5218                 goto fail;
5219         }
5220
5221         get_random_bytes(my_challenge, CHALLENGE_LEN);
5222
5223         sock = &connection->data;
5224         if (!conn_prepare_command(connection, sock)) {
5225                 rv = 0;
5226                 goto fail;
5227         }
5228         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5229                                 my_challenge, CHALLENGE_LEN);
5230         if (!rv)
5231                 goto fail;
5232
5233         err = drbd_recv_header(connection, &pi);
5234         if (err) {
5235                 rv = 0;
5236                 goto fail;
5237         }
5238
5239         if (pi.cmd != P_AUTH_CHALLENGE) {
5240                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5241                          cmdname(pi.cmd), pi.cmd);
5242                 rv = 0;
5243                 goto fail;
5244         }
5245
5246         if (pi.size > CHALLENGE_LEN * 2) {
5247                 drbd_err(connection, "expected AuthChallenge payload too big.\n");
5248                 rv = -1;
5249                 goto fail;
5250         }
5251
5252         if (pi.size < CHALLENGE_LEN) {
5253                 drbd_err(connection, "AuthChallenge payload too small.\n");
5254                 rv = -1;
5255                 goto fail;
5256         }
5257
5258         peers_ch = kmalloc(pi.size, GFP_NOIO);
5259         if (peers_ch == NULL) {
5260                 drbd_err(connection, "kmalloc of peers_ch failed\n");
5261                 rv = -1;
5262                 goto fail;
5263         }
5264
5265         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5266         if (err) {
5267                 rv = 0;
5268                 goto fail;
5269         }
5270
5271         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5272                 drbd_err(connection, "Peer presented the same challenge!\n");
5273                 rv = -1;
5274                 goto fail;
5275         }
5276
5277         resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5278         response = kmalloc(resp_size, GFP_NOIO);
5279         if (response == NULL) {
5280                 drbd_err(connection, "kmalloc of response failed\n");
5281                 rv = -1;
5282                 goto fail;
5283         }
5284
5285         rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5286         if (rv) {
5287                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5288                 rv = -1;
5289                 goto fail;
5290         }
5291
5292         if (!conn_prepare_command(connection, sock)) {
5293                 rv = 0;
5294                 goto fail;
5295         }
5296         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5297                                 response, resp_size);
5298         if (!rv)
5299                 goto fail;
5300
5301         err = drbd_recv_header(connection, &pi);
5302         if (err) {
5303                 rv = 0;
5304                 goto fail;
5305         }
5306
5307         if (pi.cmd != P_AUTH_RESPONSE) {
5308                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5309                          cmdname(pi.cmd), pi.cmd);
5310                 rv = 0;
5311                 goto fail;
5312         }
5313
5314         if (pi.size != resp_size) {
5315                 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5316                 rv = 0;
5317                 goto fail;
5318         }
5319
5320         err = drbd_recv_all_warn(connection, response , resp_size);
5321         if (err) {
5322                 rv = 0;
5323                 goto fail;
5324         }
5325
5326         right_response = kmalloc(resp_size, GFP_NOIO);
5327         if (right_response == NULL) {
5328                 drbd_err(connection, "kmalloc of right_response failed\n");
5329                 rv = -1;
5330                 goto fail;
5331         }
5332
5333         rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5334                                  right_response);
5335         if (rv) {
5336                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5337                 rv = -1;
5338                 goto fail;
5339         }
5340
5341         rv = !memcmp(response, right_response, resp_size);
5342
5343         if (rv)
5344                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5345                      resp_size);
5346         else
5347                 rv = -1;
5348
5349  fail:
5350         kfree(peers_ch);
5351         kfree(response);
5352         kfree(right_response);
5353         shash_desc_zero(desc);
5354
5355         return rv;
5356 }
5357 #endif
5358
5359 int drbd_receiver(struct drbd_thread *thi)
5360 {
5361         struct drbd_connection *connection = thi->connection;
5362         int h;
5363
5364         drbd_info(connection, "receiver (re)started\n");
5365
5366         do {
5367                 h = conn_connect(connection);
5368                 if (h == 0) {
5369                         conn_disconnect(connection);
5370                         schedule_timeout_interruptible(HZ);
5371                 }
5372                 if (h == -1) {
5373                         drbd_warn(connection, "Discarding network configuration.\n");
5374                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5375                 }
5376         } while (h == 0);
5377
5378         if (h > 0)
5379                 drbdd(connection);
5380
5381         conn_disconnect(connection);
5382
5383         drbd_info(connection, "receiver terminated\n");
5384         return 0;
5385 }
5386
5387 /* ********* acknowledge sender ******** */
5388
5389 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5390 {
5391         struct p_req_state_reply *p = pi->data;
5392         int retcode = be32_to_cpu(p->retcode);
5393
5394         if (retcode >= SS_SUCCESS) {
5395                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5396         } else {
5397                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5398                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5399                          drbd_set_st_err_str(retcode), retcode);
5400         }
5401         wake_up(&connection->ping_wait);
5402
5403         return 0;
5404 }
5405
5406 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5407 {
5408         struct drbd_peer_device *peer_device;
5409         struct drbd_device *device;
5410         struct p_req_state_reply *p = pi->data;
5411         int retcode = be32_to_cpu(p->retcode);
5412
5413         peer_device = conn_peer_device(connection, pi->vnr);
5414         if (!peer_device)
5415                 return -EIO;
5416         device = peer_device->device;
5417
5418         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5419                 D_ASSERT(device, connection->agreed_pro_version < 100);
5420                 return got_conn_RqSReply(connection, pi);
5421         }
5422
5423         if (retcode >= SS_SUCCESS) {
5424                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5425         } else {
5426                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5427                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5428                         drbd_set_st_err_str(retcode), retcode);
5429         }
5430         wake_up(&device->state_wait);
5431
5432         return 0;
5433 }
5434
5435 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5436 {
5437         return drbd_send_ping_ack(connection);
5438
5439 }
5440
5441 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5442 {
5443         /* restore idle timeout */
5444         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5445         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5446                 wake_up(&connection->ping_wait);
5447
5448         return 0;
5449 }
5450
5451 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5452 {
5453         struct drbd_peer_device *peer_device;
5454         struct drbd_device *device;
5455         struct p_block_ack *p = pi->data;
5456         sector_t sector = be64_to_cpu(p->sector);
5457         int blksize = be32_to_cpu(p->blksize);
5458
5459         peer_device = conn_peer_device(connection, pi->vnr);
5460         if (!peer_device)
5461                 return -EIO;
5462         device = peer_device->device;
5463
5464         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5465
5466         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5467
5468         if (get_ldev(device)) {
5469                 drbd_rs_complete_io(device, sector);
5470                 drbd_set_in_sync(device, sector, blksize);
5471                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5472                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5473                 put_ldev(device);
5474         }
5475         dec_rs_pending(device);
5476         atomic_add(blksize >> 9, &device->rs_sect_in);
5477
5478         return 0;
5479 }
5480
5481 static int
5482 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5483                               struct rb_root *root, const char *func,
5484                               enum drbd_req_event what, bool missing_ok)
5485 {
5486         struct drbd_request *req;
5487         struct bio_and_error m;
5488
5489         spin_lock_irq(&device->resource->req_lock);
5490         req = find_request(device, root, id, sector, missing_ok, func);
5491         if (unlikely(!req)) {
5492                 spin_unlock_irq(&device->resource->req_lock);
5493                 return -EIO;
5494         }
5495         __req_mod(req, what, &m);
5496         spin_unlock_irq(&device->resource->req_lock);
5497
5498         if (m.bio)
5499                 complete_master_bio(device, &m);
5500         return 0;
5501 }
5502
5503 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5504 {
5505         struct drbd_peer_device *peer_device;
5506         struct drbd_device *device;
5507         struct p_block_ack *p = pi->data;
5508         sector_t sector = be64_to_cpu(p->sector);
5509         int blksize = be32_to_cpu(p->blksize);
5510         enum drbd_req_event what;
5511
5512         peer_device = conn_peer_device(connection, pi->vnr);
5513         if (!peer_device)
5514                 return -EIO;
5515         device = peer_device->device;
5516
5517         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5518
5519         if (p->block_id == ID_SYNCER) {
5520                 drbd_set_in_sync(device, sector, blksize);
5521                 dec_rs_pending(device);
5522                 return 0;
5523         }
5524         switch (pi->cmd) {
5525         case P_RS_WRITE_ACK:
5526                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5527                 break;
5528         case P_WRITE_ACK:
5529                 what = WRITE_ACKED_BY_PEER;
5530                 break;
5531         case P_RECV_ACK:
5532                 what = RECV_ACKED_BY_PEER;
5533                 break;
5534         case P_SUPERSEDED:
5535                 what = CONFLICT_RESOLVED;
5536                 break;
5537         case P_RETRY_WRITE:
5538                 what = POSTPONE_WRITE;
5539                 break;
5540         default:
5541                 BUG();
5542         }
5543
5544         return validate_req_change_req_state(device, p->block_id, sector,
5545                                              &device->write_requests, __func__,
5546                                              what, false);
5547 }
5548
5549 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5550 {
5551         struct drbd_peer_device *peer_device;
5552         struct drbd_device *device;
5553         struct p_block_ack *p = pi->data;
5554         sector_t sector = be64_to_cpu(p->sector);
5555         int size = be32_to_cpu(p->blksize);
5556         int err;
5557
5558         peer_device = conn_peer_device(connection, pi->vnr);
5559         if (!peer_device)
5560                 return -EIO;
5561         device = peer_device->device;
5562
5563         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5564
5565         if (p->block_id == ID_SYNCER) {
5566                 dec_rs_pending(device);
5567                 drbd_rs_failed_io(device, sector, size);
5568                 return 0;
5569         }
5570
5571         err = validate_req_change_req_state(device, p->block_id, sector,
5572                                             &device->write_requests, __func__,
5573                                             NEG_ACKED, true);
5574         if (err) {
5575                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5576                    The master bio might already be completed, therefore the
5577                    request is no longer in the collision hash. */
5578                 /* In Protocol B we might already have got a P_RECV_ACK
5579                    but then get a P_NEG_ACK afterwards. */
5580                 drbd_set_out_of_sync(device, sector, size);
5581         }
5582         return 0;
5583 }
5584
5585 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5586 {
5587         struct drbd_peer_device *peer_device;
5588         struct drbd_device *device;
5589         struct p_block_ack *p = pi->data;
5590         sector_t sector = be64_to_cpu(p->sector);
5591
5592         peer_device = conn_peer_device(connection, pi->vnr);
5593         if (!peer_device)
5594                 return -EIO;
5595         device = peer_device->device;
5596
5597         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5598
5599         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5600             (unsigned long long)sector, be32_to_cpu(p->blksize));
5601
5602         return validate_req_change_req_state(device, p->block_id, sector,
5603                                              &device->read_requests, __func__,
5604                                              NEG_ACKED, false);
5605 }
5606
5607 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5608 {
5609         struct drbd_peer_device *peer_device;
5610         struct drbd_device *device;
5611         sector_t sector;
5612         int size;
5613         struct p_block_ack *p = pi->data;
5614
5615         peer_device = conn_peer_device(connection, pi->vnr);
5616         if (!peer_device)
5617                 return -EIO;
5618         device = peer_device->device;
5619
5620         sector = be64_to_cpu(p->sector);
5621         size = be32_to_cpu(p->blksize);
5622
5623         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5624
5625         dec_rs_pending(device);
5626
5627         if (get_ldev_if_state(device, D_FAILED)) {
5628                 drbd_rs_complete_io(device, sector);
5629                 switch (pi->cmd) {
5630                 case P_NEG_RS_DREPLY:
5631                         drbd_rs_failed_io(device, sector, size);
5632                 case P_RS_CANCEL:
5633                         break;
5634                 default:
5635                         BUG();
5636                 }
5637                 put_ldev(device);
5638         }
5639
5640         return 0;
5641 }
5642
5643 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5644 {
5645         struct p_barrier_ack *p = pi->data;
5646         struct drbd_peer_device *peer_device;
5647         int vnr;
5648
5649         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5650
5651         rcu_read_lock();
5652         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5653                 struct drbd_device *device = peer_device->device;
5654
5655                 if (device->state.conn == C_AHEAD &&
5656                     atomic_read(&device->ap_in_flight) == 0 &&
5657                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5658                         device->start_resync_timer.expires = jiffies + HZ;
5659                         add_timer(&device->start_resync_timer);
5660                 }
5661         }
5662         rcu_read_unlock();
5663
5664         return 0;
5665 }
5666
5667 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5668 {
5669         struct drbd_peer_device *peer_device;
5670         struct drbd_device *device;
5671         struct p_block_ack *p = pi->data;
5672         struct drbd_device_work *dw;
5673         sector_t sector;
5674         int size;
5675
5676         peer_device = conn_peer_device(connection, pi->vnr);
5677         if (!peer_device)
5678                 return -EIO;
5679         device = peer_device->device;
5680
5681         sector = be64_to_cpu(p->sector);
5682         size = be32_to_cpu(p->blksize);
5683
5684         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5685
5686         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5687                 drbd_ov_out_of_sync_found(device, sector, size);
5688         else
5689                 ov_out_of_sync_print(device);
5690
5691         if (!get_ldev(device))
5692                 return 0;
5693
5694         drbd_rs_complete_io(device, sector);
5695         dec_rs_pending(device);
5696
5697         --device->ov_left;
5698
5699         /* let's advance progress step marks only for every other megabyte */
5700         if ((device->ov_left & 0x200) == 0x200)
5701                 drbd_advance_rs_marks(device, device->ov_left);
5702
5703         if (device->ov_left == 0) {
5704                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5705                 if (dw) {
5706                         dw->w.cb = w_ov_finished;
5707                         dw->device = device;
5708                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5709                 } else {
5710                         drbd_err(device, "kmalloc(dw) failed.");
5711                         ov_out_of_sync_print(device);
5712                         drbd_resync_finished(device);
5713                 }
5714         }
5715         put_ldev(device);
5716         return 0;
5717 }
5718
5719 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5720 {
5721         return 0;
5722 }
5723
5724 struct meta_sock_cmd {
5725         size_t pkt_size;
5726         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5727 };
5728
5729 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5730 {
5731         long t;
5732         struct net_conf *nc;
5733
5734         rcu_read_lock();
5735         nc = rcu_dereference(connection->net_conf);
5736         t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5737         rcu_read_unlock();
5738
5739         t *= HZ;
5740         if (ping_timeout)
5741                 t /= 10;
5742
5743         connection->meta.socket->sk->sk_rcvtimeo = t;
5744 }
5745
5746 static void set_ping_timeout(struct drbd_connection *connection)
5747 {
5748         set_rcvtimeo(connection, 1);
5749 }
5750
5751 static void set_idle_timeout(struct drbd_connection *connection)
5752 {
5753         set_rcvtimeo(connection, 0);
5754 }
5755
5756 static struct meta_sock_cmd ack_receiver_tbl[] = {
5757         [P_PING]            = { 0, got_Ping },
5758         [P_PING_ACK]        = { 0, got_PingAck },
5759         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5760         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5761         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5762         [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5763         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5764         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5765         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5766         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5767         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5768         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5769         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5770         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5771         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5772         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5773         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5774 };
5775
5776 int drbd_ack_receiver(struct drbd_thread *thi)
5777 {
5778         struct drbd_connection *connection = thi->connection;
5779         struct meta_sock_cmd *cmd = NULL;
5780         struct packet_info pi;
5781         unsigned long pre_recv_jif;
5782         int rv;
5783         void *buf    = connection->meta.rbuf;
5784         int received = 0;
5785         unsigned int header_size = drbd_header_size(connection);
5786         int expect   = header_size;
5787         bool ping_timeout_active = false;
5788         struct sched_param param = { .sched_priority = 2 };
5789
5790         rv = sched_setscheduler(current, SCHED_RR, &param);
5791         if (rv < 0)
5792                 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5793
5794         while (get_t_state(thi) == RUNNING) {
5795                 drbd_thread_current_set_cpu(thi);
5796
5797                 conn_reclaim_net_peer_reqs(connection);
5798
5799                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5800                         if (drbd_send_ping(connection)) {
5801                                 drbd_err(connection, "drbd_send_ping has failed\n");
5802                                 goto reconnect;
5803                         }
5804                         set_ping_timeout(connection);
5805                         ping_timeout_active = true;
5806                 }
5807
5808                 pre_recv_jif = jiffies;
5809                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5810
5811                 /* Note:
5812                  * -EINTR        (on meta) we got a signal
5813                  * -EAGAIN       (on meta) rcvtimeo expired
5814                  * -ECONNRESET   other side closed the connection
5815                  * -ERESTARTSYS  (on data) we got a signal
5816                  * rv <  0       other than above: unexpected error!
5817                  * rv == expected: full header or command
5818                  * rv <  expected: "woken" by signal during receive
5819                  * rv == 0       : "connection shut down by peer"
5820                  */
5821                 if (likely(rv > 0)) {
5822                         received += rv;
5823                         buf      += rv;
5824                 } else if (rv == 0) {
5825                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5826                                 long t;
5827                                 rcu_read_lock();
5828                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5829                                 rcu_read_unlock();
5830
5831                                 t = wait_event_timeout(connection->ping_wait,
5832                                                        connection->cstate < C_WF_REPORT_PARAMS,
5833                                                        t);
5834                                 if (t)
5835                                         break;
5836                         }
5837                         drbd_err(connection, "meta connection shut down by peer.\n");
5838                         goto reconnect;
5839                 } else if (rv == -EAGAIN) {
5840                         /* If the data socket received something meanwhile,
5841                          * that is good enough: peer is still alive. */
5842                         if (time_after(connection->last_received, pre_recv_jif))
5843                                 continue;
5844                         if (ping_timeout_active) {
5845                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5846                                 goto reconnect;
5847                         }
5848                         set_bit(SEND_PING, &connection->flags);
5849                         continue;
5850                 } else if (rv == -EINTR) {
5851                         /* maybe drbd_thread_stop(): the while condition will notice.
5852                          * maybe woken for send_ping: we'll send a ping above,
5853                          * and change the rcvtimeo */
5854                         flush_signals(current);
5855                         continue;
5856                 } else {
5857                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5858                         goto reconnect;
5859                 }
5860
5861                 if (received == expect && cmd == NULL) {
5862                         if (decode_header(connection, connection->meta.rbuf, &pi))
5863                                 goto reconnect;
5864                         cmd = &ack_receiver_tbl[pi.cmd];
5865                         if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5866                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5867                                          cmdname(pi.cmd), pi.cmd);
5868                                 goto disconnect;
5869                         }
5870                         expect = header_size + cmd->pkt_size;
5871                         if (pi.size != expect - header_size) {
5872                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5873                                         pi.cmd, pi.size);
5874                                 goto reconnect;
5875                         }
5876                 }
5877                 if (received == expect) {
5878                         bool err;
5879
5880                         err = cmd->fn(connection, &pi);
5881                         if (err) {
5882                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5883                                 goto reconnect;
5884                         }
5885
5886                         connection->last_received = jiffies;
5887
5888                         if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5889                                 set_idle_timeout(connection);
5890                                 ping_timeout_active = false;
5891                         }
5892
5893                         buf      = connection->meta.rbuf;
5894                         received = 0;
5895                         expect   = header_size;
5896                         cmd      = NULL;
5897                 }
5898         }
5899
5900         if (0) {
5901 reconnect:
5902                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5903                 conn_md_sync(connection);
5904         }
5905         if (0) {
5906 disconnect:
5907                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5908         }
5909
5910         drbd_info(connection, "ack_receiver terminated\n");
5911
5912         return 0;
5913 }
5914
5915 void drbd_send_acks_wf(struct work_struct *ws)
5916 {
5917         struct drbd_peer_device *peer_device =
5918                 container_of(ws, struct drbd_peer_device, send_acks_work);
5919         struct drbd_connection *connection = peer_device->connection;
5920         struct drbd_device *device = peer_device->device;
5921         struct net_conf *nc;
5922         int tcp_cork, err;
5923
5924         rcu_read_lock();
5925         nc = rcu_dereference(connection->net_conf);
5926         tcp_cork = nc->tcp_cork;
5927         rcu_read_unlock();
5928
5929         if (tcp_cork)
5930                 drbd_tcp_cork(connection->meta.socket);
5931
5932         err = drbd_finish_peer_reqs(device);
5933         kref_put(&device->kref, drbd_destroy_device);
5934         /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
5935            struct work_struct send_acks_work alive, which is in the peer_device object */
5936
5937         if (err) {
5938                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5939                 return;
5940         }
5941
5942         if (tcp_cork)
5943                 drbd_tcp_uncork(connection->meta.socket);
5944
5945         return;
5946 }