This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

drbd is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.

drbd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with drbd; see the file COPYING.  If not, write to
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
#include <linux/module.h>
#include <asm/uaccess.h>
#include <linux/drbd.h>
#include <linux/file.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
static int drbd_do_handshake(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(int vnr, void *p, void *data);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
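/*
 * Note that GFP_TRY is a deliberately weak allocation mode: __GFP_HIGHMEM
 * admits highmem pages, __GFP_NOWARN keeps a failed opportunistic
 * allocation quiet, and the absence of __GFP_WAIT/__GFP_IO means the
 * allocator can neither sleep nor recurse into write-out.  Callers below
 * treat failure as "not now, retry later" rather than as an error.
 */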
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
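/*
 * Roughly, the chain accessors used below (from drbd_int.h) amount to
 * this sketch:
 *
 *	static inline struct page *page_chain_next(struct page *page)
 *	{
 *		return (struct page *)page_private(page);
 *	}
 *
 * with a chain being terminated by set_page_private(page, 0).
 */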
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
	tmp = page_chain_next(page);
		break; /* found sufficient pages */
	/* insufficient pages, don't use any of them. */

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
	while ((tmp = page_chain_next(page)))

static int page_chain_free(struct page *page)
	page_chain_for_each_safe(page, tmp) {

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
	tmp = page_chain_tail(chain_first, NULL);
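	/* paranoia: the caller promised that chain_last is the tail of the
	 * chain starting at chain_first; a stale tail would silently
	 * corrupt the global pool, so verify it */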
	BUG_ON(tmp != chain_last);
	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);

static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
	struct page *page = NULL;
	struct page *tmp = NULL;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		set_page_private(tmp, (unsigned long)page);

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */
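	/* Unwind: chain the pages we did get back into the global pool. */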
	tmp = page_chain_tail(page, NULL);
	spin_lock(&drbd_pp_lock);
	page_chain_add(&drbd_pp_pool, page, tmp);
	spin_unlock(&drbd_pp_lock);

static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one, we
	   can stop examining the list... */
	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_ee_has_active_page(peer_req))
			break;
		list_move(le, to_be_freed);

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);

/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
	struct page *page = NULL;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");

	finish_wait(&drbd_pp_wait, &wait);

	atomic_add(number, &mdev->pp_in_use);
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		spin_unlock(&drbd_pp_lock);
	i = atomic_sub_return(i, a);
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_process_done_ee()
 drbd_wait_ee_list_empty()
*/
struct drbd_peer_request *
drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
	      unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
	struct drbd_peer_request *peer_req;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");

	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;
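	/* (For data packets this is typically the peer's own request
	 * pointer, cast to a u64; find_request() below casts it back.) */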
	mempool_free(peer_req, drbd_ee_mempool);

void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_pp_free(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);

int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		drbd_free_some_ee(mdev, peer_req, is_net);

/* See also comments in _req_mod(,BARRIER_ACKED)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_write.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		drbd_free_ee(mdev, peer_req);
	wake_up(&mdev->ee_wait);

void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);

/* see also kernel_accept(), which only exists since 2.6.18.
 * We also want to log exactly which part of it failed. */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
	struct sock *sk = sock->sk;

	err = sock->ops->listen(sock, 5);
	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
	err = sock->ops->accept(sock, *newsock, 0);
		sock_release(*newsock);
	(*newsock)->ops = sock->ops;

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
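	/* With flags == 0 this defaults to a blocking receive of the full
	 * @size (MSG_WAITALL), never raising SIGPIPE (MSG_NOSIGNAL). */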
	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)

	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL

	rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);

	/* possible errors:
	 * ECONNRESET	other side closed the connection
	 * ERESTARTSYS	(on sock) we got a signal
	 */
		if (rv == -ECONNRESET)
			conn_info(tconn, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		conn_info(tconn, "sock was shut down by peer\n");
		/* signal came in, or peer/link went down,
		 * after we read a partial message
		 */
		/* D_ASSERT(signal_pending(current)); */

		conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
	err = drbd_recv(tconn, buf, size);

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
	err = drbd_recv_all(tconn, buf, size);
	if (err && !signal_pending(current))
		conn_warn(tconn, "short read (expected size %d)\n", (int)size);

/* On individual connections, the socket buffer size must be set prior to the
 * listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
	/* open coded SO_SNDBUF, SO_RCVBUF */
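	/* Setting SOCK_SNDBUF_LOCK / SOCK_RCVBUF_LOCK in sk_userlocks is what
	 * setsockopt() itself would do: it keeps the TCP autotuning from
	 * overriding the explicitly configured buffer sizes later on. */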
	sock->sk->sk_sndbuf = snd;
	sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	sock->sk->sk_rcvbuf = rcv;
	sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;

static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
	struct sockaddr_in6 src_in6;
	int disconnect_on_error = 1;

	if (!get_net_conf(tconn))

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, tconn->net_conf->my_addr,
	       min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      tconn->net_conf->my_addr_len);

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	err = sock->ops->connect(sock,
				 (struct sockaddr *)tconn->net_conf->peer_addr,
				 tconn->net_conf->peer_addr_len, 0);

		/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
		/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN: case EHOSTUNREACH:
			disconnect_on_error = 0;
		conn_err(tconn, "%s failed, err = %d\n", what, err);
		if (disconnect_on_error)
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);

static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
	struct socket *s_estab = NULL, *s_listen;

	if (!get_net_conf(tconn))

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);

	timeo = tconn->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

	s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
				  (struct sockaddr *) tconn->net_conf->my_addr,
				  tconn->net_conf->my_addr_len);

	err = drbd_accept(&what, s_listen, &s_estab);

		sock_release(s_listen);
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(tconn, "%s failed, err = %d\n", what, err);
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);

static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
	struct p_header *h = tconn->data.sbuf;

	return !_conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);

static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
	struct p_header80 *h = tconn->data.rbuf;

	rr = drbd_recv_short(sock, h, sizeof(*h), 0);

	if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
		return be16_to_cpu(h->command);

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {

/* Gets called if a connection is established, or if a new minor gets created
 */
int drbd_connected(int vnr, void *p, void *data)
	struct drbd_conf *mdev = (struct drbd_conf *)p;

	atomic_set(&mdev->packet_seq, 0);

	mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
		&mdev->tconn->cstate_mutex :
		&mdev->own_state_mutex;

	err = drbd_send_sync_param(mdev);
		err = drbd_send_sizes(mdev, 0, 0);
		err = drbd_send_uuids(mdev);
		err = drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_tconn *tconn)
	struct socket *s, *sock, *msock;

	if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)

	clear_bit(DISCARD_CONCURRENT, &tconn->flags);

	/* Assume that the peer only understands protocol 80 until we know better. */
	tconn->agreed_pro_version = 80;

		/* 3 tries, this should take less than a second! */
		s = drbd_try_connect(tconn);
		/* give the other side time to call bind() & listen() */
		schedule_timeout_interruptible(HZ / 10);

			drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
			drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
				conn_err(tconn, "Logic error in drbd_connect()\n");
				goto out_release_sockets;

			schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;

		s = drbd_wait_for_connect(tconn);
			try = drbd_recv_fp(tconn, s);
			drbd_socket_okay(&sock);
			drbd_socket_okay(&msock);
				conn_warn(tconn, "initial packet S crossed\n");
				conn_warn(tconn, "initial packet M crossed\n");
				set_bit(DISCARD_CONCURRENT, &tconn->flags);
				conn_warn(tconn, "Error receiving initial packet\n");

		if (tconn->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;

			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/*
	 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_HAND_SHAKE timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	tconn->data.socket = sock;
	tconn->meta.socket = msock;
	tconn->last_received = jiffies;

	h = drbd_do_handshake(tconn);

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
			conn_err(tconn, "Authentication of peer failed\n");
			conn_err(tconn, "Authentication of peer failed, trying again.\n");

	if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)

	sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	drbd_thread_start(&tconn->asender);

	if (drbd_send_protocol(tconn) == -EOPNOTSUPP)

	return !idr_for_each(&tconn->volumes, drbd_connected, tconn);

static int decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
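	/* Two wire header formats: the original h80 header carries DRBD_MAGIC
	 * plus 16-bit command and 16-bit length fields; the "big" h95 header
	 * announces itself with the 16-bit DRBD_MAGIC_BIG and widens the
	 * length field to 24 bits (hence the 0x00ffffff mask below). */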
	if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
		pi->cmd = be16_to_cpu(h->h80.command);
		pi->size = be16_to_cpu(h->h80.length);
	} else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
		pi->cmd = be16_to_cpu(h->h95.command);
		pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
	} else {
		conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
			 be32_to_cpu(h->h80.magic),
			 be16_to_cpu(h->h80.command),
			 be16_to_cpu(h->h80.length));

static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
	struct p_header *h = tconn->data.rbuf;

	err = drbd_recv_all_warn(tconn, h, sizeof(*h));

	err = decode_header(tconn, h, pi);
	tconn->last_received = jiffies;

static void drbd_flush(struct drbd_conf *mdev)
	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
			dev_err(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
			atomic_dec(&epoch->active);
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
		case EV_BECAME_LAST:

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				if (rv == FE_STILL_LIVE)
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
		wake_up(&mdev->ee_wait);

	spin_unlock(&mdev->epoch_lock);

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",

	pwo = mdev->write_ordering;
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
/**
 * drbd_submit_peer_request()
 * @mdev:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
	struct bio *bios = NULL;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
	bio = bio_alloc(GFP_NOIO, nr_pages);
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;
	bio->bi_next = bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);
	D_ASSERT(page == NULL);

	atomic_set(&peer_req->pending_bios, n_bios);
		bios = bios->bi_next;
		bio->bi_next = NULL;
		drbd_generic_make_request(mdev, fault_type, bio);

		bios = bios->bi_next;

static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	wake_up(&mdev->misc_wait);

static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
	struct drbd_conf *mdev;
	struct p_barrier *p = tconn->data.rbuf;
	struct drbd_epoch *epoch;

	mdev = vnr_to_mdev(tconn, pi->vnr);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 */
	switch (mdev->write_ordering) {
		if (rv == FE_RECYCLED)

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");

		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);

	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		/* The current_epoch got recycled while we allocated this one... */
	spin_unlock(&mdev->epoch_lock);

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);

	if (!expect(data_size != 0))
	if (!expect(IS_ALIGNED(data_size, 512)))
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);

	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		err = drbd_recv_all_warn(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
			drbd_free_ee(mdev, peer_req);

		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_ee(mdev, peer_req);

	mdev->recv_cnt += data_size>>9;

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
	page = drbd_pp_alloc(mdev, 1, 1);

		unsigned int len = min_t(int, data_size, PAGE_SIZE);
		err = drbd_recv_all_warn(mdev->tconn, data, len);

	drbd_pp_free(mdev, page, 0);

static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
	struct bio_vec *bvec;
	int dgs, err, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);

	/* optimistically update recv_cnt. if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
		expect = min_t(int, data_size, bvec->bv_len);
		err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
		kunmap(bvec->bv_page);
		data_size -= expect;

		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");

	D_ASSERT(data_size == 0);

/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
static int e_end_resync_block(struct drbd_work *w, int unused)
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);
		err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);

static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);

	dec_rs_pending(mdev);

	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);

static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)

	dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
		(unsigned long)id, (unsigned long long)sector);

static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
	struct drbd_conf *mdev;
	struct drbd_request *req;
	struct p_data *p = tconn->data.rbuf;

	mdev = vnr_to_mdev(tconn, pi->vnr);

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(mdev, req, sector, pi->size);
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
	struct drbd_conf *mdev;
	struct p_data *p = tconn->data.rbuf;

	mdev = vnr_to_mdev(tconn, pi->vnr);

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(mdev, sector, pi->size);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(mdev, pi->size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);

	atomic_add(pi->size >> 9, &mdev->rs_sect_in);

static int w_restart_write(struct drbd_work *w, int cancel)
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	unsigned long start_time;
	unsigned long flags;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	if (!expect(req->rq_state & RQ_POSTPONED)) {
		spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
	bio = req->master_bio;
	start_time = req->start_time;
	/* Postponed requests will not have their master_bio completed! */
	__req_mod(req, DISCARD_WRITE, NULL);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	while (__drbd_make_request(mdev, bio, start_time))

static void restart_conflicting_writes(struct drbd_conf *mdev,
				       sector_t sector, int size)
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
		if (expect(list_empty(&req->w.list))) {
			req->w.cb = w_restart_write;
			drbd_queue_work(&mdev->tconn->data.work, &req->w);

/* e_end_block() is called via drbd_process_done_ee().
 * this means this function only runs in the asender thread
 */
static int e_end_block(struct drbd_work *w, int cancel)
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;

	if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
			err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */

	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		if (peer_req->flags & EE_RESTART_REQUESTS)
			restart_conflicting_writes(mdev, sector, peer_req->i.size);
		spin_unlock_irq(&mdev->tconn->req_lock);
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
	struct drbd_conf *mdev = w->mdev;
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);

	err = drbd_send_ack(mdev, ack, peer_req);

static int e_send_discard_write(struct drbd_work *w, int unused)
	return e_send_ack(w, P_DISCARD_WRITE);

static int e_send_retry_write(struct drbd_work *w, int unused)
	struct drbd_tconn *tconn = w->mdev->tconn;

	return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
			     P_RETRY_WRITE : P_DISCARD_WRITE);

static bool seq_greater(u32 a, u32 b)
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 */
	return (s32)a - (s32)b > 0;
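/* Worked example of the wrap-around handling above: for a == 2 and
 * b == 0xfffffffe, (s32)a - (s32)b == 2 - (-2) == 4 > 0, so a is
 * correctly considered newer although it is numerically smaller. */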
static u32 seq_max(u32 a, u32 b)
	return seq_greater(a, b) ? a : b;

static bool need_peer_seq(struct drbd_conf *mdev)
	struct drbd_tconn *tconn = mdev->tconn;

	/*
	 * We only need to keep track of the last packet_seq number of our peer
	 * if we are in dual-primary mode and we have the discard flag set; see
	 * handle_write_conflicts().
	 */
	return tconn->net_conf->two_primaries &&
	       test_bit(DISCARD_CONCURRENT, &tconn->flags);

static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
	unsigned int newest_peer_seq;

	if (need_peer_seq(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
		mdev->peer_seq = newest_peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		/* wake up only if we actually changed mdev->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&mdev->seq_wait);

/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
	if (!need_peer_seq(mdev))

	spin_lock(&mdev->peer_seq_lock);
		if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
			mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);

		if (signal_pending(current)) {

		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&mdev->peer_seq_lock);
		timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
		timeout = schedule_timeout(timeout);
		spin_lock(&mdev->peer_seq_lock);
			dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");

	spin_unlock(&mdev->peer_seq_lock);
	finish_wait(&mdev->seq_wait, &wait);

/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);

static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
	struct drbd_interval *i;

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&mdev->tconn->req_lock);
			complete_master_bio(mdev, &m);
		spin_lock_irq(&mdev->tconn->req_lock);

static int handle_write_conflicts(struct drbd_conf *mdev,
				  struct drbd_peer_request *peer_req)
	struct drbd_tconn *tconn = mdev->tconn;
	bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&mdev->write_requests, &peer_req->i);

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (i == &peer_req->i)

			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(mdev, i);

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be discarded; otherwise,
			 * it will be retried once all overlapping requests
			 */
			bool discard = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  discard ? "local" : "remote");

			peer_req->w.cb = discard ? e_send_discard_write :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &mdev->done_ee);
			wake_asender(mdev->tconn);

			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request will be discarded or
				 * retried.  Requests that are discarded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(mdev, &req->i);
					_conn_request_state(mdev->tconn,
							    NS(conn, C_TIMEOUT),
					fail_postponed_requests(mdev, sector, size);

			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;

	drbd_remove_epoch_entry_interval(mdev, peer_req);

/* mirrored write */
static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
	struct drbd_conf *mdev;
	struct drbd_peer_request *peer_req;
	struct p_data *p = tconn->data.rbuf;
	u32 peer_seq = be32_to_cpu(p->seq_num);

	mdev = vnr_to_mdev(tconn, pi->vnr);

	if (!get_ldev(mdev)) {
		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		err2 = drbd_drain_block(mdev, pi->size);

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */
	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(mdev, p->block_id, sector, pi->size);

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&mdev->epoch_lock);
	peer_req->epoch = mdev->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	if (mdev->tconn->net_conf->two_primaries) {
		err = wait_for_and_update_peer_seq(mdev, peer_seq);
			goto out_interrupted;
		spin_lock_irq(&mdev->tconn->req_lock);
		err = handle_write_conflicts(mdev, peer_req);
			spin_unlock_irq(&mdev->tconn->req_lock);
			if (err == -ENOENT) {
			goto out_interrupted;
		spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	switch (mdev->tconn->net_conf->wire_protocol) {
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, peer_req->i.sector);

	err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, peer_req->i.sector);

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
	drbd_free_ee(mdev, peer_req);

/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * activity (more than 64 sectors) that we cannot account for with our own
 * resync activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;

	/* feature disabled? */
	if (mdev->ldev->dc.c_min_rate == 0)

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
		/* Do not slow down if app IO is already waiting for this extent */
	spin_unlock_irq(&mdev->al_lock);

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
		      atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;

		db = mdev->rs_mark_left[i] - rs_left;
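		/* db is in bitmap bits; Bit2KB() scales it to KiB (one bit
		 * covers one 4 KiB resync block), so dbdt below is the recent
		 * sync rate in KiB/s, compared against c_min_rate. */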
2148 dbdt = Bit2KB(db/dt);
2150 if (dbdt > mdev->ldev->dc.c_min_rate)
2157 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2159 struct drbd_conf *mdev;
2162 struct drbd_peer_request *peer_req;
2163 struct digest_info *di = NULL;
2165 unsigned int fault_type;
2166 struct p_block_req *p = tconn->data.rbuf;
2168 mdev = vnr_to_mdev(tconn, pi->vnr);
2171 capacity = drbd_get_capacity(mdev->this_bdev);
2173 sector = be64_to_cpu(p->sector);
2174 size = be32_to_cpu(p->blksize);
2176 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2177 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2178 (unsigned long long)sector, size);
2181 if (sector + (size>>9) > capacity) {
2182 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2183 (unsigned long long)sector, size);
2187 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2190 case P_DATA_REQUEST:
2191 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2193 case P_RS_DATA_REQUEST:
2194 case P_CSUM_RS_REQUEST:
2196 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2200 dec_rs_pending(mdev);
2201 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2206 if (verb && __ratelimit(&drbd_ratelimit_state))
2207 dev_err(DEV, "Can not satisfy peer's read request, "
2208 "no local data.\n");
2210 /* drain possibly payload */
2211 return drbd_drain_block(mdev, pi->size);
2214 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2215 * "criss-cross" setup, that might cause write-out on some other DRBD,
2216 * which in turn might block on the other node at this very place. */
2217 peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2224 case P_DATA_REQUEST:
2225 peer_req->w.cb = w_e_end_data_req;
2226 fault_type = DRBD_FAULT_DT_RD;
2227 /* application IO, don't drbd_rs_begin_io */
2230 case P_RS_DATA_REQUEST:
2231 peer_req->w.cb = w_e_end_rsdata_req;
2232 fault_type = DRBD_FAULT_RS_RD;
2233 /* used in the sector offset progress display */
2234 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2238 case P_CSUM_RS_REQUEST:
2239 fault_type = DRBD_FAULT_RS_RD;
2240 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2244 di->digest_size = pi->size;
2245 di->digest = (((char *)di)+sizeof(struct digest_info));
2247 peer_req->digest = di;
2248 peer_req->flags |= EE_HAS_DIGEST;
2250 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2253 if (pi->cmd == P_CSUM_RS_REQUEST) {
2254 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2255 peer_req->w.cb = w_e_end_csum_rs_req;
2256 /* used in the sector offset progress display */
2257 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2258 } else if (pi->cmd == P_OV_REPLY) {
2259 /* track progress, we may need to throttle */
2260 atomic_add(size >> 9, &mdev->rs_sect_in);
2261 peer_req->w.cb = w_e_end_ov_reply;
2262 dec_rs_pending(mdev);
2263 /* drbd_rs_begin_io done when we sent this request,
2264 * but accounting still needs to be done. */
2265 goto submit_for_resync;
2270 if (mdev->ov_start_sector == ~(sector_t)0 &&
2271 mdev->tconn->agreed_pro_version >= 90) {
2272 unsigned long now = jiffies;
2274 mdev->ov_start_sector = sector;
2275 mdev->ov_position = sector;
2276 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2277 mdev->rs_total = mdev->ov_left;
2278 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2279 mdev->rs_mark_left[i] = mdev->ov_left;
2280 mdev->rs_mark_time[i] = now;
2282 dev_info(DEV, "Online Verify start sector: %llu\n",
2283 (unsigned long long)sector);
2285 peer_req->w.cb = w_e_end_ov_req;
2286 fault_type = DRBD_FAULT_RS_RD;
2293 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2294 * wrt the receiver, but it is not as straightforward as it may seem.
2295 * Various places in the resync start and stop logic assume resync
2296 * requests are processed in order, requeuing this on the worker thread
2297 * introduces a bunch of new code for synchronization between threads.
2299 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2300 * "forever", throttling after drbd_rs_begin_io will lock that extent
2301 * for application writes for the same time. For now, just throttle
2302 * here, where the rest of the code expects the receiver to sleep for
2306 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2307 * this defers syncer requests for some time, before letting at least
2308 * on request through. The resync controller on the receiving side
2309 * will adapt to the incoming rate accordingly.
2311 * We cannot throttle here if remote is Primary/SyncTarget:
2312 * we would also throttle its application reads.
2313 * In that case, throttling is done on the SyncTarget only.
2315 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2316 schedule_timeout_uninterruptible(HZ/10);
2317 if (drbd_rs_begin_io(mdev, sector))
2321 atomic_add(size >> 9, &mdev->rs_sect_ev);
2325 spin_lock_irq(&mdev->tconn->req_lock);
2326 list_add_tail(&peer_req->w.list, &mdev->read_ee);
2327 spin_unlock_irq(&mdev->tconn->req_lock);
2329 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2332 /* don't care for the reason here */
2333 dev_err(DEV, "submit failed, triggering re-connect\n");
2334 spin_lock_irq(&mdev->tconn->req_lock);
2335 list_del(&peer_req->w.list);
2336 spin_unlock_irq(&mdev->tconn->req_lock);
2337 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2341 drbd_free_ee(mdev, peer_req);
2345 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2347 int self, peer, rv = -100;
2348 unsigned long ch_self, ch_peer;
2350 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2351 peer = mdev->p_uuid[UI_BITMAP] & 1;
2353 ch_peer = mdev->p_uuid[UI_SIZE];
2354 ch_self = mdev->comm_bm_set;
2356 switch (mdev->tconn->net_conf->after_sb_0p) {
2358 case ASB_DISCARD_SECONDARY:
2359 case ASB_CALL_HELPER:
2360 dev_err(DEV, "Configuration error.\n");
2362 case ASB_DISCONNECT:
2364 case ASB_DISCARD_YOUNGER_PRI:
2365 if (self == 0 && peer == 1) {
2369 if (self == 1 && peer == 0) {
2373 /* Else fall through to one of the other strategies... */
2374 case ASB_DISCARD_OLDER_PRI:
2375 if (self == 0 && peer == 1) {
2379 if (self == 1 && peer == 0) {
2383 /* Else fall through to one of the other strategies... */
2384 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2385 "Using discard-least-changes instead\n");
2386 case ASB_DISCARD_ZERO_CHG:
2387 if (ch_peer == 0 && ch_self == 0) {
2388 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2392 if (ch_peer == 0) { rv = 1; break; }
2393 if (ch_self == 0) { rv = -1; break; }
2395 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2397 case ASB_DISCARD_LEAST_CHG:
2398 if (ch_self < ch_peer)
2400 else if (ch_self > ch_peer)
2402 else /* ( ch_self == ch_peer ) */
2403 /* Well, then use something else. */
2404 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2407 case ASB_DISCARD_LOCAL:
2410 case ASB_DISCARD_REMOTE:
2417 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2421 switch (mdev->tconn->net_conf->after_sb_1p) {
2422 case ASB_DISCARD_YOUNGER_PRI:
2423 case ASB_DISCARD_OLDER_PRI:
2424 case ASB_DISCARD_LEAST_CHG:
2425 case ASB_DISCARD_LOCAL:
2426 case ASB_DISCARD_REMOTE:
2427 dev_err(DEV, "Configuration error.\n");
2429 case ASB_DISCONNECT:
2432 hg = drbd_asb_recover_0p(mdev);
2433 if (hg == -1 && mdev->state.role == R_SECONDARY)
2435 if (hg == 1 && mdev->state.role == R_PRIMARY)
2439 rv = drbd_asb_recover_0p(mdev);
2441 case ASB_DISCARD_SECONDARY:
2442 return mdev->state.role == R_PRIMARY ? 1 : -1;
2443 case ASB_CALL_HELPER:
2444 hg = drbd_asb_recover_0p(mdev);
2445 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2446 enum drbd_state_rv rv2;
2448 drbd_set_role(mdev, R_SECONDARY, 0);
2449 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2450 * we might be here in C_WF_REPORT_PARAMS which is transient.
2451 * we do not need to wait for the after state change work either. */
2452 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2453 if (rv2 != SS_SUCCESS) {
2454 drbd_khelper(mdev, "pri-lost-after-sb");
2456 dev_warn(DEV, "Successfully gave up primary role.\n");
2466 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2470 switch (mdev->tconn->net_conf->after_sb_2p) {
2471 case ASB_DISCARD_YOUNGER_PRI:
2472 case ASB_DISCARD_OLDER_PRI:
2473 case ASB_DISCARD_LEAST_CHG:
2474 case ASB_DISCARD_LOCAL:
2475 case ASB_DISCARD_REMOTE:
2477 case ASB_DISCARD_SECONDARY:
2478 dev_err(DEV, "Configuration error.\n");
2481 rv = drbd_asb_recover_0p(mdev);
2483 case ASB_DISCONNECT:
2485 case ASB_CALL_HELPER:
2486 hg = drbd_asb_recover_0p(mdev);
2488 enum drbd_state_rv rv2;
2490 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2491 * we might be here in C_WF_REPORT_PARAMS which is transient.
2492 * we do not need to wait for the after state change work either. */
2493 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2494 if (rv2 != SS_SUCCESS) {
2495 drbd_khelper(mdev, "pri-lost-after-sb");
2497 dev_warn(DEV, "Successfully gave up primary role.\n");
2507 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2508 u64 bits, u64 flags)
2511 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2514 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2516 (unsigned long long)uuid[UI_CURRENT],
2517 (unsigned long long)uuid[UI_BITMAP],
2518 (unsigned long long)uuid[UI_HISTORY_START],
2519 (unsigned long long)uuid[UI_HISTORY_END],
2520 (unsigned long long)bits,
2521 (unsigned long long)flags);
2525 100 after split brain, try auto recover
2526 2 C_SYNC_SOURCE set BitMap
2527 1 C_SYNC_SOURCE use BitMap
2529 -1 C_SYNC_TARGET use BitMap
2530 -2 C_SYNC_TARGET set BitMap
2531 -100 after split brain, disconnect
2532 -1000 unrelated data
2533 -1091 requires proto 91
2534 -1096 requires proto 96
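/* Illustrative sketch (editorial, not part of the driver): a caller would
 * interpret the result of drbd_uuid_compare() roughly as follows;
 * drbd_sync_handshake() below is the real consumer.
 *
 *	hg = drbd_uuid_compare(mdev, &rule_nr);
 *	if (hg == -1000)          unrelated data, drop the connection
 *	else if (hg < -1000)      peer must support at least protocol (-hg - 1000)
 *	else if (abs(hg) == 100)  split brain: try auto recover (100) / disconnect (-100)
 *	else if (hg > 0)          become C_SYNC_SOURCE (hg == 2: set bitmap, full sync)
 *	else if (hg < 0)          become C_SYNC_TARGET (hg == -2: set bitmap, full sync)
 *	else                      in sync, nothing to do
 */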
2536 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2541 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2542 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2545 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2549 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2550 peer != UUID_JUST_CREATED)
2554 if (self != UUID_JUST_CREATED &&
2555 (peer == UUID_JUST_CREATED || peer == (u64)0))
2559 int rct, dc; /* roles at crash time */
2561 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2563 if (mdev->tconn->agreed_pro_version < 91)
2566 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2567 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2568 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2569 drbd_uuid_set_bm(mdev, 0UL);
2571 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2572 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2575 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2582 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2584 if (mdev->tconn->agreed_pro_version < 91)
2587 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2588 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2589 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2591 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2592 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2593 mdev->p_uuid[UI_BITMAP] = 0UL;
2595 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2598 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2605 /* Common power [off|failure] */
2606 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2607 (mdev->p_uuid[UI_FLAGS] & 2);
2608 /* lowest bit is set when we were primary,
2609 * next bit (weight 2) is set when peer was primary */
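/* Example (editorial): rct == 3 means both nodes were primary when they
 * crashed; only in that case does the DISCARD_CONCURRENT tie-breaker
 * (case 3 below) have to decide. */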
2613 case 0: /* !self_pri && !peer_pri */ return 0;
2614 case 1: /* self_pri && !peer_pri */ return 1;
2615 case 2: /* !self_pri && peer_pri */ return -1;
2616 case 3: /* self_pri && peer_pri */
2617 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2623 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2628 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2630 if (mdev->tconn->agreed_pro_version < 96 ?
2631 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2632 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2633 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2634 /* The last P_SYNC_UUID did not get through. Undo the modifications
2635 the peer made to its UUIDs when it last started a resync as sync source. */
2637 if (mdev->tconn->agreed_pro_version < 91)
2640 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2641 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2643 dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2644 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2651 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2652 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2653 peer = mdev->p_uuid[i] & ~((u64)1);
2659 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2660 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2665 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2667 if (mdev->tconn->agreed_pro_version < 96 ?
2668 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2669 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2670 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2671 /* The last P_SYNC_UUID did not get through. Undo the modifications
2672 we made to our own UUIDs when we last started a resync as sync source. */
2674 if (mdev->tconn->agreed_pro_version < 91)
2677 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2678 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2680 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2681 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2682 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2690 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2691 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2692 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2698 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2699 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2700 if (self == peer && self != ((u64)0))
2704 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2705 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2706 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2707 peer = mdev->p_uuid[j] & ~((u64)1);
2716 /* drbd_sync_handshake() returns the new conn state on success, or
2717 C_MASK (-1) on failure.
2719 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2720 enum drbd_disk_state peer_disk) __must_hold(local)
2723 enum drbd_conns rv = C_MASK;
2724 enum drbd_disk_state mydisk;
2726 mydisk = mdev->state.disk;
2727 if (mydisk == D_NEGOTIATING)
2728 mydisk = mdev->new_state_tmp.disk;
2730 dev_info(DEV, "drbd_sync_handshake:\n");
2731 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2732 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2733 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2735 hg = drbd_uuid_compare(mdev, &rule_nr);
2737 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2740 dev_alert(DEV, "Unrelated data, aborting!\n");
2744 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2748 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2749 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2750 int f = (hg == -100) || abs(hg) == 2;
2751 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2754 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2755 hg > 0 ? "source" : "target");
2759 drbd_khelper(mdev, "initial-split-brain");
2761 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2762 int pcount = (mdev->state.role == R_PRIMARY)
2763 + (peer_role == R_PRIMARY);
2764 int forced = (hg == -100);
2768 hg = drbd_asb_recover_0p(mdev);
2771 hg = drbd_asb_recover_1p(mdev);
2774 hg = drbd_asb_recover_2p(mdev);
2777 if (abs(hg) < 100) {
2778 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2779 "automatically solved. Sync from %s node\n",
2780 pcount, (hg < 0) ? "peer" : "this");
2782 dev_warn(DEV, "Doing a full sync, since"
2783 " UUIDs where ambiguous.\n");
2790 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2792 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2796 dev_warn(DEV, "Split-Brain detected, manually solved. "
2797 "Sync from %s node\n",
2798 (hg < 0) ? "peer" : "this");
2802 /* FIXME this log message is not correct if we end up here
2803 * after an attempted attach on a diskless node.
2804 * We just refuse to attach -- well, we drop the "connection"
2805 * to that disk, in a way... */
2806 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2807 drbd_khelper(mdev, "split-brain");
2811 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2812 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2816 if (hg < 0 && /* by intention we do not use mydisk here. */
2817 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2818 switch (mdev->tconn->net_conf->rr_conflict) {
2819 case ASB_CALL_HELPER:
2820 drbd_khelper(mdev, "pri-lost");
2822 case ASB_DISCONNECT:
2823 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2826 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2831 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2833 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2835 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2836 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2837 abs(hg) >= 2 ? "full" : "bit-map based");
2842 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2843 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2844 BM_LOCKED_SET_ALLOWED))
2848 if (hg > 0) { /* become sync source. */
2850 } else if (hg < 0) { /* become sync target */
2854 if (drbd_bm_total_weight(mdev)) {
2855 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2856 drbd_bm_total_weight(mdev));
2863 /* returns 1 if invalid */
2864 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2866 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2867 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2868 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2871 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2872 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2873 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2876 /* everything else is valid if they are equal on both sides. */
2880 /* everything else is invalid. */
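/* Summary of the checks above, as a sketch (editorial, derived from the
 * code, not an authoritative compatibility matrix):
 *
 *	peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL  -> valid
 *	self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL  -> valid
 *	any other use of ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL -> invalid
 *	peer == self                                             -> valid
 *	anything else                                            -> invalid
 */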
2884 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2886 struct p_protocol *p = tconn->data.rbuf;
2887 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2888 int p_want_lose, p_two_primaries, cf;
2889 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2891 p_proto = be32_to_cpu(p->protocol);
2892 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2893 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2894 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2895 p_two_primaries = be32_to_cpu(p->two_primaries);
2896 cf = be32_to_cpu(p->conn_flags);
2897 p_want_lose = cf & CF_WANT_LOSE;
2899 clear_bit(CONN_DRY_RUN, &tconn->flags);
2901 if (cf & CF_DRY_RUN)
2902 set_bit(CONN_DRY_RUN, &tconn->flags);
2904 if (p_proto != tconn->net_conf->wire_protocol) {
2905 conn_err(tconn, "incompatible communication protocols\n");
2909 if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2910 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2914 if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2915 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2919 if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2920 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2924 if (p_want_lose && tconn->net_conf->want_lose) {
2925 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2929 if (p_two_primaries != tconn->net_conf->two_primaries) {
2930 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2934 if (tconn->agreed_pro_version >= 87) {
2935 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2938 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2942 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2943 if (strcmp(p_integrity_alg, my_alg)) {
2944 conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2947 conn_info(tconn, "data-integrity-alg: %s\n",
2948 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2954 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2959 * input: alg name, feature name
2960 * return: NULL (alg name was "")
2961 * ERR_PTR(error) if something goes wrong
2962 * or the crypto hash ptr, if it worked out ok. */
2963 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2964 const char *alg, const char *name)
2966 struct crypto_hash *tfm;
2971 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2973 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2974 alg, name, PTR_ERR(tfm));
2977 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2978 crypto_free_hash(tfm);
2979 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2980 return ERR_PTR(-EINVAL);
2985 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
2987 void *buffer = tconn->data.rbuf;
2988 int size = pi->size;
2991 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
2992 s = drbd_recv(tconn, buffer, s);
3006 * config_unknown_volume - device configuration command for unknown volume
3008 * When a device is added to an existing connection, the node on which the
3009 * device is added first will send configuration commands to its peer but the
3010 * peer will not know about the device yet. It will warn and ignore these
3011 * commands. Once the device is added on the second node, the second node will
3012 * send the same device configuration commands, but in the other direction.
3014 * (We can also end up here if drbd is misconfigured.)
3016 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3018 conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3019 pi->vnr, cmdname(pi->cmd));
3020 return ignore_remaining_packet(tconn, pi);
3023 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3025 struct drbd_conf *mdev;
3026 struct p_rs_param_95 *p = tconn->data.rbuf;
3027 unsigned int header_size, data_size, exp_max_sz;
3028 struct crypto_hash *verify_tfm = NULL;
3029 struct crypto_hash *csums_tfm = NULL;
3030 const int apv = tconn->agreed_pro_version;
3031 int *rs_plan_s = NULL;
3035 mdev = vnr_to_mdev(tconn, pi->vnr);
3037 return config_unknown_volume(tconn, pi);
3039 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3040 : apv == 88 ? sizeof(struct p_rs_param)
3042 : apv <= 94 ? sizeof(struct p_rs_param_89)
3043 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3045 if (pi->size > exp_max_sz) {
3046 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3047 pi->size, exp_max_sz);
3052 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
3053 data_size = pi->size - header_size;
3054 } else if (apv <= 94) {
3055 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
3056 data_size = pi->size - header_size;
3057 D_ASSERT(data_size == 0);
3059 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
3060 data_size = pi->size - header_size;
3061 D_ASSERT(data_size == 0);
3064 /* initialize verify_alg and csums_alg */
3065 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3067 err = drbd_recv_all(mdev->tconn, &p->head.payload, header_size);
3071 if (get_ldev(mdev)) {
3072 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3078 if (data_size > SHARED_SECRET_MAX) {
3079 dev_err(DEV, "verify-alg too long, "
3080 "peer wants %u, accepting only %u byte\n",
3081 data_size, SHARED_SECRET_MAX);
3085 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3089 /* we expect NUL terminated string */
3090 /* but just in case someone tries to be evil */
3091 D_ASSERT(p->verify_alg[data_size-1] == 0);
3092 p->verify_alg[data_size-1] = 0;
3094 } else /* apv >= 89 */ {
3095 /* we still expect NUL terminated strings */
3096 /* but just in case someone tries to be evil */
3097 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3098 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3099 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3100 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3103 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3104 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3105 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3106 mdev->tconn->net_conf->verify_alg, p->verify_alg);
3109 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3110 p->verify_alg, "verify-alg");
3111 if (IS_ERR(verify_tfm)) {
3117 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3118 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3119 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3120 mdev->tconn->net_conf->csums_alg, p->csums_alg);
3123 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3124 p->csums_alg, "csums-alg");
3125 if (IS_ERR(csums_tfm)) {
3131 if (apv > 94 && get_ldev(mdev)) {
3132 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3133 mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3134 mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3135 mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3136 mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3138 fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
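/* Editorial note: assuming SLEEP_TIME is HZ/10, as used by the resync
 * controller, this reduces to fifo_size == c_plan_ahead, i.e. one plan
 * slot per 100ms of configured plan-ahead time. */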
3139 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3140 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3142 dev_err(DEV, "kmalloc of fifo_buffer failed");
3150 spin_lock(&mdev->peer_seq_lock);
3151 /* lock against drbd_nl_syncer_conf() */
3153 strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3154 mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3155 crypto_free_hash(mdev->tconn->verify_tfm);
3156 mdev->tconn->verify_tfm = verify_tfm;
3157 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3160 strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3161 mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3162 crypto_free_hash(mdev->tconn->csums_tfm);
3163 mdev->tconn->csums_tfm = csums_tfm;
3164 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3166 if (fifo_size != mdev->rs_plan_s.size) {
3167 kfree(mdev->rs_plan_s.values);
3168 mdev->rs_plan_s.values = rs_plan_s;
3169 mdev->rs_plan_s.size = fifo_size;
3170 mdev->rs_planed = 0;
3172 spin_unlock(&mdev->peer_seq_lock);
3177 /* just for completeness: actually not needed,
3178 * as this is not reached if csums_tfm was ok. */
3179 crypto_free_hash(csums_tfm);
3180 /* but free the verify_tfm again, if csums_tfm did not work out */
3181 crypto_free_hash(verify_tfm);
3182 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3186 /* warn if the arguments differ by more than 12.5% */
3187 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3188 const char *s, sector_t a, sector_t b)
3191 if (a == 0 || b == 0)
3193 d = (a > b) ? (a - b) : (b - a);
3194 if (d > (a>>3) || d > (b>>3))
3195 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3196 (unsigned long long)a, (unsigned long long)b);
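/* Worked example (editorial, illustrative): a = 1000 sectors, b = 880
 * sectors gives d = 120; a>>3 == 125 and b>>3 == 110, so d > (b>>3) fires
 * and we warn. The difference is effectively measured against an eighth of
 * the smaller value, hence the "more than 12.5%" rule of thumb above. */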
3199 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3201 struct drbd_conf *mdev;
3202 struct p_sizes *p = tconn->data.rbuf;
3203 enum determine_dev_size dd = unchanged;
3204 sector_t p_size, p_usize, my_usize;
3205 int ldsc = 0; /* local disk size changed */
3206 enum dds_flags ddsf;
3208 mdev = vnr_to_mdev(tconn, pi->vnr);
3210 return config_unknown_volume(tconn, pi);
3212 p_size = be64_to_cpu(p->d_size);
3213 p_usize = be64_to_cpu(p->u_size);
3215 /* just store the peer's disk size for now.
3216 * we still need to figure out whether we accept that. */
3217 mdev->p_size = p_size;
3219 if (get_ldev(mdev)) {
3220 warn_if_differ_considerably(mdev, "lower level device sizes",
3221 p_size, drbd_get_max_capacity(mdev->ldev));
3222 warn_if_differ_considerably(mdev, "user requested size",
3223 p_usize, mdev->ldev->dc.disk_size);
3225 /* if this is the first connect, or an otherwise expected
3226 * param exchange, choose the minimum */
3227 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3228 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3231 my_usize = mdev->ldev->dc.disk_size;
3233 if (mdev->ldev->dc.disk_size != p_usize) {
3234 mdev->ldev->dc.disk_size = p_usize;
3235 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3236 (unsigned long)mdev->ldev->dc.disk_size);
3239 /* Never shrink a device with usable data during connect.
3240 But allow online shrinking if we are connected. */
3241 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3242 drbd_get_capacity(mdev->this_bdev) &&
3243 mdev->state.disk >= D_OUTDATED &&
3244 mdev->state.conn < C_CONNECTED) {
3245 dev_err(DEV, "The peer's disk size is too small!\n");
3246 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3247 mdev->ldev->dc.disk_size = my_usize;
3254 ddsf = be16_to_cpu(p->dds_flags);
3255 if (get_ldev(mdev)) {
3256 dd = drbd_determine_dev_size(mdev, ddsf);
3258 if (dd == dev_size_error)
3262 /* I am diskless, need to accept the peer's size. */
3263 drbd_set_my_capacity(mdev, p_size);
3266 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3267 drbd_reconsider_max_bio_size(mdev);
3269 if (get_ldev(mdev)) {
3270 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3271 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3278 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3279 if (be64_to_cpu(p->c_size) !=
3280 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3281 /* we have different sizes, probably peer
3282 * needs to know my new size... */
3283 drbd_send_sizes(mdev, 0, ddsf);
3285 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3286 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3287 if (mdev->state.pdsk >= D_INCONSISTENT &&
3288 mdev->state.disk >= D_INCONSISTENT) {
3289 if (ddsf & DDSF_NO_RESYNC)
3290 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3292 resync_after_online_grow(mdev);
3294 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3301 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3303 struct drbd_conf *mdev;
3304 struct p_uuids *p = tconn->data.rbuf;
3306 int i, updated_uuids = 0;
3308 mdev = vnr_to_mdev(tconn, pi->vnr);
3310 return config_unknown_volume(tconn, pi);
3312 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3314 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3315 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3317 kfree(mdev->p_uuid);
3318 mdev->p_uuid = p_uuid;
3320 if (mdev->state.conn < C_CONNECTED &&
3321 mdev->state.disk < D_INCONSISTENT &&
3322 mdev->state.role == R_PRIMARY &&
3323 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3324 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3325 (unsigned long long)mdev->ed_uuid);
3326 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3330 if (get_ldev(mdev)) {
3331 int skip_initial_sync =
3332 mdev->state.conn == C_CONNECTED &&
3333 mdev->tconn->agreed_pro_version >= 90 &&
3334 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3335 (p_uuid[UI_FLAGS] & 8);
3336 if (skip_initial_sync) {
3337 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3338 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3339 "clear_n_write from receive_uuids",
3340 BM_LOCKED_TEST_ALLOWED);
3341 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3342 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3343 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3349 } else if (mdev->state.disk < D_INCONSISTENT &&
3350 mdev->state.role == R_PRIMARY) {
3351 /* I am a diskless primary, the peer just created a new current UUID
3353 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3356 /* Before we test for the disk state, we should wait until a possibly
3357 ongoing cluster wide state change is finished. That is important if
3358 we are primary and are detaching from our disk. We need to see the
3359 new disk state... */
3360 mutex_lock(mdev->state_mutex);
3361 mutex_unlock(mdev->state_mutex);
3362 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3363 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3366 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3372 * convert_state() - Converts the peer's view of the cluster state to our point of view
3373 * @ps: The state as seen by the peer.
3375 static union drbd_state convert_state(union drbd_state ps)
3377 union drbd_state ms;
3379 static enum drbd_conns c_tab[] = {
3380 [C_CONNECTED] = C_CONNECTED,
3382 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3383 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3384 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3385 [C_VERIFY_S] = C_VERIFY_T,
3391 ms.conn = c_tab[ps.conn];
3396 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
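/* Example (editorial): a peer reporting conn == C_STARTING_SYNC_S is, from
 * our point of view, C_STARTING_SYNC_T -- the table mirrors the asymmetric
 * connection states, and the peer's own suspend flags (aftr_isp, user_isp)
 * become our peer_isp. */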
3401 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3403 struct drbd_conf *mdev;
3404 struct p_req_state *p = tconn->data.rbuf;
3405 union drbd_state mask, val;
3406 enum drbd_state_rv rv;
3408 mdev = vnr_to_mdev(tconn, pi->vnr);
3412 mask.i = be32_to_cpu(p->mask);
3413 val.i = be32_to_cpu(p->val);
3415 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3416 mutex_is_locked(mdev->state_mutex)) {
3417 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3421 mask = convert_state(mask);
3422 val = convert_state(val);
3424 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3425 drbd_send_sr_reply(mdev, rv);
3432 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3434 struct p_req_state *p = tconn->data.rbuf;
3435 union drbd_state mask, val;
3436 enum drbd_state_rv rv;
3438 mask.i = be32_to_cpu(p->mask);
3439 val.i = be32_to_cpu(p->val);
3441 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3442 mutex_is_locked(&tconn->cstate_mutex)) {
3443 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3447 mask = convert_state(mask);
3448 val = convert_state(val);
3450 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY);
3451 conn_send_sr_reply(tconn, rv);
3456 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3458 struct drbd_conf *mdev;
3459 struct p_state *p = tconn->data.rbuf;
3460 union drbd_state os, ns, peer_state;
3461 enum drbd_disk_state real_peer_disk;
3462 enum chg_state_flags cs_flags;
3465 mdev = vnr_to_mdev(tconn, pi->vnr);
3467 return config_unknown_volume(tconn, pi);
3469 peer_state.i = be32_to_cpu(p->state);
3471 real_peer_disk = peer_state.disk;
3472 if (peer_state.disk == D_NEGOTIATING) {
3473 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3474 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3477 spin_lock_irq(&mdev->tconn->req_lock);
3479 os = ns = mdev->state;
3480 spin_unlock_irq(&mdev->tconn->req_lock);
3482 /* peer says his disk is uptodate, while we think it is inconsistent,
3483 * and this happens while we think we have a sync going on. */
3484 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3485 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3486 /* If we are (becoming) SyncSource, but peer is still in sync
3487 * preparation, ignore its uptodate-ness to avoid flapping, it
3488 * will change to inconsistent once the peer reaches active
3490 * It may have changed syncer-paused flags, however, so we
3491 * cannot ignore this completely. */
3492 if (peer_state.conn > C_CONNECTED &&
3493 peer_state.conn < C_SYNC_SOURCE)
3494 real_peer_disk = D_INCONSISTENT;
3496 /* if peer_state changes to connected at the same time,
3497 * it explicitly notifies us that it finished resync.
3498 * Maybe we should finish it up, too? */
3499 else if (os.conn >= C_SYNC_SOURCE &&
3500 peer_state.conn == C_CONNECTED) {
3501 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3502 drbd_resync_finished(mdev);
3507 /* peer says his disk is inconsistent, while we think it is uptodate,
3508 * and this happens while the peer still thinks we have a sync going on,
3509 * but we think we are already done with the sync.
3510 * We ignore this to avoid flapping pdsk.
3511 * This should not happen, if the peer is a recent version of drbd. */
3512 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3513 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3514 real_peer_disk = D_UP_TO_DATE;
3516 if (ns.conn == C_WF_REPORT_PARAMS)
3517 ns.conn = C_CONNECTED;
3519 if (peer_state.conn == C_AHEAD)
3522 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3523 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3524 int cr; /* consider resync */
3526 /* if we established a new connection */
3527 cr = (os.conn < C_CONNECTED);
3528 /* if we had an established connection
3529 * and one of the nodes newly attaches a disk */
3530 cr |= (os.conn == C_CONNECTED &&
3531 (peer_state.disk == D_NEGOTIATING ||
3532 os.disk == D_NEGOTIATING));
3533 /* if we have both been inconsistent, and the peer has been
3534 * forced to be UpToDate with --overwrite-data */
3535 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3536 /* if we had been plain connected, and the admin requested to
3537 * start a sync by "invalidate" or "invalidate-remote" */
3538 cr |= (os.conn == C_CONNECTED &&
3539 (peer_state.conn >= C_STARTING_SYNC_S &&
3540 peer_state.conn <= C_WF_BITMAP_T));
3543 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3546 if (ns.conn == C_MASK) {
3547 ns.conn = C_CONNECTED;
3548 if (mdev->state.disk == D_NEGOTIATING) {
3549 drbd_force_state(mdev, NS(disk, D_FAILED));
3550 } else if (peer_state.disk == D_NEGOTIATING) {
3551 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3552 peer_state.disk = D_DISKLESS;
3553 real_peer_disk = D_DISKLESS;
3555 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3557 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3558 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3564 spin_lock_irq(&mdev->tconn->req_lock);
3565 if (mdev->state.i != os.i)
3567 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3568 ns.peer = peer_state.role;
3569 ns.pdsk = real_peer_disk;
3570 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3571 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3572 ns.disk = mdev->new_state_tmp.disk;
3573 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3574 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3575 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3576 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3577 for temporary network outages! */
3578 spin_unlock_irq(&mdev->tconn->req_lock);
3579 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3580 tl_clear(mdev->tconn);
3581 drbd_uuid_new_current(mdev);
3582 clear_bit(NEW_CUR_UUID, &mdev->flags);
3583 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3586 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3588 spin_unlock_irq(&mdev->tconn->req_lock);
3590 if (rv < SS_SUCCESS) {
3591 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3595 if (os.conn > C_WF_REPORT_PARAMS) {
3596 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3597 peer_state.disk != D_NEGOTIATING) {
3598 /* we want resync, peer has not yet decided to sync... */
3599 /* Nowadays only used when forcing a node into primary role and
3600 setting its disk to UpToDate with that */
3601 drbd_send_uuids(mdev);
3602 drbd_send_state(mdev);
3606 mdev->tconn->net_conf->want_lose = 0;
3608 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3613 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3615 struct drbd_conf *mdev;
3616 struct p_rs_uuid *p = tconn->data.rbuf;
3618 mdev = vnr_to_mdev(tconn, pi->vnr);
3622 wait_event(mdev->misc_wait,
3623 mdev->state.conn == C_WF_SYNC_UUID ||
3624 mdev->state.conn == C_BEHIND ||
3625 mdev->state.conn < C_CONNECTED ||
3626 mdev->state.disk < D_NEGOTIATING);
3628 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3630 /* Here the _drbd_uuid_ functions are right, current should
3631 _not_ be rotated into the history */
3632 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3633 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3634 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3636 drbd_print_uuids(mdev, "updated sync uuid");
3637 drbd_start_resync(mdev, C_SYNC_TARGET);
3641 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3647 * receive_bitmap_plain
3649 * Return 0 when done, 1 when another iteration is needed, and a negative error
3650 * code upon failure.
3653 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3654 struct p_header *h, struct bm_xfer_ctx *c)
3656 unsigned long *buffer = (unsigned long *)h->payload;
3657 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3658 unsigned want = num_words * sizeof(long);
3661 if (want != data_size) {
3662 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3667 err = drbd_recv_all(mdev->tconn, buffer, want);
3671 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3673 c->word_offset += num_words;
3674 c->bit_offset = c->word_offset * BITS_PER_LONG;
3675 if (c->bit_offset > c->bm_bits)
3676 c->bit_offset = c->bm_bits;
3681 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3683 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3686 static int dcbp_get_start(struct p_compressed_bm *p)
3688 return (p->encoding & 0x80) != 0;
3691 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3693 return (p->encoding >> 4) & 0x7;
3699 * Return 0 when done, 1 when another iteration is needed, and a negative error
3700 * code upon failure.
3703 recv_bm_rle_bits(struct drbd_conf *mdev,
3704 struct p_compressed_bm *p,
3705 struct bm_xfer_ctx *c,
3708 struct bitstream bs;
3712 unsigned long s = c->bit_offset;
3714 int toggle = dcbp_get_start(p);
3718 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3720 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3724 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3725 bits = vli_decode_bits(&rl, look_ahead);
3731 if (e >= c->bm_bits) {
3732 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3735 _drbd_bm_set_bits(mdev, s, e);
3739 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3740 have, bits, look_ahead,
3741 (unsigned int)(bs.cur.b - p->code),
3742 (unsigned int)bs.buf_len);
3745 look_ahead >>= bits;
3748 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3751 look_ahead |= tmp << have;
3756 bm_xfer_ctx_bit_to_word_offset(c);
3758 return (s != c->bm_bits);
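/* Worked example (editorial, illustrative): with dcbp_get_start(p) == 0
 * and decoded run lengths 5, 3, 2 starting at bit_offset 0, the loop above
 * skips bits 0..4 (toggle 0), sets bits 5..7 via
 * _drbd_bm_set_bits(mdev, 5, 7) (toggle 1), and skips bits 8..9 again --
 * runs alternate between clear and set, so only every other run costs a
 * bitmap update. */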
3764 * Return 0 when done, 1 when another iteration is needed, and a negative error
3765 * code upon failure.
3768 decode_bitmap_c(struct drbd_conf *mdev,
3769 struct p_compressed_bm *p,
3770 struct bm_xfer_ctx *c,
3773 if (dcbp_get_code(p) == RLE_VLI_Bits)
3774 return recv_bm_rle_bits(mdev, p, c, len);
3776 /* other variants had been implemented for evaluation,
3777 * but have been dropped as this one turned out to be "best"
3778 * during all our tests. */
3780 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3781 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3785 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3786 const char *direction, struct bm_xfer_ctx *c)
3788 /* what would it take to transfer it "plaintext" */
3789 unsigned plain = sizeof(struct p_header) *
3790 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3791 + c->bm_words * sizeof(long);
3792 unsigned total = c->bytes[0] + c->bytes[1];
3795 /* total cannot be zero, but just in case: */
3799 /* don't report if not compressed */
3803 /* total < plain. check for overflow, still */
3804 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3805 : (1000 * total / plain);
3811 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3812 "total %u; compression: %u.%u%%\n",
3814 c->bytes[1], c->packets[1],
3815 c->bytes[0], c->packets[0],
3816 total, r/10, r % 10);
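/* Example (editorial, illustrative): a bitmap whose plain transfer would
 * take 1 MiB, received as 20 KiB of RLE data, yields
 * r = 1000 * 20480 / 1048576 == 19, i.e. the compressed stream is roughly
 * 1.9% (19 tenths of a percent) of the plaintext size -- a savings of
 * about 98%. */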
3819 /* Since we are processing the bitfield from lower addresses to higher,
3820 it does not matter whether we process it in 32 bit chunks or 64 bit
3821 chunks, as long as it is little endian. (Understand it as a byte stream,
3822 beginning with the lowest byte...) If we used big endian instead,
3823 we would need to process it from the highest address to the lowest
3824 in order to be agnostic to the 32 vs 64 bit issue.
3826 Returns 0 on success, a negative error code otherwise. */
3827 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3829 struct drbd_conf *mdev;
3830 struct bm_xfer_ctx c;
3832 struct p_header *h = tconn->data.rbuf;
3834 mdev = vnr_to_mdev(tconn, pi->vnr);
3838 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3839 /* you are supposed to send additional out-of-sync information
3840 * if you actually set bits during this phase */
3842 c = (struct bm_xfer_ctx) {
3843 .bm_bits = drbd_bm_bits(mdev),
3844 .bm_words = drbd_bm_words(mdev),
3848 if (pi->cmd == P_BITMAP) {
3849 err = receive_bitmap_plain(mdev, pi->size, h, &c);
3850 } else if (pi->cmd == P_COMPRESSED_BITMAP) {
3851 /* MAYBE: sanity check that we speak proto >= 90,
3852 * and the feature is enabled! */
3853 struct p_compressed_bm *p;
3855 if (pi->size > BM_PACKET_PAYLOAD_BYTES) {
3856 dev_err(DEV, "ReportCBitmap packet too large\n");
3861 p = mdev->tconn->data.rbuf;
3862 err = drbd_recv_all(mdev->tconn, p->head.payload, pi->size);
3865 if (pi->size <= (sizeof(*p) - sizeof(p->head))) {
3866 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3870 err = decode_bitmap_c(mdev, p, &c, pi->size);
3872 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3877 c.packets[pi->cmd == P_BITMAP]++;
3878 c.bytes[pi->cmd == P_BITMAP] += sizeof(struct p_header) + pi->size;
3885 err = drbd_recv_header(mdev->tconn, pi);
3890 INFO_bm_xfer_stats(mdev, "receive", &c);
3892 if (mdev->state.conn == C_WF_BITMAP_T) {
3893 enum drbd_state_rv rv;
3895 err = drbd_send_bitmap(mdev);
3898 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3899 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3900 D_ASSERT(rv == SS_SUCCESS);
3901 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3902 /* admin may have requested C_DISCONNECTING,
3903 * other threads may have noticed network errors */
3904 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3905 drbd_conn_str(mdev->state.conn));
3910 drbd_bm_unlock(mdev);
3911 if (!err && mdev->state.conn == C_WF_BITMAP_S)
3912 drbd_start_resync(mdev, C_SYNC_SOURCE);
3916 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3918 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3921 return ignore_remaining_packet(tconn, pi);
3924 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3926 /* Make sure we've acked all the TCP data associated
3927 * with the data requests being unplugged */
3928 drbd_tcp_quickack(tconn->data.socket);
3933 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3935 struct drbd_conf *mdev;
3936 struct p_block_desc *p = tconn->data.rbuf;
3938 mdev = vnr_to_mdev(tconn, pi->vnr);
3942 switch (mdev->state.conn) {
3943 case C_WF_SYNC_UUID:
3948 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3949 drbd_conn_str(mdev->state.conn));
3952 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3960 int (*fn)(struct drbd_tconn *, struct packet_info *);
3963 static struct data_cmd drbd_cmd_handler[] = {
3964 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3965 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3966 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3967 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3968 [P_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3969 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3970 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), receive_UnplugRemote },
3971 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3972 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3973 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), receive_SyncParam },
3974 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), receive_SyncParam },
3975 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3976 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3977 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3978 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3979 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3980 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3981 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3982 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3983 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3984 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
3985 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3986 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
3989 static void drbdd(struct drbd_tconn *tconn)
3991 struct p_header *header = tconn->data.rbuf;
3992 struct packet_info pi;
3993 size_t shs; /* sub header size */
3996 while (get_t_state(&tconn->receiver) == RUNNING) {
3997 struct data_cmd *cmd;
3999 drbd_thread_current_set_cpu(&tconn->receiver);
4000 if (drbd_recv_header(tconn, &pi))
4003 cmd = &drbd_cmd_handler[pi.cmd];
4004 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4005 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4009 shs = cmd->pkt_size - sizeof(struct p_header);
4010 if (pi.size - shs > 0 && !cmd->expect_payload) {
4011 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4016 err = drbd_recv_all_warn(tconn, &header->payload, shs);
4022 err = cmd->fn(tconn, &pi);
4024 conn_err(tconn, "error receiving %s, l: %d!\n",
4025 cmdname(pi.cmd), pi.size);
4032 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4035 void conn_flush_workqueue(struct drbd_tconn *tconn)
4037 struct drbd_wq_barrier barr;
4039 barr.w.cb = w_prev_work_done;
4040 barr.w.tconn = tconn;
4041 init_completion(&barr.done);
4042 drbd_queue_work(&tconn->data.work, &barr.w);
4043 wait_for_completion(&barr.done);
4046 static void drbd_disconnect(struct drbd_tconn *tconn)
4049 int rv = SS_UNKNOWN_ERROR;
4051 if (tconn->cstate == C_STANDALONE)
4054 /* asender does not clean up anything. it must not interfere, either */
4055 drbd_thread_stop(&tconn->asender);
4056 drbd_free_sock(tconn);
4058 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4059 conn_info(tconn, "Connection closed\n");
4061 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4062 conn_try_outdate_peer_async(tconn);
4064 spin_lock_irq(&tconn->req_lock);
4066 if (oc >= C_UNCONNECTED)
4067 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4069 spin_unlock_irq(&tconn->req_lock);
4071 if (oc == C_DISCONNECTING) {
4072 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4074 crypto_free_hash(tconn->cram_hmac_tfm);
4075 tconn->cram_hmac_tfm = NULL;
4077 kfree(tconn->net_conf);
4078 tconn->net_conf = NULL;
4079 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4083 static int drbd_disconnected(int vnr, void *p, void *data)
4085 struct drbd_conf *mdev = (struct drbd_conf *)p;
4086 enum drbd_fencing_p fp;
4089 /* wait for current activity to cease. */
4090 spin_lock_irq(&mdev->tconn->req_lock);
4091 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4092 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4093 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4094 spin_unlock_irq(&mdev->tconn->req_lock);
4096 /* We do not have data structures that would allow us to
4097 * get the rs_pending_cnt down to 0 again.
4098 * * On C_SYNC_TARGET we do not have any data structures describing
4099 * the pending RSDataRequest's we have sent.
4100 * * On C_SYNC_SOURCE there is no data structure that tracks
4101 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4102 * And no, it is not the sum of the reference counts in the
4103 * resync_LRU. The resync_LRU tracks the whole operation including
4104 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4106 drbd_rs_cancel_all(mdev);
4108 mdev->rs_failed = 0;
4109 atomic_set(&mdev->rs_pending_cnt, 0);
4110 wake_up(&mdev->misc_wait);
4112 del_timer(&mdev->request_timer);
4114 del_timer_sync(&mdev->resync_timer);
4115 resync_timer_fn((unsigned long)mdev);
4117 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4118 * w_make_resync_request etc. which may still be on the worker queue
4119 * to be "canceled" */
4120 drbd_flush_workqueue(mdev);
4122 /* This also does reclaim_net_ee(). If we do this too early, we might
4123 * miss some resync ee and pages. */
4124 drbd_process_done_ee(mdev);
4126 kfree(mdev->p_uuid);
4127 mdev->p_uuid = NULL;
4129 if (!is_susp(mdev->state))
4130 tl_clear(mdev->tconn);
4135 if (get_ldev(mdev)) {
4136 fp = mdev->ldev->dc.fencing;
4140 /* serialize with bitmap writeout triggered by the state change,
4142 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4144 /* tcp_close and release of sendpage pages can be deferred. I don't
4145 * want to use SO_LINGER, because apparently it can be deferred for
4146 * more than 20 seconds (longest time I checked).
4148 * Actually we don't care about exactly when the network stack does its
4149 * put_page(), but release our reference on these pages right here.
4151 i = drbd_release_ee(mdev, &mdev->net_ee);
4153 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4154 i = atomic_read(&mdev->pp_in_use_by_net);
4156 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4157 i = atomic_read(&mdev->pp_in_use);
4159 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4161 D_ASSERT(list_empty(&mdev->read_ee));
4162 D_ASSERT(list_empty(&mdev->active_ee));
4163 D_ASSERT(list_empty(&mdev->sync_ee));
4164 D_ASSERT(list_empty(&mdev->done_ee));
4166 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4167 atomic_set(&mdev->current_epoch->epoch_size, 0);
4168 D_ASSERT(list_empty(&mdev->current_epoch->list));
4174 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4175 * we can agree on is stored in agreed_pro_version.
4177 * feature flags and the reserved array should be enough room for future
4178 * enhancements of the handshake protocol, and possible plugins...
4180 * for now, they are expected to be zero, but ignored.
4182 static int drbd_send_handshake(struct drbd_tconn *tconn)
4184 /* ASSERT current == mdev->tconn->receiver ... */
4185 struct p_handshake *p = tconn->data.sbuf;
4188 if (mutex_lock_interruptible(&tconn->data.mutex)) {
4189 conn_err(tconn, "interrupted during initial handshake\n");
4193 if (tconn->data.socket == NULL) {
4194 mutex_unlock(&tconn->data.mutex);
4198 memset(p, 0, sizeof(*p));
4199 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4200 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4201 err = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
4202 &p->head, sizeof(*p), 0);
4203 mutex_unlock(&tconn->data.mutex);
4209 * 1 yes, we have a valid connection
4210 * 0 oops, did not work out, please try again
4211 * -1 peer talks different language,
4212 * no point in trying again, please go standalone.
4214 static int drbd_do_handshake(struct drbd_tconn *tconn)
4216 /* ASSERT current == tconn->receiver ... */
4217 struct p_handshake *p = tconn->data.rbuf;
4218 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4219 struct packet_info pi;
4222 err = drbd_send_handshake(tconn);
4226 err = drbd_recv_header(tconn, &pi);
4230 if (pi.cmd != P_HAND_SHAKE) {
4231 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
4232 cmdname(pi.cmd), pi.cmd);
4236 if (pi.size != expect) {
4237 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
4242 err = drbd_recv_all_warn(tconn, &p->head.payload, expect);
4246 p->protocol_min = be32_to_cpu(p->protocol_min);
4247 p->protocol_max = be32_to_cpu(p->protocol_max);
4248 if (p->protocol_max == 0)
4249 p->protocol_max = p->protocol_min;
4251 if (PRO_VERSION_MAX < p->protocol_min ||
4252 PRO_VERSION_MIN > p->protocol_max)
4255 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4257 conn_info(tconn, "Handshake successful: "
4258 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4263 conn_err(tconn, "incompatible DRBD dialects: "
4264 "I support %d-%d, peer supports %d-%d\n",
4265 PRO_VERSION_MIN, PRO_VERSION_MAX,
4266 p->protocol_min, p->protocol_max);
4270 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4271 static int drbd_do_auth(struct drbd_tconn *tconn)
4273 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4274 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4278 #define CHALLENGE_LEN 64
4282 0 - failed, try again (network error),
4283 -1 - auth failed, don't try again.
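/* Sketch of the challenge-response exchange implemented below (both peers
 * run it symmetrically; editorial illustration, not a protocol
 * specification):
 *
 *	send P_AUTH_CHALLENGE   (CHALLENGE_LEN random bytes)
 *	recv P_AUTH_CHALLENGE   (the peer's challenge)
 *	send P_AUTH_RESPONSE    (HMAC(shared_secret, peer's challenge))
 *	recv P_AUTH_RESPONSE    and memcmp() it against our own
 *	                        HMAC(shared_secret, our challenge)
 */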
4286 static int drbd_do_auth(struct drbd_tconn *tconn)
4288 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4289 struct scatterlist sg;
4290 char *response = NULL;
4291 char *right_response = NULL;
4292 char *peers_ch = NULL;
4293 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4294 unsigned int resp_size;
4295 struct hash_desc desc;
4296 struct packet_info pi;
4299 desc.tfm = tconn->cram_hmac_tfm;
4302 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4303 (u8 *)tconn->net_conf->shared_secret, key_len);
4305 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4310 get_random_bytes(my_challenge, CHALLENGE_LEN);
4312 rv = !conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4316 err = drbd_recv_header(tconn, &pi);
4322 if (pi.cmd != P_AUTH_CHALLENGE) {
4323 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4324 cmdname(pi.cmd), pi.cmd);
4329 if (pi.size > CHALLENGE_LEN * 2) {
4330 conn_err(tconn, "expected AuthChallenge payload too big.\n");
4335 peers_ch = kmalloc(pi.size, GFP_NOIO);
4336 if (peers_ch == NULL) {
4337 conn_err(tconn, "kmalloc of peers_ch failed\n");
4342 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4348 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4349 response = kmalloc(resp_size, GFP_NOIO);
4350 if (response == NULL) {
4351 conn_err(tconn, "kmalloc of response failed\n");
4356 sg_init_table(&sg, 1);
4357 sg_set_buf(&sg, peers_ch, pi.size);
4359 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4361 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4366 rv = !conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
4370 err = drbd_recv_header(tconn, &pi);
4376 if (pi.cmd != P_AUTH_RESPONSE) {
4377 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4378 cmdname(pi.cmd), pi.cmd);
4383 if (pi.size != resp_size) {
4384 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
4389 err = drbd_recv_all_warn(tconn, response, resp_size);
4395 right_response = kmalloc(resp_size, GFP_NOIO);
4396 if (right_response == NULL) {
4397 conn_err(tconn, "kmalloc of right_response failed\n");
4402 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4404 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4406 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4411 rv = !memcmp(response, right_response, resp_size);
4414 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4415 resp_size, tconn->net_conf->cram_hmac_alg);
4422 kfree(right_response);
4428 int drbdd_init(struct drbd_thread *thi)
4430 struct drbd_tconn *tconn = thi->tconn;
4433 conn_info(tconn, "receiver (re)started\n");
4436 h = drbd_connect(tconn);
4438 drbd_disconnect(tconn);
4439 schedule_timeout_interruptible(HZ);
4442 conn_warn(tconn, "Discarding network configuration.\n");
4443 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4448 if (get_net_conf(tconn)) {
4450 put_net_conf(tconn);
4454 drbd_disconnect(tconn);
4456 conn_info(tconn, "receiver terminated\n");
4460 /* ********* acknowledge sender ******** */
4462 static int got_conn_RqSReply(struct drbd_tconn *tconn, enum drbd_packet cmd)
4464 struct p_req_state_reply *p = tconn->meta.rbuf;
4465 int retcode = be32_to_cpu(p->retcode);
4467 if (retcode >= SS_SUCCESS) {
4468 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4470 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4471 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4472 drbd_set_st_err_str(retcode), retcode);
4474 wake_up(&tconn->ping_wait);
4479 static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
4481 struct p_req_state_reply *p = mdev->tconn->meta.rbuf;
4482 int retcode = be32_to_cpu(p->retcode);
4484 if (retcode >= SS_SUCCESS) {
4485 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4487 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4488 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4489 drbd_set_st_err_str(retcode), retcode);
4491 wake_up(&mdev->state_wait);
4496 static int got_Ping(struct drbd_tconn *tconn, enum drbd_packet cmd)
4498 return drbd_send_ping_ack(tconn);
4502 static int got_PingAck(struct drbd_tconn *tconn, enum drbd_packet cmd)
4504 /* restore idle timeout */
4505 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4506 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4507 wake_up(&tconn->ping_wait);
static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = mdev->tconn->meta.rbuf;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	D_ASSERT(mdev->tconn->agreed_pro_version >= 89);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, sector);
		drbd_set_in_sync(mdev, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(mdev);
	}
	dec_rs_pending(mdev);
	atomic_add(blksize >> 9, &mdev->rs_sect_in);

	return true;
}

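/* Look up the request a peer (n)ack refers to, by block_id and sector,
 * in the given request tree, and feed 'what' into the request state
 * machine under req_lock. Returns false if no matching request is found;
 * missing_ok controls whether a missing request is treated as expected. */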
static bool
validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&mdev->tconn->req_lock);
		return false;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (m.bio)
		complete_master_bio(mdev, &m);
	return true;
}

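/* Common handler for all flavors of write acknowledgments. Acks with
 * ID_SYNCER refer to resync requests: just update bitmap and counters.
 * Everything else is translated into a request state machine event,
 * asserting the wire protocol the ack type implies. */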
static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = mdev->tconn->meta.rbuf;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(mdev, sector, blksize);
		dec_rs_pending(mdev);
		return true;
	}
	switch (cmd) {
	case P_RS_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
		what = RECV_ACKED_BY_PEER;
		break;
	case P_DISCARD_WRITE:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = DISCARD_WRITE;
		break;
	case P_RETRY_WRITE:
		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
		what = POSTPONE_WRITE;
		break;
	default:
		D_ASSERT(0);
		return false;
	}

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->write_requests, __func__,
					     what, false);
}

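/* The peer reported failure for a write. For ID_SYNCER (resync) requests
 * record the failed range; otherwise NEG_ACK the request. In protocols A
 * and B the request may already be gone, which is tolerated by marking
 * the range out of sync instead. */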
static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = mdev->tconn->meta.rbuf;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
			  mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
	bool found;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(mdev);
		drbd_rs_failed_io(mdev, sector, size);
		return true;
	}

	found = validate_req_change_req_state(mdev, p->block_id, sector,
					      &mdev->write_requests, __func__,
					      NEG_ACKED, missing_ok);
	if (!found) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		if (!missing_ok)
			return false;
		drbd_set_out_of_sync(mdev, sector, size);
	}
	return true;
}

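/* The peer could not serve a read (data) request: log it and fail the
 * original request. */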
static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = mdev->tconn->meta.rbuf;
	sector_t sector = be64_to_cpu(p->sector);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
		(unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->read_requests, __func__,
					     NEG_ACKED, false);
}

static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = mdev->tconn->meta.rbuf;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
		default:
			D_ASSERT(0);
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	return true;
}

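/* The peer completed a whole epoch: release the corresponding part of
 * the transfer log. If we are in Ahead mode and all application I/O has
 * drained, arm the timer that switches us back to being SyncSource. */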
static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_barrier_ack *p = mdev->tconn->meta.rbuf;

	tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return true;
}

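/* One online-verify reply: record an out-of-sync block if reported,
 * update progress marks, and once the last reply is in, queue
 * w_ov_finished (or finish synchronously if the allocation fails). */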
static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = mdev->tconn->meta.rbuf;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(mdev, sector, size);
	else
		ov_out_of_sync_print(mdev);

	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			w->mdev = mdev;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			dev_err(DEV, "kmalloc(w) failed.");
			ov_out_of_sync_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}

static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	return true;
}

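/* Send pending acks for all volumes of this connection: temporarily
 * clear SIGNAL_ASENDER and flush signals, process each volume's done_ee
 * list, then re-check under req_lock whether new entries arrived in the
 * meantime, and loop until all lists are empty. */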
static int tconn_process_done_ee(struct drbd_tconn *tconn)
{
	struct drbd_conf *mdev;
	int i, not_empty = 0;

	do {
		clear_bit(SIGNAL_ASENDER, &tconn->flags);
		flush_signals(current);
		idr_for_each_entry(&tconn->volumes, mdev, i) {
			if (drbd_process_done_ee(mdev))
				return 1; /* error */
		}
		set_bit(SIGNAL_ASENDER, &tconn->flags);

		spin_lock_irq(&tconn->req_lock);
		idr_for_each_entry(&tconn->volumes, mdev, i) {
			not_empty = !list_empty(&mdev->done_ee);
			if (not_empty)
				break;
		}
		spin_unlock_irq(&tconn->req_lock);
	} while (not_empty);

	return 0;
}

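/* Dispatch table for the meta socket. Each entry gives the expected
 * packet size and the handler; fa_type selects whether the handler gets
 * the whole connection (CONN) or a single volume (MDEV) as its first
 * argument. */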
struct asender_cmd {
	size_t pkt_size;
	enum mdev_or_conn fa_type; /* first argument's type */
	union {
		int (*mdev_fn)(struct drbd_conf *mdev, enum drbd_packet cmd);
		int (*conn_fn)(struct drbd_tconn *tconn, enum drbd_packet cmd);
	};
};

static struct asender_cmd asender_tbl[] = {
	[P_PING]	    = { sizeof(struct p_header), CONN, { .conn_fn = got_Ping } },
	[P_PING_ACK]	    = { sizeof(struct p_header), CONN, { .conn_fn = got_PingAck } },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), MDEV, { got_NegAck } },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), MDEV, { got_NegDReply } },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), MDEV, { got_OVResult } },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), MDEV, { got_BarrierAck } },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), MDEV, { got_RqSReply } },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), MDEV, { got_IsInSync } },
	[P_DELAY_PROBE]	    = { sizeof(struct p_delay_probe93), MDEV, { got_skip } },
	[P_RS_CANCEL]	    = { sizeof(struct p_block_ack), MDEV, { got_NegRSDReply } },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), CONN, { .conn_fn = got_conn_RqSReply } },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), MDEV, { got_BlockAck } },
};

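/* The asender ("acknowledge sender") thread: answers pings, flushes acks
 * for completed epoch entries, and receives and dispatches all packets
 * arriving on the meta socket via asender_tbl. Runs with realtime
 * priority so acknowledgments are not starved by bulk data I/O. */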
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	struct p_header *h = tconn->meta.rbuf;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf    = h;
	int received = 0;
	int expect   = sizeof(struct p_header);
	int ping_timeout_active = 0;

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);
		if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
			if (!drbd_send_ping(tconn)) {
				conn_err(tconn, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			tconn->meta.socket->sk->sk_rcvtimeo =
				tconn->net_conf->ping_timeo*HZ/10;
			ping_timeout_active = 1;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork
		 * without much to send */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_cork(tconn->meta.socket);
		if (tconn_process_done_ee(tconn)) {
			conn_err(tconn, "tconn_process_done_ee() failed\n");
			goto reconnect;
		}
		/* but unconditionally uncork unless disabled */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_uncork(tconn->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &tconn->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			conn_err(tconn, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(tconn->last_received,
				jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				conn_err(tconn, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &tconn->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (decode_header(tconn, h, &pi))
				goto reconnect;
			/* bounds check *before* indexing the table;
			 * entries without a handler have pkt_size == 0 */
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) ||
			    !asender_tbl[pi.cmd].pkt_size) {
				conn_err(tconn, "unknown command %d on meta (l: %d)\n",
					 pi.cmd, pi.size);
				goto disconnect;
			}
			cmd = &asender_tbl[pi.cmd];
			expect = cmd->pkt_size;
			if (pi.size != expect - sizeof(struct p_header)) {
				conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			if (cmd->fa_type == CONN) {
				rv = cmd->conn_fn(tconn, pi.cmd);
			} else {
				struct drbd_conf *mdev = vnr_to_mdev(tconn, pi.vnr);
				rv = cmd->mdev_fn(mdev, pi.cmd);
			}

			if (!rv)
				goto reconnect;

			tconn->last_received = jiffies;

			/* the idle_timeout (ping-int)
			 * has been restored in got_PingAck() */
			if (cmd == &asender_tbl[P_PING_ACK])
				ping_timeout_active = 0;

			buf	 = h;
			received = 0;
			expect	 = sizeof(struct p_header);
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
	}
	if (0) {
disconnect:
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &tconn->flags);

	conn_info(tconn, "asender terminated\n");

	return 0;
}