/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>

#include <asm/uaccess.h>

#include <linux/drbd.h>

#include <linux/file.h>

#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(int vnr, void *p, void *data);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
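/*
 * Reader aid (added commentary, not from the original DRBD sources):
 * GFP_TRY is an opportunistic allocation mode.  __GFP_HIGHMEM widens the
 * pool of usable pages, __GFP_NOWARN suppresses the allocation-failure
 * warning, and the absence of __GFP_WAIT means the allocator may neither
 * sleep nor recurse into I/O -- on failure, the callers below simply
 * retry later.
 */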
/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)

		tmp = page_chain_next(page);

			break; /* found sufficient pages */

		/* insufficient pages, don't use any of them. */

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)

	while ((tmp = page_chain_next(page)))

static int page_chain_free(struct page *page)

	page_chain_for_each_safe(page, tmp) {

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)

	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
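/*
 * Reader aid (added commentary, not from the original DRBD sources):
 * with page->private doubling as the "next" pointer, walking a chain is
 * plain pointer chasing, e.g.
 *
 *	for (p = chain_first; p; p = page_chain_next(p))
 *		nr++;
 *
 * which is essentially what the page_chain_for_each() style helpers used
 * below expand to.
 */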
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)

	struct page *page = NULL;
	struct page *tmp = NULL;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */

	tmp = page_chain_tail(page, NULL);
	spin_lock(&drbd_pp_lock);
	page_chain_add(&drbd_pp_pool, page, tmp);

	spin_unlock(&drbd_pp_lock);
static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)

	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that is not finished,
	   we can stop examining the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_ee_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)

	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);
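/*
 * Reader aid (added commentary, not from the original DRBD sources):
 * note the two-phase pattern above -- finished entries are moved off the
 * shared net_ee list while holding req_lock, but the actual freeing
 * happens on the private "reclaimed" list after the lock is dropped,
 * keeping the critical section short.
 */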
/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)

	struct page *page = NULL;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyway. */
	if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");

	finish_wait(&drbd_pp_wait, &wait);

	atomic_add(number, &mdev->pp_in_use);
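/*
 * Reader aid (added commentary, not from the original DRBD sources):
 * allocation and release pair up roughly as
 *
 *	page = drbd_pp_alloc(mdev, nr_pages, true);
 *	... fill and submit the chain ...
 *	drbd_pp_free(mdev, page, 0);
 *
 * drbd_pp_free() below is the counterpart; it also wakes up waiters
 * sleeping in the retry loop above.
 */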
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)

	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);

	tmp = page_chain_tail(page, &i);
	spin_lock(&drbd_pp_lock);
	page_chain_add(&drbd_pp_pool, page, tmp);

	spin_unlock(&drbd_pp_lock);

	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
/*
 * You need to hold the req_lock:
 *  _drbd_wait_ee_list_empty()
 *
 * You must not have the req_lock:
 *  drbd_process_done_ee()
 *  drbd_wait_ee_list_empty()
 */
struct drbd_peer_request *
drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
	      unsigned int data_size, gfp_t gfp_mask) __must_hold(local)

	struct drbd_peer_request *peer_req;

	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);

	if (!(gfp_mask & __GFP_NOWARN))
		dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");

	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);

	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	mempool_free(peer_req, drbd_ee_mempool);
void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		       int is_net)

	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_pp_free(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);

int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)

	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;

	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		drbd_free_some_ee(mdev, peer_req, is_net);
/* See also comments in _req_mod(,BARRIER_ACKED)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)

	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_write.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);

		drbd_free_ee(mdev, peer_req);

	wake_up(&mdev->ee_wait);
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);

		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)

	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
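/*
 * Reader aid (added commentary, not from the original DRBD sources):
 * this is the locked/unlocked pair announced in the locking rules above --
 * callers already holding req_lock use _drbd_wait_ee_list_empty(), all
 * others use drbd_wait_ee_list_empty(), which takes and drops the lock
 * itself.
 */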
/* see also kernel_accept, which is only present since 2.6.18.
 * also we want to log which part of it failed, exactly */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)

	struct sock *sk = sock->sk;

	err = sock->ops->listen(sock, 5);

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);

	err = sock->ops->accept(sock, *newsock, 0);
	if (err < 0)
		sock_release(*newsock);

	(*newsock)->ops = sock->ops;
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)

	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};

	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)

	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};

	rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);

	if (rv < 0) {
		/* ECONNRESET	other side closed the connection
		 * ERESTARTSYS	(on sock) we got a signal */
		if (rv == -ECONNRESET)
			conn_info(tconn, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		conn_info(tconn, "sock was shut down by peer\n");

	/* signal came in, or peer/link went down,
	 * after we read a partial message
	 */
	/* D_ASSERT(signal_pending(current)); */

	conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)

	err = drbd_recv(tconn, buf, size);

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)

	err = drbd_recv_all(tconn, buf, size);
	if (err && !signal_pending(current))
		conn_warn(tconn, "short read (expected size %d)\n", (int)size);
/* On individual connections, the socket buffer size must be set prior to the
 * listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)

	/* open coded SO_SNDBUF, SO_RCVBUF */

	sock->sk->sk_sndbuf = snd;
	sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;

	sock->sk->sk_rcvbuf = rcv;
	sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
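/*
 * Reader aid (added commentary, not from the original DRBD sources):
 * the rough userspace equivalent would be
 *
 *	int v = snd;
 *	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &v, sizeof(v));
 *
 * Setting SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK additionally keeps the
 * kernel's TCP buffer autotuning from overriding the configured sizes
 * later on, just like an explicit setsockopt() does.
 */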
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)

	struct sockaddr_in6 src_in6;

	int disconnect_on_error = 1;

	if (!get_net_conf(tconn))

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, tconn->net_conf->my_addr,
	       min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      tconn->net_conf->my_addr_len);

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	err = sock->ops->connect(sock,
				 (struct sockaddr *)tconn->net_conf->peer_addr,
				 tconn->net_conf->peer_addr_len, 0);

		/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
		/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN: case EHOSTUNREACH:
			disconnect_on_error = 0;

		conn_err(tconn, "%s failed, err = %d\n", what, err);

	if (disconnect_on_error)
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)

	struct socket *s_estab = NULL, *s_listen;

	if (!get_net_conf(tconn))

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);

	timeo = tconn->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

	s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
				  (struct sockaddr *) tconn->net_conf->my_addr,
				  tconn->net_conf->my_addr_len);

	err = drbd_accept(&what, s_listen, &s_estab);

	sock_release(s_listen);

	if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
		conn_err(tconn, "%s failed, err = %d\n", what, err);
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
static int drbd_send_fp(struct drbd_tconn *tconn, struct drbd_socket *sock, enum drbd_packet cmd)

	struct p_header *h = tconn->data.sbuf;

	return !_conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);

static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)

	rr = drbd_recv_short(sock, &h, sizeof(h), 0);

	if (rr == sizeof(h) && h.magic == cpu_to_be32(DRBD_MAGIC))
		return be16_to_cpu(h.command);

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
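/*
 * Reader aid (added commentary, not from the original DRBD sources):
 * the MSG_DONTWAIT | MSG_PEEK probe neither blocks nor consumes data.
 * A positive return (data pending) or -EAGAIN (alive but idle) means the
 * connection is still usable; 0 (orderly shutdown by the peer) or any
 * other error means the socket is dead and gets released.
 */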
/* Gets called if a connection is established, or if a new minor gets created */
int drbd_connected(int vnr, void *p, void *data)

	struct drbd_conf *mdev = (struct drbd_conf *)p;

	atomic_set(&mdev->packet_seq, 0);

	mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
		&mdev->tconn->cstate_mutex :
		&mdev->own_state_mutex;

	err = drbd_send_sync_param(mdev);
	if (!err)
		err = drbd_send_sizes(mdev, 0, 0);
	if (!err)
		err = drbd_send_uuids(mdev);
	if (!err)
		err = drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_tconn *tconn)

	struct socket *sock, *msock;

	if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	clear_bit(DISCARD_CONCURRENT, &tconn->flags);

	/* Assume that the peer only understands protocol 80 until we know better. */
	tconn->agreed_pro_version = 80;

		/* 3 tries, this should take less than a second! */
		s = drbd_try_connect(tconn);

		/* give the other side time to call bind() & listen() */
		schedule_timeout_interruptible(HZ / 10);

		if (!tconn->data.socket) {
			tconn->data.socket = s;
			drbd_send_fp(tconn, &tconn->data, P_INITIAL_DATA);
		} else if (!tconn->meta.socket) {
			tconn->meta.socket = s;
			drbd_send_fp(tconn, &tconn->meta, P_INITIAL_META);

			conn_err(tconn, "Logic error in drbd_connect()\n");
			goto out_release_sockets;

		if (tconn->data.socket && tconn->meta.socket) {
			schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(&tconn->data.socket);
			ok = drbd_socket_okay(&tconn->meta.socket) && ok;

		s = drbd_wait_for_connect(tconn);

			try = drbd_recv_fp(tconn, s);
			drbd_socket_okay(&tconn->data.socket);
			drbd_socket_okay(&tconn->meta.socket);

				if (tconn->data.socket) {
					conn_warn(tconn, "initial packet S crossed\n");
					sock_release(tconn->data.socket);

				tconn->data.socket = s;

				if (tconn->meta.socket) {
					conn_warn(tconn, "initial packet M crossed\n");
					sock_release(tconn->meta.socket);

				tconn->meta.socket = s;
				set_bit(DISCARD_CONCURRENT, &tconn->flags);

				conn_warn(tconn, "Error receiving initial packet\n");

		if (tconn->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);

			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;

		if (tconn->data.socket && tconn->meta.socket) {
			ok = drbd_socket_okay(&tconn->data.socket);
			ok = drbd_socket_okay(&tconn->meta.socket) && ok;
	sock = tconn->data.socket;
	msock = tconn->meta.socket;

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/*
	 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	tconn->last_received = jiffies;

	h = drbd_do_features(tconn);

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
		case -1:
			conn_err(tconn, "Authentication of peer failed\n");
		case 0:
			conn_err(tconn, "Authentication of peer failed, trying again.\n");

	if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)

	sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	drbd_thread_start(&tconn->asender);

	if (drbd_send_protocol(tconn) == -EOPNOTSUPP)

	return !idr_for_each(&tconn->volumes, drbd_connected, tconn);

out_release_sockets:
	if (tconn->data.socket) {
		sock_release(tconn->data.socket);
		tconn->data.socket = NULL;
	}
	if (tconn->meta.socket) {
		sock_release(tconn->meta.socket);
		tconn->meta.socket = NULL;
	}
static int decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)

	if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
		pi->cmd = be16_to_cpu(h->h80.command);
		pi->size = be16_to_cpu(h->h80.length);
	} else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
		pi->cmd = be16_to_cpu(h->h95.command);
		pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
	} else {
		conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
			 be32_to_cpu(h->h80.magic),
			 be16_to_cpu(h->h80.command),
			 be16_to_cpu(h->h80.length));
	}

static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)

	struct p_header *h = tconn->data.rbuf;

	err = drbd_recv_all_warn(tconn, h, sizeof(*h));

	err = decode_header(tconn, h, pi);
	tconn->last_received = jiffies;
static void drbd_flush(struct drbd_conf *mdev)

	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
					NULL);
		if (rv) {
			dev_err(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)

	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
				wake_up(&mdev->ee_wait);

	spin_unlock(&mdev->epoch_lock);
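/*
 * Reader aid (added commentary, not from the original DRBD sources):
 * an epoch may be finished only once all three conditions tested above
 * hold: it contains at least one request (epoch_size != 0), none of them
 * is still in flight (active == 0), and the closing barrier number has
 * been received (DE_HAVE_BARRIER_NUMBER).  Only then is the barrier ack
 * sent via drbd_send_b_ack().
 */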
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:	Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)

	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;

	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
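/*
 * Reader aid (added commentary, not from the original DRBD sources):
 * the fallback direction is flush -> drain -> none; whenever the disk
 * configuration forbids a method (no_disk_flush/no_disk_drain), the next
 * weaker one is selected, and the change is logged above.
 */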
/**
 * drbd_submit_peer_request()
 * @mdev:	DRBD device.
 * @peer_req:	peer request
 * @rw:	flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)

	struct bio *bios = NULL;

	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");

	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;

	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyway,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);

	D_ASSERT(page == NULL);

	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);

		bios = bios->bi_next;
static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)

	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&mdev->misc_wait);

static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)

	struct drbd_conf *mdev;

	struct p_barrier *p = tconn->data.rbuf;
	struct drbd_epoch *epoch;

	mdev = vnr_to_mdev(tconn, pi->vnr);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);

		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");

	case WO_bdev_flush:
	case WO_drain_io:
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);

	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;

	} else {
		/* The current_epoch got recycled while we allocated this one... */

	}
	spin_unlock(&mdev->epoch_lock);
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)

	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;

	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);

	if (!expect(data_size != 0))
		return NULL;
	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);

	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);

		err = drbd_recv_all_warn(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}

		drbd_free_ee(mdev, peer_req);

	if (dgs) {
		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_ee(mdev, peer_req);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)

	page = drbd_pp_alloc(mdev, 1, 1);

		unsigned int len = min_t(int, data_size, PAGE_SIZE);

		err = drbd_recv_all_warn(mdev->tconn, data, len);

	drbd_pp_free(mdev, page, 0);
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)

	struct bio_vec *bvec;

	int dgs, err, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);

	/* optimistically update recv_cnt. if receiving fails below,
	 * we disconnect anyway, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
		expect = min_t(int, data_size, bvec->bv_len);
		err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
		kunmap(bvec->bv_page);

		data_size -= expect;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(data_size == 0);
/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
static int e_end_resync_block(struct drbd_work *w, int unused)

	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);

		err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
	}

static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)

	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);

	dec_rs_pending(mdev);

	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */
	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);
static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)

	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;

	dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
		(unsigned long)id, (unsigned long long)sector);

static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)

	struct drbd_conf *mdev;
	struct drbd_request *req;

	struct p_data *p = tconn->data.rbuf;

	mdev = vnr_to_mdev(tconn, pi->vnr);

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(mdev, req, sector, pi->size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */
static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)

	struct drbd_conf *mdev;

	struct p_data *p = tconn->data.rbuf;

	mdev = vnr_to_mdev(tconn, pi->vnr);

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(mdev, sector, pi->size);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(mdev, pi->size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
	}

	atomic_add(pi->size >> 9, &mdev->rs_sect_in);
static int w_restart_write(struct drbd_work *w, int cancel)

	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;

	unsigned long start_time;
	unsigned long flags;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	if (!expect(req->rq_state & RQ_POSTPONED)) {
		spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	bio = req->master_bio;
	start_time = req->start_time;
	/* Postponed requests will not have their master_bio completed! */
	__req_mod(req, DISCARD_WRITE, NULL);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	while (__drbd_make_request(mdev, bio, start_time))
		/* retry */ ;

static void restart_conflicting_writes(struct drbd_conf *mdev,
				       sector_t sector, int size)

	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {

		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		if (expect(list_empty(&req->w.list))) {

			req->w.cb = w_restart_write;
			drbd_queue_work(&mdev->tconn->data.work, &req->w);
/* e_end_block() is called via drbd_process_done_ee().
 * this means this function only runs in the asender thread
 */
static int e_end_block(struct drbd_work *w, int cancel)

	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;

	if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
		} else {
			err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyway...
			 * maybe assert this? */
		}

	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		if (peer_req->flags & EE_RESTART_REQUESTS)
			restart_conflicting_writes(mdev, sector, peer_req->i.size);
		spin_unlock_irq(&mdev->tconn->req_lock);
	} else
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)

	struct drbd_conf *mdev = w->mdev;
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);

	err = drbd_send_ack(mdev, ack, peer_req);

static int e_send_discard_write(struct drbd_work *w, int unused)

	return e_send_ack(w, P_DISCARD_WRITE);

static int e_send_retry_write(struct drbd_work *w, int unused)

	struct drbd_tconn *tconn = w->mdev->tconn;

	return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
			  P_RETRY_WRITE : P_DISCARD_WRITE);

static bool seq_greater(u32 a, u32 b)

	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;

static u32 seq_max(u32 a, u32 b)

	return seq_greater(a, b) ? a : b;
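/*
 * Worked example (added commentary, not from the original DRBD sources):
 * with a == 0x00000001 and b == 0xffffffff, the difference a - b wraps to
 * 2, and (s32)2 > 0, so seq_greater() correctly treats the just-wrapped
 * sequence number 1 as newer than 0xffffffff.
 */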
static bool need_peer_seq(struct drbd_conf *mdev)

	struct drbd_tconn *tconn = mdev->tconn;

	/*
	 * We only need to keep track of the last packet_seq number of our peer
	 * if we are in dual-primary mode and we have the discard flag set; see
	 * handle_write_conflicts().
	 */
	return tconn->net_conf->two_primaries &&
	       test_bit(DISCARD_CONCURRENT, &tconn->flags);

static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)

	unsigned int newest_peer_seq;

	if (need_peer_seq(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
		mdev->peer_seq = newest_peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		/* wake up only if we actually changed mdev->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&mdev->seq_wait);
	}
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)

	if (!need_peer_seq(mdev))
		return 0;

	spin_lock(&mdev->peer_seq_lock);
	for (;;) {
		if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
			mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);

		if (signal_pending(current)) {

		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&mdev->peer_seq_lock);
		timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
		timeout = schedule_timeout(timeout);
		spin_lock(&mdev->peer_seq_lock);
		if (!timeout) {
			dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");

	spin_unlock(&mdev->peer_seq_lock);
	finish_wait(&mdev->seq_wait, &wait);
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)

	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
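/*
 * Reader aid (added commentary, not from the original DRBD sources):
 * this is the receiving half of the mapping mentioned above --
 * bio_flags_to_wire() on the sending side encodes REQ_* bio flags into
 * DP_* packet flags, and wire_flags_to_bio() decodes them again, so the
 * replicated write is submitted with the same semantics (sync, FUA,
 * flush, discard) as the original.
 */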
static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
				    const unsigned int size)

	struct drbd_interval *i;

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&mdev->tconn->req_lock);
		if (m.bio)
			complete_master_bio(mdev, &m);
		spin_lock_irq(&mdev->tconn->req_lock);
static int handle_write_conflicts(struct drbd_conf *mdev,
				  struct drbd_peer_request *peer_req)

	struct drbd_tconn *tconn = mdev->tconn;
	bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&mdev->write_requests, &peer_req->i);

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (i == &peer_req->i)
			continue;

			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(mdev, i);

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be discarded; otherwise,
			 * it will be retried once all overlapping requests
			 * have completed.
			 */
			bool discard = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  discard ? "local" : "remote");

			peer_req->w.cb = discard ? e_send_discard_write :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &mdev->done_ee);
			wake_asender(mdev->tconn);

		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request will be discarded or
				 * retried.  Requests that are discarded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(mdev, &req->i);

				_conn_request_state(mdev->tconn,
						    NS(conn, C_TIMEOUT),
						    CS_HARD);
				fail_postponed_requests(mdev, sector, size);

			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;

	drbd_remove_epoch_entry_interval(mdev, peer_req);
/* mirrored write */
static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)

	struct drbd_conf *mdev;

	struct drbd_peer_request *peer_req;
	struct p_data *p = tconn->data.rbuf;
	u32 peer_seq = be32_to_cpu(p->seq_num);

	mdev = vnr_to_mdev(tconn, pi->vnr);

	if (!get_ldev(mdev)) {

		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		err2 = drbd_drain_block(mdev, pi->size);

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(mdev, p->block_id, sector, pi->size);

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&mdev->epoch_lock);
	peer_req->epoch = mdev->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	if (mdev->tconn->net_conf->two_primaries) {
		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&mdev->tconn->req_lock);
		err = handle_write_conflicts(mdev, peer_req);
		if (err) {
			spin_unlock_irq(&mdev->tconn->req_lock);
			if (err == -ENOENT) {

			goto out_interrupted;
		}
	} else
		spin_lock_irq(&mdev->tconn->req_lock);

	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	switch (mdev->tconn->net_conf->wire_protocol) {
	case DRBD_PROT_C:
		inc_unacked(mdev);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		break;
	case DRBD_PROT_B:
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyway */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);
		break;

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, &peer_req->i);
	}

	err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, &peer_req->i);

out_interrupted:
	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);

	drbd_free_ee(mdev, peer_req);
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
 * (more than 64 sectors) of activity we cannot account for with our own resync
 * activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)

	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;

	/* feature disabled? */
	if (mdev->ldev->dc.c_min_rate == 0)
		return 0;

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));

		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */

	spin_unlock_irq(&mdev->al_lock);

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
		      atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
		else
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > mdev->ldev->dc.c_min_rate)
			throttle = 1;
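/*
 * Worked example (added commentary, not from the original DRBD sources,
 * assuming Bit2KB() converts 4 KiB bitmap bits to KiB, i.e. multiplies
 * by four): if the chosen sync mark is 2 seconds old (dt == 2) and 1024
 * bitmap bits were cleared since (db == 1024), then
 * dbdt == Bit2KB(1024 / 2) == 2048 KiB/s, and resync is throttled only
 * while that rate exceeds the configured c_min_rate.
 */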
static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)

	struct drbd_conf *mdev;

	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;

	unsigned int fault_type;
	struct p_block_req *p = tconn->data.rbuf;

	mdev = vnr_to_mdev(tconn, pi->vnr);

	capacity = drbd_get_capacity(mdev->this_bdev);

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
			(unsigned long long)sector, size);
		return -EINVAL;
	}
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
			(unsigned long long)sector, size);
		return -EINVAL;
	}

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		switch (pi->cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:

			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
			break;
		case P_OV_REPLY:

			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;

		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
				"no local data.\n");

		/* drain the payload, if any */
		return drbd_drain_block(mdev, pi->size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);

	switch (pi->cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = pi->size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
			goto out_free_e;

		if (pi->cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (pi->cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->tconn->agreed_pro_version >= 90) {
			unsigned long now = jiffies;

			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			}
			dev_info(DEV, "Online Verify start sector: %llu\n",
				 (unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time. For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while anyway. */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through. The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(mdev, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &mdev->rs_sect_ev);

submit:
	spin_lock_irq(&mdev->tconn->req_lock);
	list_add_tail(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyway */

out_free_e:
	drbd_free_ee(mdev, peer_req);
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)

	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;

	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
	peer = mdev->p_uuid[UI_BITMAP] & 1;

	ch_peer = mdev->p_uuid[UI_SIZE];
	ch_self = mdev->comm_bm_set;

	switch (mdev->tconn->net_conf->after_sb_0p) {

	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {

		if (self == 1 && peer == 0) {

		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {

		if (self == 1 && peer == 0) {

		/* Else fall through to one of the other strategies... */
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
			 "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
			break;
		}
		if (ch_peer == 0) { rv = 1; break; }
		if (ch_self == 0) { rv = -1; break; }

		if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
	case ASB_DISCARD_LEAST_CHG:
		if (ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv = 1;
		else /* ( ch_self == ch_peer ) */
			/* Well, then use something else. */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv = 1;
2414 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2418 switch (mdev->tconn->net_conf->after_sb_1p) {
2419 case ASB_DISCARD_YOUNGER_PRI:
2420 case ASB_DISCARD_OLDER_PRI:
2421 case ASB_DISCARD_LEAST_CHG:
2422 case ASB_DISCARD_LOCAL:
2423 case ASB_DISCARD_REMOTE:
2424 dev_err(DEV, "Configuration error.\n");
2426 case ASB_DISCONNECT:
2429 hg = drbd_asb_recover_0p(mdev);
2430 if (hg == -1 && mdev->state.role == R_SECONDARY)
2432 if (hg == 1 && mdev->state.role == R_PRIMARY)
2436 rv = drbd_asb_recover_0p(mdev);
2438 case ASB_DISCARD_SECONDARY:
2439 return mdev->state.role == R_PRIMARY ? 1 : -1;
2440 case ASB_CALL_HELPER:
2441 hg = drbd_asb_recover_0p(mdev);
2442 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2443 enum drbd_state_rv rv2;
2445 drbd_set_role(mdev, R_SECONDARY, 0);
2446 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2447 * we might be here in C_WF_REPORT_PARAMS which is transient.
2448 * we do not need to wait for the after state change work either. */
2449 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2450 if (rv2 != SS_SUCCESS) {
2451 drbd_khelper(mdev, "pri-lost-after-sb");
2453 dev_warn(DEV, "Successfully gave up primary role.\n");
2463 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2467 switch (mdev->tconn->net_conf->after_sb_2p) {
2468 case ASB_DISCARD_YOUNGER_PRI:
2469 case ASB_DISCARD_OLDER_PRI:
2470 case ASB_DISCARD_LEAST_CHG:
2471 case ASB_DISCARD_LOCAL:
2472 case ASB_DISCARD_REMOTE:
2474 case ASB_DISCARD_SECONDARY:
2475 dev_err(DEV, "Configuration error.\n");
2478 rv = drbd_asb_recover_0p(mdev);
2480 case ASB_DISCONNECT:
2482 case ASB_CALL_HELPER:
2483 hg = drbd_asb_recover_0p(mdev);
2485 enum drbd_state_rv rv2;
2487 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2488 * we might be here in C_WF_REPORT_PARAMS which is transient.
2489 * we do not need to wait for the after state change work either. */
2490 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2491 if (rv2 != SS_SUCCESS) {
2492 drbd_khelper(mdev, "pri-lost-after-sb");
2494 dev_warn(DEV, "Successfully gave up primary role.\n");
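/*
 * Illustrative sketch (compiled out): the discard-least-changes core used by
 * drbd_asb_recover_0p() above. example_least_changes() and the
 * discard_concurrent parameter are hypothetical; 1 keeps the local data (we
 * become sync source), -1 discards it (we become sync target).
 */
#if 0
static int example_least_changes(unsigned long ch_self, unsigned long ch_peer,
				 int discard_concurrent)
{
	if (ch_self < ch_peer)
		return -1;	/* we changed less, our changes are discarded */
	if (ch_self > ch_peer)
		return 1;	/* peer changed less, its changes are discarded */
	/* equal counts: fall back to a tie break that both nodes
	 * derive identically from the connection setup */
	return discard_concurrent ? -1 : 1;
}
#endif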
2504 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2505 u64 bits, u64 flags)
2508 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2511 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2513 (unsigned long long)uuid[UI_CURRENT],
2514 (unsigned long long)uuid[UI_BITMAP],
2515 (unsigned long long)uuid[UI_HISTORY_START],
2516 (unsigned long long)uuid[UI_HISTORY_END],
2517 (unsigned long long)bits,
2518 (unsigned long long)flags);
2522 100 after split brain try auto recover
2523 2 C_SYNC_SOURCE set BitMap
2524 1 C_SYNC_SOURCE use BitMap
2526 -1 C_SYNC_TARGET use BitMap
2527 -2 C_SYNC_TARGET set BitMap
2528 -100 after split brain, disconnect
2529 -1000 unrelated data
2530 -1091 requires proto 91
2531 -1096 requires proto 96
2533 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2538 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2539 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2542 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2546 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2547 peer != UUID_JUST_CREATED)
2551 if (self != UUID_JUST_CREATED &&
2552 (peer == UUID_JUST_CREATED || peer == (u64)0))
2556 int rct, dc; /* roles at crash time */
2558 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2560 if (mdev->tconn->agreed_pro_version < 91)
2563 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2564 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2565 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2566 drbd_uuid_set_bm(mdev, 0UL);
2568 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2569 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2572 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2579 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2581 if (mdev->tconn->agreed_pro_version < 91)
2584 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2585 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2586 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2588 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2589 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2590 mdev->p_uuid[UI_BITMAP] = 0UL;
2592 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2595 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2602 /* Common power [off|failure] */
2603 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2604 (mdev->p_uuid[UI_FLAGS] & 2);
2605 /* lowest bit is set when we were primary,
2606 * next bit (weight 2) is set when peer was primary */
2610 case 0: /* !self_pri && !peer_pri */ return 0;
2611 case 1: /* self_pri && !peer_pri */ return 1;
2612 case 2: /* !self_pri && peer_pri */ return -1;
2613 case 3: /* self_pri && peer_pri */
2614 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2620 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2625 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2627 if (mdev->tconn->agreed_pro_version < 96 ?
2628 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2629 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2630 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2631 /* The last P_SYNC_UUID did not get through. Undo the modifications
2632 of the peer's UUIDs made by the last resync start as sync source. */
2634 if (mdev->tconn->agreed_pro_version < 91)
2637 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2638 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2640 dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2641 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2648 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2649 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2650 peer = mdev->p_uuid[i] & ~((u64)1);
2656 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2657 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2662 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2664 if (mdev->tconn->agreed_pro_version < 96 ?
2665 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2666 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2667 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2668 /* The last P_SYNC_UUID did not get through. Undo the modifications
2669 of our UUIDs made by the last resync start as sync source. */
2671 if (mdev->tconn->agreed_pro_version < 91)
2674 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2675 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2677 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2678 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2679 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2687 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2688 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2689 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2695 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2696 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2697 if (self == peer && self != ((u64)0))
2701 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2702 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2703 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2704 peer = mdev->p_uuid[j] & ~((u64)1);
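/*
 * Illustrative sketch (compiled out): every UUID comparison in
 * drbd_uuid_compare() first masks out bit 0, which is used as a flag (note
 * the "& 1" role test in drbd_asb_recover_0p()) and therefore must not make
 * otherwise identical data generations look unrelated. The helper name is
 * hypothetical.
 */
#if 0
static int example_same_data_generation(u64 self_uuid, u64 peer_uuid)
{
	return (self_uuid & ~((u64)1)) == (peer_uuid & ~((u64)1));
}
#endif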
2713 /* drbd_sync_handshake() returns the new conn state on success, or
2714 C_MASK (-1) on failure.
2716 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2717 enum drbd_disk_state peer_disk) __must_hold(local)
2720 enum drbd_conns rv = C_MASK;
2721 enum drbd_disk_state mydisk;
2723 mydisk = mdev->state.disk;
2724 if (mydisk == D_NEGOTIATING)
2725 mydisk = mdev->new_state_tmp.disk;
2727 dev_info(DEV, "drbd_sync_handshake:\n");
2728 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2729 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2730 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2732 hg = drbd_uuid_compare(mdev, &rule_nr);
2734 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2737 dev_alert(DEV, "Unrelated data, aborting!\n");
2741 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2745 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2746 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2747 int f = (hg == -100) || abs(hg) == 2;
2748 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2751 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2752 hg > 0 ? "source" : "target");
2756 drbd_khelper(mdev, "initial-split-brain");
2758 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2759 int pcount = (mdev->state.role == R_PRIMARY)
2760 + (peer_role == R_PRIMARY);
2761 int forced = (hg == -100);
2765 hg = drbd_asb_recover_0p(mdev);
2768 hg = drbd_asb_recover_1p(mdev);
2771 hg = drbd_asb_recover_2p(mdev);
2774 if (abs(hg) < 100) {
2775 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2776 "automatically solved. Sync from %s node\n",
2777 pcount, (hg < 0) ? "peer" : "this");
2779 dev_warn(DEV, "Doing a full sync, since"
2780 " UUIDs where ambiguous.\n");
2787 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2789 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2793 dev_warn(DEV, "Split-Brain detected, manually solved. "
2794 "Sync from %s node\n",
2795 (hg < 0) ? "peer" : "this");
2799 /* FIXME this log message is not correct if we end up here
2800 * after an attempted attach on a diskless node.
2801 * We just refuse to attach -- well, we drop the "connection"
2802 * to that disk, in a way... */
2803 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2804 drbd_khelper(mdev, "split-brain");
2808 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2809 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2813 if (hg < 0 && /* by intention we do not use mydisk here. */
2814 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2815 switch (mdev->tconn->net_conf->rr_conflict) {
2816 case ASB_CALL_HELPER:
2817 drbd_khelper(mdev, "pri-lost");
2819 case ASB_DISCONNECT:
2820 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2823 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2828 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2830 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2832 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2833 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2834 abs(hg) >= 2 ? "full" : "bit-map based");
2839 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2840 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2841 BM_LOCKED_SET_ALLOWED))
2845 if (hg > 0) { /* become sync source. */
2847 } else if (hg < 0) { /* become sync target */
2851 if (drbd_bm_total_weight(mdev)) {
2852 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2853 drbd_bm_total_weight(mdev));
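/*
 * Illustrative sketch (compiled out): how the handshake result hg maps to
 * the returned connection state, assuming the elided branches above select
 * the bitmap-exchange states -- which the C_WF_BITMAP_S/C_WF_BITMAP_T
 * handling in receive_bitmap() below corroborates. A full sync (|hg| >= 2)
 * additionally sets and writes out the whole bitmap first.
 */
#if 0
static enum drbd_conns example_hg_to_conn(int hg)
{
	if (hg > 0)
		return C_WF_BITMAP_S;	/* we become sync source */
	if (hg < 0)
		return C_WF_BITMAP_T;	/* we become sync target */
	return C_CONNECTED;		/* in sync, no resync needed */
}
#endif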
2860 /* returns 1 if invalid */
2861 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2863 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2864 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2865 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2868 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2869 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2870 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2873 /* everything else is valid if they are equal on both sides. */
2877 /* everything else is invalid. */
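/*
 * Illustrative sketch (compiled out): the acceptance rules of cmp_after_sb(),
 * spelled out as assertions under the assumption that the elided returns
 * follow the comments above (1 for invalid, 0 for valid).
 */
#if 0
static void example_cmp_after_sb(void)
{
	/* the one valid asymmetric pairing: local/remote mirror each other */
	BUG_ON(cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_LOCAL) != 0);
	/* discard-local/-remote combined with anything else is invalid */
	BUG_ON(cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_REMOTE) != 1);
	/* all other policies are valid exactly when both sides agree */
	BUG_ON(cmp_after_sb(ASB_DISCONNECT, ASB_DISCONNECT) != 0);
	BUG_ON(cmp_after_sb(ASB_DISCONNECT, ASB_CALL_HELPER) != 1);
}
#endif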
2881 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2883 struct p_protocol *p = tconn->data.rbuf;
2884 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2885 int p_want_lose, p_two_primaries, cf;
2886 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2888 p_proto = be32_to_cpu(p->protocol);
2889 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2890 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2891 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2892 p_two_primaries = be32_to_cpu(p->two_primaries);
2893 cf = be32_to_cpu(p->conn_flags);
2894 p_want_lose = cf & CF_WANT_LOSE;
2896 clear_bit(CONN_DRY_RUN, &tconn->flags);
2898 if (cf & CF_DRY_RUN)
2899 set_bit(CONN_DRY_RUN, &tconn->flags);
2901 if (p_proto != tconn->net_conf->wire_protocol) {
2902 conn_err(tconn, "incompatible communication protocols\n");
2906 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2907 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2911 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2912 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2916 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2917 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2921 if (p_want_lose && tconn->net_conf->want_lose) {
2922 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2926 if (p_two_primaries != tconn->net_conf->two_primaries) {
2927 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2931 if (tconn->agreed_pro_version >= 87) {
2932 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2935 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2939 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2940 if (strcmp(p_integrity_alg, my_alg)) {
2941 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2944 conn_info(tconn, "data-integrity-alg: %s\n",
2945 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2951 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2956 * input: alg name, feature name
2957 * return: NULL (alg name was "")
2958 * ERR_PTR(error) if something goes wrong
2959 * or the crypto hash ptr, if it worked out ok. */
2960 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2961 const char *alg, const char *name)
2963 struct crypto_hash *tfm;
2968 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2970 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2971 alg, name, PTR_ERR(tfm));
2974 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2975 crypto_free_hash(tfm);
2976 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2977 return ERR_PTR(-EINVAL);
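/*
 * Illustrative sketch (compiled out): the three-way contract of
 * drbd_crypto_alloc_digest_safe() as seen from a caller. "example-alg" is a
 * placeholder feature name, not one the driver uses.
 */
#if 0
static int example_alloc_digest(struct drbd_conf *mdev, const char *alg)
{
	struct crypto_hash *tfm;

	tfm = drbd_crypto_alloc_digest_safe(mdev, alg, "example-alg");
	if (IS_ERR(tfm))
		return PTR_ERR(tfm);	/* allocation failed or not a digest */
	if (tfm == NULL)
		return 0;		/* alg was "", feature not configured */
	/* ... use tfm; the caller owns the reference: */
	crypto_free_hash(tfm);
	return 0;
}
#endif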
2982 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
2984 void *buffer = tconn->data.rbuf;
2985 int size = pi->size;
2988 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
2989 s = drbd_recv(tconn, buffer, s);
3003 * config_unknown_volume - device configuration command for unknown volume
3005 * When a device is added to an existing connection, the node on which the
3006 * device is added first will send configuration commands to its peer but the
3007 * peer will not know about the device yet. It will warn and ignore these
3008 * commands. Once the device is added on the second node, the second node will
3009 * send the same device configuration commands, but in the other direction.
3011 * (We can also end up here if drbd is misconfigured.)
3013 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3015 conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3016 pi->vnr, cmdname(pi->cmd));
3017 return ignore_remaining_packet(tconn, pi);
3020 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3022 struct drbd_conf *mdev;
3023 struct p_rs_param_95 *p = tconn->data.rbuf;
3024 unsigned int header_size, data_size, exp_max_sz;
3025 struct crypto_hash *verify_tfm = NULL;
3026 struct crypto_hash *csums_tfm = NULL;
3027 const int apv = tconn->agreed_pro_version;
3028 int *rs_plan_s = NULL;
3032 mdev = vnr_to_mdev(tconn, pi->vnr);
3034 return config_unknown_volume(tconn, pi);
3036 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3037 : apv == 88 ? sizeof(struct p_rs_param)
3039 : apv <= 94 ? sizeof(struct p_rs_param_89)
3040 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3042 if (pi->size > exp_max_sz) {
3043 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3044 pi->size, exp_max_sz);
3049 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
3050 data_size = pi->size - header_size;
3051 } else if (apv <= 94) {
3052 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
3053 data_size = pi->size - header_size;
3054 D_ASSERT(data_size == 0);
3056 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
3057 data_size = pi->size - header_size;
3058 D_ASSERT(data_size == 0);
3061 /* initialize verify_alg and csums_alg */
3062 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3064 err = drbd_recv_all(mdev->tconn, &p->head.payload, header_size);
3068 if (get_ldev(mdev)) {
3069 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3075 if (data_size > SHARED_SECRET_MAX) {
3076 dev_err(DEV, "verify-alg too long, "
3077 "peer wants %u, accepting only %u byte\n",
3078 data_size, SHARED_SECRET_MAX);
3082 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3086 /* we expect NUL terminated string */
3087 /* but just in case someone tries to be evil */
3088 D_ASSERT(p->verify_alg[data_size-1] == 0);
3089 p->verify_alg[data_size-1] = 0;
3091 } else /* apv >= 89 */ {
3092 /* we still expect NUL terminated strings */
3093 /* but just in case someone tries to be evil */
3094 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3095 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3096 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3097 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3100 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3101 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3102 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3103 mdev->tconn->net_conf->verify_alg, p->verify_alg);
3106 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3107 p->verify_alg, "verify-alg");
3108 if (IS_ERR(verify_tfm)) {
3114 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3115 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3116 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3117 mdev->tconn->net_conf->csums_alg, p->csums_alg);
3120 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3121 p->csums_alg, "csums-alg");
3122 if (IS_ERR(csums_tfm)) {
3128 if (apv > 94 && get_ldev(mdev)) {
3129 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3130 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3131 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3132 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3133 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3135 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3136 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3137 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3139 dev_err(DEV, "kmalloc of fifo_buffer failed");
3147 spin_lock(&mdev->peer_seq_lock);
3148 /* lock against drbd_nl_syncer_conf() */
3150 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3151 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3152 crypto_free_hash(mdev->tconn->verify_tfm);
3153 mdev->tconn->verify_tfm = verify_tfm;
3154 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3157 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3158 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3159 crypto_free_hash(mdev->tconn->csums_tfm);
3160 mdev->tconn->csums_tfm = csums_tfm;
3161 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3163 if (fifo_size != mdev->rs_plan_s.size) {
3164 kfree(mdev->rs_plan_s.values);
3165 mdev->rs_plan_s.values = rs_plan_s;
3166 mdev->rs_plan_s.size = fifo_size;
3167 mdev->rs_planed = 0;
3169 spin_unlock(&mdev->peer_seq_lock);
3174 /* just for completeness: actually not needed,
3175 * as this is not reached if csums_tfm was ok. */
3176 crypto_free_hash(csums_tfm);
3177 /* but free the verify_tfm again, if csums_tfm did not work out */
3178 crypto_free_hash(verify_tfm);
3179 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3183 /* warn if the arguments differ by more than 12.5% */
3184 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3185 const char *s, sector_t a, sector_t b)
3188 if (a == 0 || b == 0)
3190 d = (a > b) ? (a - b) : (b - a);
3191 if (d > (a>>3) || d > (b>>3))
3192 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3193 (unsigned long long)a, (unsigned long long)b);
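/* Worked numbers: d > a>>3 (or d > b>>3) fires when the difference exceeds
 * one eighth, i.e. 12.5%, of either value. For a = 1000 and b = 900 sectors,
 * d = 100 stays below both thresholds (125 and 112): no warning. For
 * a = 1000 and b = 800, d = 200 exceeds 125, so the difference is reported. */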
3196 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3198 struct drbd_conf *mdev;
3199 struct p_sizes *p = tconn->data.rbuf;
3200 enum determine_dev_size dd = unchanged;
3201 sector_t p_size, p_usize, my_usize;
3202 int ldsc = 0; /* local disk size changed */
3203 enum dds_flags ddsf;
3205 mdev = vnr_to_mdev(tconn, pi->vnr);
3207 return config_unknown_volume(tconn, pi);
3209 p_size = be64_to_cpu(p->d_size);
3210 p_usize = be64_to_cpu(p->u_size);
3212 /* just store the peer's disk size for now.
3213 * we still need to figure out whether we accept that. */
3214 mdev->p_size = p_size;
3216 if (get_ldev(mdev)) {
3217 warn_if_differ_considerably(mdev, "lower level device sizes",
3218 p_size, drbd_get_max_capacity(mdev->ldev));
3219 warn_if_differ_considerably(mdev, "user requested size",
3220 p_usize, mdev->ldev->dc.disk_size);
3222 /* if this is the first connect, or an otherwise expected
3223 * param exchange, choose the minimum */
3224 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3225 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3228 my_usize = mdev->ldev->dc.disk_size;
3230 if (mdev->ldev->dc.disk_size != p_usize) {
3231 mdev->ldev->dc.disk_size = p_usize;
3232 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3233 (unsigned long)mdev->ldev->dc.disk_size);
3236 /* Never shrink a device with usable data during connect.
3237 But allow online shrinking if we are connected. */
3238 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3239 drbd_get_capacity(mdev->this_bdev) &&
3240 mdev->state.disk >= D_OUTDATED &&
3241 mdev->state.conn < C_CONNECTED) {
3242 dev_err(DEV, "The peer's disk size is too small!\n");
3243 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3244 mdev->ldev->dc.disk_size = my_usize;
3251 ddsf = be16_to_cpu(p->dds_flags);
3252 if (get_ldev(mdev)) {
3253 dd = drbd_determine_dev_size(mdev, ddsf);
3255 if (dd == dev_size_error)
3259 /* I am diskless, need to accept the peer's size. */
3260 drbd_set_my_capacity(mdev, p_size);
3263 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3264 drbd_reconsider_max_bio_size(mdev);
3266 if (get_ldev(mdev)) {
3267 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3268 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3275 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3276 if (be64_to_cpu(p->c_size) !=
3277 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3278 /* we have different sizes, probably peer
3279 * needs to know my new size... */
3280 drbd_send_sizes(mdev, 0, ddsf);
3282 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3283 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3284 if (mdev->state.pdsk >= D_INCONSISTENT &&
3285 mdev->state.disk >= D_INCONSISTENT) {
3286 if (ddsf & DDSF_NO_RESYNC)
3287 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3289 resync_after_online_grow(mdev);
3291 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3298 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3300 struct drbd_conf *mdev;
3301 struct p_uuids *p = tconn->data.rbuf;
3303 int i, updated_uuids = 0;
3305 mdev = vnr_to_mdev(tconn, pi->vnr);
3307 return config_unknown_volume(tconn, pi);
3309 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3311 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3312 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3314 kfree(mdev->p_uuid);
3315 mdev->p_uuid = p_uuid;
3317 if (mdev->state.conn < C_CONNECTED &&
3318 mdev->state.disk < D_INCONSISTENT &&
3319 mdev->state.role == R_PRIMARY &&
3320 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3321 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3322 (unsigned long long)mdev->ed_uuid);
3323 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3327 if (get_ldev(mdev)) {
3328 int skip_initial_sync =
3329 mdev->state.conn == C_CONNECTED &&
3330 mdev->tconn->agreed_pro_version >= 90 &&
3331 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3332 (p_uuid[UI_FLAGS] & 8);
3333 if (skip_initial_sync) {
3334 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3335 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3336 "clear_n_write from receive_uuids",
3337 BM_LOCKED_TEST_ALLOWED);
3338 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3339 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3340 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3346 } else if (mdev->state.disk < D_INCONSISTENT &&
3347 mdev->state.role == R_PRIMARY) {
3348 /* I am a diskless primary, the peer just created a new current UUID
3350 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3353 /* Before we test the disk state, we should wait until a possibly
3354 ongoing cluster-wide state change has finished. That is important if
3355 we are primary and detaching from our disk: we need to see the
3356 new disk state... */
3357 mutex_lock(mdev->state_mutex);
3358 mutex_unlock(mdev->state_mutex);
3359 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3360 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3363 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3369 * convert_state() - Converts the peer's view of the cluster state to our point of view
3370 * @ps: The state as seen by the peer.
3372 static union drbd_state convert_state(union drbd_state ps)
3374 union drbd_state ms;
3376 static enum drbd_conns c_tab[] = {
3377 [C_CONNECTED] = C_CONNECTED,
3379 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3380 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3381 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3382 [C_VERIFY_S] = C_VERIFY_T,
3388 ms.conn = c_tab[ps.conn];
3393 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
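/*
 * Illustrative sketch (compiled out): convert_state() mirrors a state to the
 * other node's point of view, assuming the elided lines above swap role and
 * peer as the function's purpose implies. The test function is hypothetical.
 */
#if 0
static void example_convert_state(void)
{
	union drbd_state ps, ms;

	ps.i = 0;
	ps.conn = C_STARTING_SYNC_S;
	ps.role = R_PRIMARY;

	ms = convert_state(ps);
	/* the peer starting a sync as source means we start as target */
	BUG_ON(ms.conn != C_STARTING_SYNC_T);
	/* the peer's own role becomes our view of the peer */
	BUG_ON(ms.peer != R_PRIMARY);
}
#endif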
3398 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3400 struct drbd_conf *mdev;
3401 struct p_req_state *p = tconn->data.rbuf;
3402 union drbd_state mask, val;
3403 enum drbd_state_rv rv;
3405 mdev = vnr_to_mdev(tconn, pi->vnr);
3409 mask.i = be32_to_cpu(p->mask);
3410 val.i = be32_to_cpu(p->val);
3412 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3413 mutex_is_locked(mdev->state_mutex)) {
3414 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3418 mask = convert_state(mask);
3419 val = convert_state(val);
3421 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3422 drbd_send_sr_reply(mdev, rv);
3429 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3431 struct p_req_state *p = tconn->data.rbuf;
3432 union drbd_state mask, val;
3433 enum drbd_state_rv rv;
3435 mask.i = be32_to_cpu(p->mask);
3436 val.i = be32_to_cpu(p->val);
3438 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3439 mutex_is_locked(&tconn->cstate_mutex)) {
3440 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3444 mask = convert_state(mask);
3445 val = convert_state(val);
3447 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3448 conn_send_sr_reply(tconn, rv);
3453 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3455 struct drbd_conf *mdev;
3456 struct p_state *p = tconn->data.rbuf;
3457 union drbd_state os, ns, peer_state;
3458 enum drbd_disk_state real_peer_disk;
3459 enum chg_state_flags cs_flags;
3462 mdev = vnr_to_mdev(tconn, pi->vnr);
3464 return config_unknown_volume(tconn, pi);
3466 peer_state.i = be32_to_cpu(p->state);
3468 real_peer_disk = peer_state.disk;
3469 if (peer_state.disk == D_NEGOTIATING) {
3470 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3471 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3474 spin_lock_irq(&mdev->tconn->req_lock);
3476 os = ns = mdev->state;
3477 spin_unlock_irq(&mdev->tconn->req_lock);
3479 /* peer says his disk is uptodate, while we think it is inconsistent,
3480 * and this happens while we think we have a sync going on. */
3481 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3482 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3483 /* If we are (becoming) SyncSource, but peer is still in sync
3484 * preparation, ignore its uptodate-ness to avoid flapping, it
3485 * will change to inconsistent once the peer reaches active
3487 * It may have changed syncer-paused flags, however, so we
3488 * cannot ignore this completely. */
3489 if (peer_state.conn > C_CONNECTED &&
3490 peer_state.conn < C_SYNC_SOURCE)
3491 real_peer_disk = D_INCONSISTENT;
3493 /* if peer_state changes to connected at the same time,
3494 * it explicitly notifies us that it finished resync.
3495 * Maybe we should finish it up, too? */
3496 else if (os.conn >= C_SYNC_SOURCE &&
3497 peer_state.conn == C_CONNECTED) {
3498 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3499 drbd_resync_finished(mdev);
3504 /* peer says his disk is inconsistent, while we think it is uptodate,
3505 * and this happens while the peer still thinks we have a sync going on,
3506 * but we think we are already done with the sync.
3507 * We ignore this to avoid flapping pdsk.
3508 * This should not happen, if the peer is a recent version of drbd. */
3509 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3510 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3511 real_peer_disk = D_UP_TO_DATE;
3513 if (ns.conn == C_WF_REPORT_PARAMS)
3514 ns.conn = C_CONNECTED;
3516 if (peer_state.conn == C_AHEAD)
3519 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3520 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3521 int cr; /* consider resync */
3523 /* if we established a new connection */
3524 cr = (os.conn < C_CONNECTED);
3525 /* if we had an established connection
3526 * and one of the nodes newly attaches a disk */
3527 cr |= (os.conn == C_CONNECTED &&
3528 (peer_state.disk == D_NEGOTIATING ||
3529 os.disk == D_NEGOTIATING));
3530 /* if we have both been inconsistent, and the peer has been
3531 * forced to be UpToDate with --overwrite-data */
3532 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3533 /* if we had been plain connected, and the admin requested to
3534 * start a sync by "invalidate" or "invalidate-remote" */
3535 cr |= (os.conn == C_CONNECTED &&
3536 (peer_state.conn >= C_STARTING_SYNC_S &&
3537 peer_state.conn <= C_WF_BITMAP_T));
3540 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3543 if (ns.conn == C_MASK) {
3544 ns.conn = C_CONNECTED;
3545 if (mdev->state.disk == D_NEGOTIATING) {
3546 drbd_force_state(mdev, NS(disk, D_FAILED));
3547 } else if (peer_state.disk == D_NEGOTIATING) {
3548 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3549 peer_state.disk = D_DISKLESS;
3550 real_peer_disk = D_DISKLESS;
3552 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3554 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3555 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3561 spin_lock_irq(&mdev->tconn->req_lock);
3562 if (mdev->state.i != os.i)
3564 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3565 ns.peer = peer_state.role;
3566 ns.pdsk = real_peer_disk;
3567 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3568 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3569 ns.disk = mdev->new_state_tmp.disk;
3570 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3571 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3572 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3573 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3574 for temporary network outages! */
3575 spin_unlock_irq(&mdev->tconn->req_lock);
3576 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3577 tl_clear(mdev->tconn);
3578 drbd_uuid_new_current(mdev);
3579 clear_bit(NEW_CUR_UUID, &mdev->flags);
3580 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3583 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3585 spin_unlock_irq(&mdev->tconn->req_lock);
3587 if (rv < SS_SUCCESS) {
3588 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3592 if (os.conn > C_WF_REPORT_PARAMS) {
3593 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3594 peer_state.disk != D_NEGOTIATING ) {
3595 /* we want resync, peer has not yet decided to sync... */
3596 /* Nowadays only used when forcing a node into primary role and
3597 setting its disk to UpToDate with that */
3598 drbd_send_uuids(mdev);
3599 drbd_send_state(mdev);
3603 mdev->tconn->net_conf->want_lose = 0;
3605 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3610 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3612 struct drbd_conf *mdev;
3613 struct p_rs_uuid *p = tconn->data.rbuf;
3615 mdev = vnr_to_mdev(tconn, pi->vnr);
3619 wait_event(mdev->misc_wait,
3620 mdev->state.conn == C_WF_SYNC_UUID ||
3621 mdev->state.conn == C_BEHIND ||
3622 mdev->state.conn < C_CONNECTED ||
3623 mdev->state.disk < D_NEGOTIATING);
3625 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3627 /* Here the _drbd_uuid_ functions are right, current should
3628 _not_ be rotated into the history */
3629 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3630 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3631 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3633 drbd_print_uuids(mdev, "updated sync uuid");
3634 drbd_start_resync(mdev, C_SYNC_TARGET);
3638 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3644 * receive_bitmap_plain
3646 * Return 0 when done, 1 when another iteration is needed, and a negative error
3647 * code upon failure.
3650 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3651 struct p_header *h, struct bm_xfer_ctx *c)
3653 unsigned long *buffer = (unsigned long *)h->payload;
3654 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3655 unsigned want = num_words * sizeof(long);
3658 if (want != data_size) {
3659 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3664 err = drbd_recv_all(mdev->tconn, buffer, want);
3668 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3670 c->word_offset += num_words;
3671 c->bit_offset = c->word_offset * BITS_PER_LONG;
3672 if (c->bit_offset > c->bm_bits)
3673 c->bit_offset = c->bm_bits;
3678 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3680 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3683 static int dcbp_get_start(struct p_compressed_bm *p)
3685 return (p->encoding & 0x80) != 0;
3688 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3690 return (p->encoding >> 4) & 0x7;
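/*
 * Illustrative sketch (compiled out): the layout of the encoding byte that
 * the three helpers above decode -- bit 7 holds the value of the first run,
 * bits 6..4 the pad bit count, bits 3..0 the encoding variant.
 */
#if 0
static void example_encoding_byte(void)
{
	struct p_compressed_bm p;

	p.encoding = 0xa2;			/* 1010 0010 binary */
	BUG_ON(dcbp_get_start(&p) != 1);	/* bit 7 is set */
	BUG_ON(dcbp_get_pad_bits(&p) != 2);	/* bits 6..4 are 010 */
	BUG_ON(dcbp_get_code(&p) != 2);		/* bits 3..0 are 0010 */
}
#endif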
3696 * Return 0 when done, 1 when another iteration is needed, and a negative error
3697 * code upon failure.
3700 recv_bm_rle_bits(struct drbd_conf *mdev,
3701 struct p_compressed_bm *p,
3702 struct bm_xfer_ctx *c,
3705 struct bitstream bs;
3709 unsigned long s = c->bit_offset;
3711 int toggle = dcbp_get_start(p);
3715 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3717 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3721 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3722 bits = vli_decode_bits(&rl, look_ahead);
3728 if (e >= c->bm_bits) {
3729 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3732 _drbd_bm_set_bits(mdev, s, e);
3736 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3737 have, bits, look_ahead,
3738 (unsigned int)(bs.cur.b - p->code),
3739 (unsigned int)bs.buf_len);
3742 look_ahead >>= bits;
3745 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3748 look_ahead |= tmp << have;
3753 bm_xfer_ctx_bit_to_word_offset(c);
3755 return (s != c->bm_bits);
3761 * Return 0 when done, 1 when another iteration is needed, and a negative error
3762 * code upon failure.
3765 decode_bitmap_c(struct drbd_conf *mdev,
3766 struct p_compressed_bm *p,
3767 struct bm_xfer_ctx *c,
3770 if (dcbp_get_code(p) == RLE_VLI_Bits)
3771 return recv_bm_rle_bits(mdev, p, c, len);
3773 /* other variants had been implemented for evaluation,
3774 * but have been dropped as this one turned out to be "best"
3775 * during all our tests. */
3777 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3778 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3782 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3783 const char *direction, struct bm_xfer_ctx *c)
3785 /* what would it take to transfer it "plaintext" */
3786 unsigned plain = sizeof(struct p_header) *
3787 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3788 + c->bm_words * sizeof(long);
3789 unsigned total = c->bytes[0] + c->bytes[1];
3792 /* total cannot be zero, but just in case: */
3796 /* don't report if not compressed */
3800 /* total < plain. check for overflow, still */
3801 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3802 : (1000 * total / plain);
3808 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3809 "total %u; compression: %u.%u%%\n",
3811 c->bytes[1], c->packets[1],
3812 c->bytes[0], c->packets[0],
3813 total, r/10, r % 10);
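/* Worked numbers, assuming the elided lines between the ratio computation and
 * the dev_info() clamp r to at most 1000 and invert it (r = 1000 - r) so the
 * message reports the fraction saved: with plain = 40960 and total = 1024,
 * r = 1000 * 1024 / 40960 = 25, inverted to 975, printed as
 * "compression: 97.5%". */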
3816 /* Since we process the bitfield from lower addresses to higher, it
3817 does not matter whether we process it in 32 bit or 64 bit chunks,
3818 as long as it is little endian. (Think of it as a byte stream,
3819 beginning with the lowest byte...) If we used big endian, we would
3820 need to process it from the highest address to the lowest in order
3821 to be agnostic to the 32 vs 64 bit issue.
3823 returns 0 on success, a negative error code otherwise. */
3824 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3826 struct drbd_conf *mdev;
3827 struct bm_xfer_ctx c;
3829 struct p_header *h = tconn->data.rbuf;
3831 mdev = vnr_to_mdev(tconn, pi->vnr);
3835 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3836 /* you are supposed to send additional out-of-sync information
3837 * if you actually set bits during this phase */
3839 c = (struct bm_xfer_ctx) {
3840 .bm_bits = drbd_bm_bits(mdev),
3841 .bm_words = drbd_bm_words(mdev),
3845 if (pi->cmd == P_BITMAP) {
3846 err = receive_bitmap_plain(mdev, pi->size, h, &c);
3847 } else if (pi->cmd == P_COMPRESSED_BITMAP) {
3848 /* MAYBE: sanity check that we speak proto >= 90,
3849 * and the feature is enabled! */
3850 struct p_compressed_bm *p;
3852 if (pi->size > BM_PACKET_PAYLOAD_BYTES) {
3853 dev_err(DEV, "ReportCBitmap packet too large\n");
3858 p = mdev->tconn->data.rbuf;
3859 err = drbd_recv_all(mdev->tconn, p->head.payload, pi->size);
3862 if (pi->size <= (sizeof(*p) - sizeof(p->head))) {
3863 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3867 err = decode_bitmap_c(mdev, p, &c, pi->size);
3869 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
3874 c.packets[pi->cmd == P_BITMAP]++;
3875 c.bytes[pi->cmd == P_BITMAP] += sizeof(struct p_header) + pi->size;
3882 err = drbd_recv_header(mdev->tconn, pi);
3887 INFO_bm_xfer_stats(mdev, "receive", &c);
3889 if (mdev->state.conn == C_WF_BITMAP_T) {
3890 enum drbd_state_rv rv;
3892 err = drbd_send_bitmap(mdev);
3895 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3896 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3897 D_ASSERT(rv == SS_SUCCESS);
3898 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3899 /* admin may have requested C_DISCONNECTING,
3900 * other threads may have noticed network errors */
3901 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3902 drbd_conn_str(mdev->state.conn));
3907 drbd_bm_unlock(mdev);
3908 if (!err && mdev->state.conn == C_WF_BITMAP_S)
3909 drbd_start_resync(mdev, C_SYNC_SOURCE);
3913 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3915 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3918 return ignore_remaining_packet(tconn, pi);
3921 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3923 /* Make sure we've acked all the TCP data associated
3924 * with the data requests being unplugged */
3925 drbd_tcp_quickack(tconn->data.socket);
3930 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3932 struct drbd_conf *mdev;
3933 struct p_block_desc *p = tconn->data.rbuf;
3935 mdev = vnr_to_mdev(tconn, pi->vnr);
3939 switch (mdev->state.conn) {
3940 case C_WF_SYNC_UUID:
3945 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3946 drbd_conn_str(mdev->state.conn));
3949 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3957 int (*fn)(struct drbd_tconn *, struct packet_info *);
3960 static struct data_cmd drbd_cmd_handler[] = {
3961 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3962 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3963 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3964 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3965 [P_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3966 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3967 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), receive_UnplugRemote },
3968 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3969 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3970 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), receive_SyncParam },
3971 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), receive_SyncParam },
3972 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3973 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3974 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3975 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3976 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3977 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3978 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3979 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3980 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3981 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
3982 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3983 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
3986 static void drbdd(struct drbd_tconn *tconn)
3988 struct p_header *header = tconn->data.rbuf;
3989 struct packet_info pi;
3990 size_t shs; /* sub header size */
3993 while (get_t_state(&tconn->receiver) == RUNNING) {
3994 struct data_cmd *cmd;
3996 drbd_thread_current_set_cpu(&tconn->receiver);
3997 if (drbd_recv_header(tconn, &pi))
4000 cmd = &drbd_cmd_handler[pi.cmd];
4001 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4002 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4006 shs = cmd->pkt_size - sizeof(struct p_header);
4007 if (pi.size - shs > 0 && !cmd->expect_payload) {
4008 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4013 err = drbd_recv_all_warn(tconn, &header->payload, shs);
4019 err = cmd->fn(tconn, &pi);
4021 conn_err(tconn, "error receiving %s, l: %d!\n",
4022 cmdname(pi.cmd), pi.size);
4029 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4032 void conn_flush_workqueue(struct drbd_tconn *tconn)
4034 struct drbd_wq_barrier barr;
4036 barr.w.cb = w_prev_work_done;
4037 barr.w.tconn = tconn;
4038 init_completion(&barr.done);
4039 drbd_queue_work(&tconn->data.work, &barr.w);
4040 wait_for_completion(&barr.done);
4043 static void drbd_disconnect(struct drbd_tconn *tconn)
4046 int rv = SS_UNKNOWN_ERROR;
4048 if (tconn->cstate == C_STANDALONE)
4051 /* asender does not clean up anything. it must not interfere, either */
4052 drbd_thread_stop(&tconn->asender);
4053 drbd_free_sock(tconn);
4055 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4056 conn_info(tconn, "Connection closed\n");
4058 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4059 conn_try_outdate_peer_async(tconn);
4061 spin_lock_irq(&tconn->req_lock);
4063 if (oc >= C_UNCONNECTED)
4064 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4066 spin_unlock_irq(&tconn->req_lock);
4068 if (oc == C_DISCONNECTING) {
4069 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4071 crypto_free_hash(tconn->cram_hmac_tfm);
4072 tconn->cram_hmac_tfm = NULL;
4074 kfree(tconn->net_conf);
4075 tconn->net_conf = NULL;
4076 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4080 static int drbd_disconnected(int vnr, void *p, void *data)
4082 struct drbd_conf *mdev = (struct drbd_conf *)p;
4083 enum drbd_fencing_p fp;
4086 /* wait for current activity to cease. */
4087 spin_lock_irq(&mdev->tconn->req_lock);
4088 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4089 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4090 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4091 spin_unlock_irq(&mdev->tconn->req_lock);
4093 /* We do not have data structures that would allow us to
4094 * get the rs_pending_cnt down to 0 again.
4095 * * On C_SYNC_TARGET we do not have any data structures describing
4096 * the pending RSDataRequest's we have sent.
4097 * * On C_SYNC_SOURCE there is no data structure that tracks
4098 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4099 * And no, it is not the sum of the reference counts in the
4100 * resync_LRU. The resync_LRU tracks the whole operation including
4101 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4103 drbd_rs_cancel_all(mdev);
4105 mdev->rs_failed = 0;
4106 atomic_set(&mdev->rs_pending_cnt, 0);
4107 wake_up(&mdev->misc_wait);
4109 del_timer(&mdev->request_timer);
4111 del_timer_sync(&mdev->resync_timer);
4112 resync_timer_fn((unsigned long)mdev);
4114 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4115 * w_make_resync_request etc. which may still be on the worker queue
4116 * to be "canceled" */
4117 drbd_flush_workqueue(mdev);
4119 /* This also does reclaim_net_ee(). If we do this too early, we might
4120 * miss some resync ee and pages.*/
4121 drbd_process_done_ee(mdev);
4123 kfree(mdev->p_uuid);
4124 mdev->p_uuid = NULL;
4126 if (!is_susp(mdev->state))
4127 tl_clear(mdev->tconn);
4132 if (get_ldev(mdev)) {
4133 fp = mdev->ldev->dc.fencing;
4137 /* serialize with bitmap writeout triggered by the state change,
4139 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4141 /* tcp_close and release of sendpage pages can be deferred. I don't
4142 * want to use SO_LINGER, because apparently it can be deferred for
4143 * more than 20 seconds (longest time I checked).
4145 * Actually we don't care exactly when the network stack does its
4146 * put_page(), but release our reference on these pages right here.
4148 i = drbd_release_ee(mdev, &mdev->net_ee);
4150 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4151 i = atomic_read(&mdev->pp_in_use_by_net);
4153 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4154 i = atomic_read(&mdev->pp_in_use);
4156 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4158 D_ASSERT(list_empty(&mdev->read_ee));
4159 D_ASSERT(list_empty(&mdev->active_ee));
4160 D_ASSERT(list_empty(&mdev->sync_ee));
4161 D_ASSERT(list_empty(&mdev->done_ee));
4163 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4164 atomic_set(&mdev->current_epoch->epoch_size, 0);
4165 D_ASSERT(list_empty(&mdev->current_epoch->list));
4171 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4172 * we can agree on is stored in agreed_pro_version.
4174 * feature flags and the reserved array should be enough room for future
4175 * enhancements of the handshake protocol, and possible plugins...
4177 * for now, they are expected to be zero, but ignored.
4179 static int drbd_send_features(struct drbd_tconn *tconn)
4181 /* ASSERT current == tconn->receiver ... */
4182 struct p_connection_features *p = tconn->data.sbuf;
4185 if (mutex_lock_interruptible(&tconn->data.mutex)) {
4186 conn_err(tconn, "interrupted during initial handshake\n");
4190 if (tconn->data.socket == NULL) {
4191 mutex_unlock(&tconn->data.mutex);
4195 memset(p, 0, sizeof(*p));
4196 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4197 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4198 err = _conn_send_cmd(tconn, 0, &tconn->data, P_CONNECTION_FEATURES,
4199 &p->head, sizeof(*p), 0);
4200 mutex_unlock(&tconn->data.mutex);
4206 * 1 yes, we have a valid connection
4207 * 0 oops, did not work out, please try again
4208 * -1 peer talks different language,
4209 * no point in trying again, please go standalone.
4211 static int drbd_do_features(struct drbd_tconn *tconn)
4213 /* ASSERT current == tconn->receiver ... */
4214 struct p_connection_features *p = tconn->data.rbuf;
4215 const int expect = sizeof(struct p_connection_features) - sizeof(struct p_header80);
4216 struct packet_info pi;
4219 err = drbd_send_features(tconn);
4223 err = drbd_recv_header(tconn, &pi);
4227 if (pi.cmd != P_CONNECTION_FEATURES) {
4228 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4229 cmdname(pi.cmd), pi.cmd);
4233 if (pi.size != expect) {
4234 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4239 err = drbd_recv_all_warn(tconn, &p->head.payload, expect);
4243 p->protocol_min = be32_to_cpu(p->protocol_min);
4244 p->protocol_max = be32_to_cpu(p->protocol_max);
4245 if (p->protocol_max == 0)
4246 p->protocol_max = p->protocol_min;
4248 if (PRO_VERSION_MAX < p->protocol_min ||
4249 PRO_VERSION_MIN > p->protocol_max)
4252 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4254 conn_info(tconn, "Handshake successful: "
4255 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4260 conn_err(tconn, "incompatible DRBD dialects: "
4261 "I support %d-%d, peer supports %d-%d\n",
4262 PRO_VERSION_MIN, PRO_VERSION_MAX,
4263 p->protocol_min, p->protocol_max);
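/*
 * Illustrative sketch (compiled out): the agreement rule used above. The two
 * version ranges must overlap, and the agreed version is the highest one both
 * sides support; e.g. local 86..96 against peer 87..100 agrees on 96. The
 * helper name is hypothetical.
 */
#if 0
static int example_agree_version(int peer_min, int peer_max)
{
	if (PRO_VERSION_MAX < peer_min || PRO_VERSION_MIN > peer_max)
		return -1;	/* incompatible dialects, go standalone */
	return min_t(int, PRO_VERSION_MAX, peer_max);
}
#endif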
4267 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4268 static int drbd_do_auth(struct drbd_tconn *tconn)
4270 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4271 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4275 #define CHALLENGE_LEN 64
4279 0 - failed, try again (network error),
4280 -1 - auth failed, don't try again.
4283 static int drbd_do_auth(struct drbd_tconn *tconn)
4285 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4286 struct scatterlist sg;
4287 char *response = NULL;
4288 char *right_response = NULL;
4289 char *peers_ch = NULL;
4290 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4291 unsigned int resp_size;
4292 struct hash_desc desc;
4293 struct packet_info pi;
4296 desc.tfm = tconn->cram_hmac_tfm;
4299 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4300 (u8 *)tconn->net_conf->shared_secret, key_len);
4302 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4307 get_random_bytes(my_challenge, CHALLENGE_LEN);
4309 rv = !conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4313 err = drbd_recv_header(tconn, &pi);
4319 if (pi.cmd != P_AUTH_CHALLENGE) {
4320 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4321 cmdname(pi.cmd), pi.cmd);
4326 if (pi.size > CHALLENGE_LEN * 2) {
4327 conn_err(tconn, "expected AuthChallenge payload too big.\n");
4332 peers_ch = kmalloc(pi.size, GFP_NOIO);
4333 if (peers_ch == NULL) {
4334 conn_err(tconn, "kmalloc of peers_ch failed\n");
4339 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4345 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4346 response = kmalloc(resp_size, GFP_NOIO);
4347 if (response == NULL) {
4348 conn_err(tconn, "kmalloc of response failed\n");
4353 sg_init_table(&sg, 1);
4354 sg_set_buf(&sg, peers_ch, pi.size);
4356 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4358 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4363 rv = !conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
4367 err = drbd_recv_header(tconn, &pi);
4373 if (pi.cmd != P_AUTH_RESPONSE) {
4374 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4375 cmdname(pi.cmd), pi.cmd);
4380 if (pi.size != resp_size) {
4381 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
4386 err = drbd_recv_all_warn(tconn, response , resp_size);
4392 right_response = kmalloc(resp_size, GFP_NOIO);
4393 if (right_response == NULL) {
4394 conn_err(tconn, "kmalloc of right_response failed\n");
4399 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4401 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4403 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4408 rv = !memcmp(response, right_response, resp_size);
4411 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4412 resp_size, tconn->net_conf->cram_hmac_alg);
4419 kfree(right_response);
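/* The exchange above, summarized from this node's perspective. Both sides
 * key the HMAC with the same shared secret; the secret itself never crosses
 * the wire, only random challenges and their digests do:
 *
 *   1. send P_AUTH_CHALLENGE carrying CHALLENGE_LEN random bytes
 *   2. receive the peer's P_AUTH_CHALLENGE
 *   3. send P_AUTH_RESPONSE = HMAC(secret, peer's challenge)
 *   4. receive the peer's P_AUTH_RESPONSE
 *   5. authenticated iff it equals HMAC(secret, our own challenge)
 */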
4425 int drbdd_init(struct drbd_thread *thi)
4427 struct drbd_tconn *tconn = thi->tconn;
4430 conn_info(tconn, "receiver (re)started\n");
4433 h = drbd_connect(tconn);
4435 drbd_disconnect(tconn);
4436 schedule_timeout_interruptible(HZ);
4439 conn_warn(tconn, "Discarding network configuration.\n");
4440 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4445 if (get_net_conf(tconn)) {
4447 put_net_conf(tconn);
4451 drbd_disconnect(tconn);
4453 conn_info(tconn, "receiver terminated\n");
4457 /* ********* acknowledge sender ******** */
4459 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4461 struct p_req_state_reply *p = tconn->meta.rbuf;
4462 int retcode = be32_to_cpu(p->retcode);
4464 if (retcode >= SS_SUCCESS) {
4465 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4467 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4468 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4469 drbd_set_st_err_str(retcode), retcode);
4471 wake_up(&tconn->ping_wait);
static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_req_state_reply *p = tconn->meta.rbuf;
	int retcode = be32_to_cpu(p->retcode);

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return false;

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
			drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&mdev->state_wait);

	return true;
}
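
/* got_conn_RqSReply() and got_RqSReply() are deliberately parallel:
 * the former acknowledges a connection-wide state change and flags the
 * tconn, the latter a per-volume change and flags the mdev; both wake
 * up whoever initiated the state change request. */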
static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
{
	return drbd_send_ping_ack(tconn);
}
static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	/* restore idle timeout */
	tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
		wake_up(&tconn->ping_wait);

	return true;
}
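
/* Keepalive works in two phases: drbd_asender() shortens the receive
 * timeout to ping-timeo after sending P_PING, and got_PingAck() widens
 * it back to ping-int once the peer answered.  A PingAck missing for
 * longer than ping-timeo is treated as a dead peer (see the -EAGAIN
 * handling in drbd_asender() below). */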
static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = tconn->meta.rbuf;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return false;

	D_ASSERT(mdev->tconn->agreed_pro_version >= 89);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, sector);
		drbd_set_in_sync(mdev, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(mdev);
	}
	dec_rs_pending(mdev);
	atomic_add(blksize >> 9, &mdev->rs_sect_in);

	return true;
}
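
/* P_RS_IS_IN_SYNC only exists with protocol version 89 or later
 * (checksum based resync): the peer found the block content identical,
 * so no data had to be transferred and we may mark the block in sync
 * locally, accounting it as a "same csum" block. */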
static bool
validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&mdev->tconn->req_lock);
		return false;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (m.bio)
		complete_master_bio(mdev, &m);
	return true;
}
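
/* The request lookup and the state transition via __req_mod() must
 * happen atomically under req_lock, but completing the master bio may
 * sleep and is therefore done after the lock is dropped, based on the
 * bio_and_error snapshot that __req_mod() filled in. */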
static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = tconn->meta.rbuf;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return false;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(mdev, sector, blksize);
		dec_rs_pending(mdev);
		return true;
	}
	switch (pi->cmd) {
	case P_RS_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
		what = RECV_ACKED_BY_PEER;
		break;
	case P_DISCARD_WRITE:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = DISCARD_WRITE;
		break;
	case P_RETRY_WRITE:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = POSTPONE_WRITE;
		break;
	default:
		BUG();
	}

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->write_requests, __func__,
					     what, false);
}
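
/* Which ack packet arrives here depends on the wire protocol of the
 * connection: protocol B peers confirm mere reception (P_RECV_ACK),
 * protocol C peers confirm stable writes (P_WRITE_ACK and friends).
 * The switch above maps each packet to the matching request event. */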
static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = tconn->meta.rbuf;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	bool missing_ok = tconn->net_conf->wire_protocol == DRBD_PROT_A ||
			  tconn->net_conf->wire_protocol == DRBD_PROT_B;
	bool found;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return false;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(mdev);
		drbd_rs_failed_io(mdev, sector, size);
		return true;
	}

	found = validate_req_change_req_state(mdev, p->block_id, sector,
					      &mdev->write_requests, __func__,
					      NEG_ACKED, missing_ok);
	if (!found) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		if (!missing_ok)
			return false;
		drbd_set_out_of_sync(mdev, sector, size);
	}
	return true;
}
static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = tconn->meta.rbuf;
	sector_t sector = be64_to_cpu(p->sector);

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return false;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
		(unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->read_requests, __func__,
					     NEG_ACKED, false);
}
static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	sector_t sector;
	int size;
	struct p_block_ack *p = tconn->meta.rbuf;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return false;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);
	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
	dec_rs_pending(mdev);
	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
		case P_RS_CANCEL:
			break;
		}
		put_ldev(mdev);
	}
	return true;
}
static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_barrier_ack *p = tconn->meta.rbuf;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return false;

	tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return true;
}
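
/* A barrier ack is also the point where an Ahead/Behind connection may
 * transition back to resync: once no application writes are in flight
 * any more, arm start_resync_timer so that we become sync source about
 * a second later. */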
static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = tconn->meta.rbuf;
	struct drbd_work *w;
	sector_t sector;
	int size;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	if (!mdev)
		return false;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(mdev, sector, size);
	else
		ov_out_of_sync_print(mdev);

	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);
	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			w->mdev = mdev;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			dev_err(DEV, "kmalloc(w) failed.");
			ov_out_of_sync_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}
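
/* Online verify results arrive one P_OV_RESULT per checked block; when
 * ov_left reaches zero the verify run is done, and w_ov_finished is
 * queued to report the result from worker context.  Only if that
 * allocation fails do we fall back to finishing synchronously here. */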
static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
{
	return true;
}
static int tconn_process_done_ee(struct drbd_tconn *tconn)
{
	struct drbd_conf *mdev;
	int i, not_empty = 0;

	do {
		clear_bit(SIGNAL_ASENDER, &tconn->flags);
		flush_signals(current);
		idr_for_each_entry(&tconn->volumes, mdev, i) {
			if (drbd_process_done_ee(mdev))
				return 1; /* error */
		}
		set_bit(SIGNAL_ASENDER, &tconn->flags);

		spin_lock_irq(&tconn->req_lock);
		idr_for_each_entry(&tconn->volumes, mdev, i) {
			not_empty = !list_empty(&mdev->done_ee);
			if (not_empty)
				break;
		}
		spin_unlock_irq(&tconn->req_lock);
	} while (not_empty);

	return 0;
}
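
/* Drain the done_ee lists of all volumes while racing with new
 * completions: after processing, re-check under req_lock whether any
 * list refilled meanwhile, and loop until all of them were found empty
 * in a single pass. */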
struct asender_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
};

static struct asender_cmd asender_tbl[] = {
	[P_PING]	    = { sizeof(struct p_header), got_Ping },
	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]	    = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]	    = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
};
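
/* asender_tbl is indexed directly by packet command number; a slot
 * with a NULL fn marks a command that is not valid on the meta socket.
 * Each entry also carries the expected packet size, so drbd_asender()
 * knows how many payload bytes to collect before dispatching; roughly:
 *
 *	cmd = &asender_tbl[pi.cmd];
 *	expect = cmd->pkt_size;
 *	... receive until 'expect' bytes have arrived ...
 *	cmd->fn(tconn, &pi);
 */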
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	struct p_header *h = tconn->meta.rbuf;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf = h;
	int received = 0;
	int expect = sizeof(struct p_header);
	int ping_timeout_active = 0;

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);
		if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
			if (!drbd_send_ping(tconn)) {
				conn_err(tconn, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			tconn->meta.socket->sk->sk_rcvtimeo =
				tconn->net_conf->ping_timeo*HZ/10;
			ping_timeout_active = 1;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_cork(tconn->meta.socket);
		if (tconn_process_done_ee(tconn)) {
			conn_err(tconn, "tconn_process_done_ee() failed\n");
			goto reconnect;
		}
		/* but unconditionally uncork unless disabled */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_uncork(tconn->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &tconn->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS	 (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			conn_err(tconn, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(tconn->last_received,
				jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				conn_err(tconn, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &tconn->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (decode_header(tconn, h, &pi))
				goto reconnect;
			cmd = &asender_tbl[pi.cmd];
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
				conn_err(tconn, "unknown command %d on meta (l: %d)\n",
					 pi.cmd, pi.size);
				goto disconnect;
			}
			expect = cmd->pkt_size;
			if (pi.size != expect - sizeof(struct p_header)) {
				conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			rv = cmd->fn(tconn, &pi);
			if (!rv) {
				conn_err(tconn, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			tconn->last_received = jiffies;

			/* the idle_timeout (ping-int)
			 * has been restored in got_PingAck() */
			if (cmd == &asender_tbl[P_PING_ACK])
				ping_timeout_active = 0;

			buf = h;
			received = 0;
			expect = sizeof(struct p_header);
			cmd = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
	}
	if (0) {
disconnect:
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &tconn->flags);

	conn_info(tconn, "asender terminated\n");

	return 0;
}