4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
28 #include <asm/uaccess.h>
31 #include <linux/drbd.h>
33 #include <linux/file.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
47 #include "drbd_protocol.h"
51 #define PRO_FEATURES (FF_TRIM)
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77 * some helper functions to deal with single linked page lists,
78 * page->private being our "next" pointer.
81 /* If at least n pages are linked at head, get n pages off.
82 * Otherwise, don't modify head, and return NULL.
83 * Locking is the responsibility of the caller.
/* page_chain_del() - detach the first n pages from a singly linked page chain.
 * page->private doubles as the "next" pointer (see comment above).
 * NOTE(review): this excerpt elides interior lines (embedded numbers jump);
 * the loop walking the chain is not fully visible here. */
85 static struct page *page_chain_del(struct page **head, int n)
99 tmp = page_chain_next(page);
101 break; /* found sufficient pages */
103 /* insufficient pages, don't use any of them. */
108 /* add end of list marker for the returned list */
/* terminate the returned sub-chain so callers see a proper list end */
109 set_page_private(page, 0);
110 /* actual return value, and adjustment of head */
116 /* may be used outside of locks to find the tail of a (usually short)
117 * "private" page chain, before adding it back to a global chain head
118 * with page_chain_add() under a spinlock. */
/* Walks to the last page of the chain; presumably *len (if non-NULL) receives
 * the chain length — elided body, TODO confirm against full source. */
119 static struct page *page_chain_tail(struct page *page, int *len)
123 while ((tmp = page_chain_next(page)))
/* page_chain_free() - release every page of a private chain back to the
 * system. Iteration uses the _safe variant because each page is consumed
 * while walking. Return value semantics not visible in this excerpt. */
130 static int page_chain_free(struct page *page)
134 page_chain_for_each_safe(page, tmp) {
/* page_chain_add() - splice a private chain (chain_first..chain_last) onto
 * the global list at *head. Caller provides both ends; the BUG_ON verifies
 * chain_last really is the tail. Locking is the caller's responsibility. */
141 static void page_chain_add(struct page **head,
142 struct page *chain_first, struct page *chain_last)
146 tmp = page_chain_tail(chain_first, NULL);
147 BUG_ON(tmp != chain_last);
150 /* add chain to head */
/* the old head becomes the successor of the spliced-in chain's tail */
151 set_page_private(chain_last, (unsigned long)*head);
/* __drbd_alloc_pages() - best-effort, non-blocking page allocation:
 * first try to grab @number pages from the preallocated drbd_pp_pool,
 * then fall back to alloc_page(GFP_TRY). On partial failure the pages
 * obtained so far are returned to the pool (see tail of function).
 * NOTE(review): excerpt elides interior lines; early-return paths between
 * the visible statements are not shown. */
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158 struct page *page = NULL;
159 struct page *tmp = NULL;
162 /* Yes, testing drbd_pp_vacant outside the lock is racy.
163 * So what. It saves a spin_lock. */
164 if (drbd_pp_vacant >= number) {
165 spin_lock(&drbd_pp_lock);
166 page = page_chain_del(&drbd_pp_pool, number);
168 drbd_pp_vacant -= number;
169 spin_unlock(&drbd_pp_lock);
174 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 * which in turn might block on the other node at this very place. */
177 for (i = 0; i < number; i++) {
178 tmp = alloc_page(GFP_TRY);
/* freshly allocated page is pushed onto the local chain via page->private */
181 set_page_private(tmp, (unsigned long)page);
188 /* Not enough pages immediately available this time.
189 * No need to jump around here, drbd_alloc_pages will retry this
190 * function "soon". */
/* give back the partially built chain to the global pool under the lock */
192 tmp = page_chain_tail(page, NULL);
193 spin_lock(&drbd_pp_lock);
194 page_chain_add(&drbd_pp_pool, page, tmp);
196 spin_unlock(&drbd_pp_lock);
/* reclaim_finished_net_peer_reqs() - move all fully completed peer requests
 * from device->net_ee to @to_be_freed. Caller is expected to hold the
 * req_lock (list is also manipulated under it elsewhere in this file). */
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 struct list_head *to_be_freed)
204 struct drbd_peer_request *peer_req, *tmp;
206 /* The EEs are always appended to the end of the list. Since
207 they are sent in order over the wire, they have to finish
208 in order. As soon as we see the first not finished we can
209 stop to examine the list... */
211 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
/* first entry still owning a page ends the scan (in-order completion) */
212 if (drbd_peer_req_has_active_page(peer_req))
214 list_move(&peer_req->w.list, to_be_freed);
/* drbd_kick_lo_and_reclaim_net() - collect finished net peer requests under
 * req_lock, then free them outside the lock (freeing may sleep/take other
 * locks, so it must not happen with req_lock held). */
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
220 LIST_HEAD(reclaimed);
221 struct drbd_peer_request *peer_req, *t;
223 spin_lock_irq(&device->resource->req_lock);
224 reclaim_finished_net_peer_reqs(device, &reclaimed);
225 spin_unlock_irq(&device->resource->req_lock);
227 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228 drbd_free_net_peer_req(device, peer_req);
232 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233 * @device: DRBD device.
234 * @number: number of pages requested
235 * @retry: whether to retry, if not enough pages are available right now
237 * Tries to allocate number pages, first from our own page pool, then from
239 * Possibly retry until DRBD frees sufficient pages somewhere else.
241 * If this allocation would exceed the max_buffers setting, we throttle
242 * allocation (schedule_timeout) to give the system some room to breathe.
244 * We do not use max-buffers as hard limit, because it could lead to
245 * congestion and further to a distributed deadlock during online-verify or
246 * (checksum based) resync, if the max-buffers, socket buffer sizes and
247 * resync-rate settings are mis-configured.
249 * Returns a page chain linked via page->private.
251 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
254 struct drbd_device *device = peer_device->device;
255 struct page *page = NULL;
/* max_buffers read under RCU; generous fallback if net_conf vanished */
261 nc = rcu_dereference(peer_device->connection->net_conf);
262 mxb = nc ? nc->max_buffers : 1000000;
/* fast path: only allocate while below the soft max-buffers limit */
265 if (atomic_read(&device->pp_in_use) < mxb)
266 page = __drbd_alloc_pages(device, number);
268 while (page == NULL) {
269 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
/* reclaim finished network requests to refill the pool before retrying */
271 drbd_kick_lo_and_reclaim_net(device);
273 if (atomic_read(&device->pp_in_use) < mxb) {
274 page = __drbd_alloc_pages(device, number);
282 if (signal_pending(current)) {
283 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
/* throttle: sleep up to HZ/10 between attempts */
287 if (schedule_timeout(HZ/10) == 0)
290 finish_wait(&drbd_pp_wait, &wait);
/* account the pages as in use only once the allocation succeeded */
293 atomic_add(number, &device->pp_in_use);
297 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
298 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
299 * Either links the page chain back to the global pool,
300 * or returns all pages to the system. */
301 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
/* pick the accounting counter matching the allocation origin */
303 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
/* if the pool is already over-full, free to the system instead of pooling */
309 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
310 i = page_chain_free(page);
313 tmp = page_chain_tail(page, &i);
314 spin_lock(&drbd_pp_lock);
315 page_chain_add(&drbd_pp_pool, page, tmp);
317 spin_unlock(&drbd_pp_lock);
/* underflow here indicates an accounting bug; warn loudly */
319 i = atomic_sub_return(i, a);
321 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
322 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
/* wake allocators that may be waiting in drbd_alloc_pages() */
323 wake_up(&drbd_pp_wait);
327 You need to hold the req_lock:
328 _drbd_wait_ee_list_empty()
330 You must not have the req_lock:
332 drbd_alloc_peer_req()
333 drbd_free_peer_reqs()
335 drbd_finish_peer_reqs()
337 drbd_wait_ee_list_empty()
/* drbd_alloc_peer_req() - allocate and initialize a peer request (an "EE"),
 * including its payload page chain when @has_payload && @data_size.
 * Must NOT be called with req_lock held (see locking comment above).
 * NOTE(review): error-path labels between the visible lines are elided. */
340 struct drbd_peer_request *
341 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
342 unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
344 struct drbd_device *device = peer_device->device;
345 struct drbd_peer_request *peer_req;
346 struct page *page = NULL;
/* round the payload size up to whole pages */
347 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
/* fault injection hook for testing allocation failures */
349 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
/* mempool entries are not highmem; strip __GFP_HIGHMEM for the descriptor */
352 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM)
354 if (!(gfp_mask & __GFP_NOWARN))
355 drbd_err(device, "%s: allocation failed\n", __func__);
359 if (has_payload && data_size) {
/* only allow blocking retries when the caller passed __GFP_WAIT */
360 page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
365 drbd_clear_interval(&peer_req->i);
366 peer_req->i.size = data_size;
367 peer_req->i.sector = sector;
368 peer_req->i.local = false;
369 peer_req->i.waiting = false;
371 peer_req->epoch = NULL;
372 peer_req->peer_device = peer_device;
373 peer_req->pages = page;
374 atomic_set(&peer_req->pending_bios, 0);
377 * The block_id is opaque to the receiver. It is not endianness
378 * converted, and sent back to the sender unchanged.
380 peer_req->block_id = id;
/* error path: give the descriptor back to the mempool */
385 mempool_free(peer_req, drbd_ee_mempool);
/* __drbd_free_peer_req() - release a peer request: optional digest, its page
 * chain (accounted against net or normal pool per @is_net), then the
 * descriptor itself. Asserts it is no longer referenced by bios/intervals. */
389 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
392 if (peer_req->flags & EE_HAS_DIGEST)
393 kfree(peer_req->digest);
394 drbd_free_pages(device, peer_req->pages, is_net);
395 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
396 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
397 mempool_free(peer_req, drbd_ee_mempool);
/* drbd_free_peer_reqs() - splice @list away under req_lock, then free every
 * entry outside the lock. net_ee entries are accounted separately. */
400 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
402 LIST_HEAD(work_list);
403 struct drbd_peer_request *peer_req, *t;
/* net_ee uses its own in-use accounting (pp_in_use_by_net) */
405 int is_net = list == &device->net_ee;
407 spin_lock_irq(&device->resource->req_lock);
408 list_splice_init(list, &work_list);
409 spin_unlock_irq(&device->resource->req_lock);
411 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
412 __drbd_free_peer_req(device, peer_req, is_net);
419 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
/* drbd_finish_peer_reqs() - run the completion callback of every entry on
 * done_ee and free it; also reclaims finished net_ee entries on the way.
 * NOTE(review): err accumulation between the visible lines is elided. */
421 static int drbd_finish_peer_reqs(struct drbd_device *device)
423 LIST_HEAD(work_list);
424 LIST_HEAD(reclaimed);
425 struct drbd_peer_request *peer_req, *t;
/* detach both lists under the lock, process them outside it */
428 spin_lock_irq(&device->resource->req_lock);
429 reclaim_finished_net_peer_reqs(device, &reclaimed);
430 list_splice_init(&device->done_ee, &work_list);
431 spin_unlock_irq(&device->resource->req_lock);
433 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
434 drbd_free_net_peer_req(device, peer_req);
436 /* possible callbacks here:
437 * e_end_block, and e_end_resync_block, e_send_superseded.
438 * all ignore the last argument.
440 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
443 /* list_del not necessary, next/prev members not touched */
444 err2 = peer_req->w.cb(&peer_req->w, !!err);
447 drbd_free_peer_req(device, peer_req);
/* done_ee drained: wake waiters in *_wait_ee_list_empty() */
449 wake_up(&device->ee_wait);
/* _drbd_wait_ee_list_empty() - wait until @head is empty. Caller holds
 * req_lock; it is dropped around the actual sleep and re-taken after. */
454 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
455 struct list_head *head)
459 /* avoids spin_lock/unlock
460 * and calling prepare_to_wait in the fast path */
461 while (!list_empty(head)) {
462 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
463 spin_unlock_irq(&device->resource->req_lock);
465 finish_wait(&device->ee_wait, &wait);
466 spin_lock_irq(&device->resource->req_lock);
/* drbd_wait_ee_list_empty() - locked wrapper around the _locked variant;
 * use when the caller does NOT already hold req_lock. */
470 static void drbd_wait_ee_list_empty(struct drbd_device *device,
471 struct list_head *head)
473 spin_lock_irq(&device->resource->req_lock);
474 _drbd_wait_ee_list_empty(device, head);
475 spin_unlock_irq(&device->resource->req_lock);
/* drbd_recv_short() - thin kernel_recvmsg() wrapper. With @flags == 0 it
 * defaults to MSG_WAITALL | MSG_NOSIGNAL (block until @size bytes or error). */
478 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
484 struct msghdr msg = {
485 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
487 return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
/* drbd_recv() - receive on the data socket with connection-level error
 * handling: logs resets/errors, and on orderly shutdown (rv == 0) waits for
 * an expected disconnect before moving the connection to C_BROKEN_PIPE.
 * NOTE(review): rcu_read_lock around the net_conf access is elided here. */
490 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
494 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
497 if (rv == -ECONNRESET)
498 drbd_info(connection, "sock was reset by peer\n");
499 else if (rv != -ERESTARTSYS)
500 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
501 } else if (rv == 0) {
/* peer closed; if we announced disconnect, wait for the state change */
502 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
505 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
508 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
513 drbd_info(connection, "sock was shut down by peer\n");
/* any short/failed read tears the connection down hard */
517 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
/* drbd_recv_all() - receive exactly @size bytes; presumably converts short
 * reads into an error code (tail of function elided — confirm in full source). */
523 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
527 err = drbd_recv(connection, buf, size);
/* drbd_recv_all_warn() - like drbd_recv_all(), but logs a warning on short
 * reads unless the read was interrupted by a pending signal. */
536 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
540 err = drbd_recv_all(connection, buf, size);
541 if (err && !signal_pending(current))
542 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
547 * On individual connections, the socket buffer size must be set prior to the
548 * listen(2) or connect(2) calls in order to have it take effect.
549 * This is our wrapper to do so.
/* @snd/@rcv == 0 presumably means "leave kernel default" (guards elided). */
551 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
554 /* open coded SO_SNDBUF, SO_RCVBUF */
556 sock->sk->sk_sndbuf = snd;
/* lock the value so autotuning does not override it */
557 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
560 sock->sk->sk_rcvbuf = rcv;
561 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
/* drbd_try_connect() - actively connect() to the peer. Returns the connected
 * socket or NULL. Connection refusal/timeouts are expected while the peer is
 * absent and do NOT trigger a disconnect; other errors do.
 * NOTE(review): rcu_read_lock/unlock and several error labels are elided. */
565 static struct socket *drbd_try_connect(struct drbd_connection *connection)
569 struct sockaddr_in6 src_in6;
570 struct sockaddr_in6 peer_in6;
572 int err, peer_addr_len, my_addr_len;
573 int sndbuf_size, rcvbuf_size, connect_int;
574 int disconnect_on_error = 1;
/* snapshot tunables from net_conf (RCU-protected) */
577 nc = rcu_dereference(connection->net_conf);
582 sndbuf_size = nc->sndbuf_size;
583 rcvbuf_size = nc->rcvbuf_size;
584 connect_int = nc->connect_int;
587 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
588 memcpy(&src_in6, &connection->my_addr, my_addr_len);
/* port 0: let the kernel choose a free source port */
590 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
591 src_in6.sin6_port = 0;
593 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
595 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
596 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
/* "what" names the failing step for the error message below */
598 what = "sock_create_kern";
599 err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
600 SOCK_STREAM, IPPROTO_TCP, &sock);
606 sock->sk->sk_rcvtimeo =
607 sock->sk->sk_sndtimeo = connect_int * HZ;
608 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
610 /* explicitly bind to the configured IP as source IP
611 * for the outgoing connections.
612 * This is needed for multihomed hosts and to be
613 * able to use lo: interfaces for drbd.
614 * Make sure to use 0 as port number, so linux selects
615 * a free one dynamically.
617 what = "bind before connect";
618 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
622 /* connect may fail, peer not yet available.
623 * stay C_WF_CONNECTION, don't go Disconnecting! */
624 disconnect_on_error = 0;
626 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
635 /* timeout, busy, signal pending */
636 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
637 case EINTR: case ERESTARTSYS:
638 /* peer not (yet) available, network problem */
639 case ECONNREFUSED: case ENETUNREACH:
640 case EHOSTDOWN: case EHOSTUNREACH:
641 disconnect_on_error = 0;
644 drbd_err(connection, "%s failed, err = %d\n", what, err);
646 if (disconnect_on_error)
647 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
/* Context passed around while waiting for an incoming connection:
 * the listen socket, a completion rung by the sk_state_change hook on
 * TCP_ESTABLISHED, and the saved original callback for restoration. */
653 struct accept_wait_data {
654 struct drbd_connection *connection;
655 struct socket *s_listen;
656 struct completion door_bell;
657 void (*original_sk_state_change)(struct sock *sk);
/* drbd_incoming_connection() - sk_state_change hook installed on the listen
 * socket; rings the door_bell completion once a connection is established,
 * then (in the elided tail) chains to the original callback. */
661 static void drbd_incoming_connection(struct sock *sk)
663 struct accept_wait_data *ad = sk->sk_user_data;
664 void (*state_change)(struct sock *sk);
666 state_change = ad->original_sk_state_change;
667 if (sk->sk_state == TCP_ESTABLISHED)
668 complete(&ad->door_bell);
/* prepare_listen_socket() - create, configure, bind and listen() on the
 * passive-side socket, and install drbd_incoming_connection() as the
 * sk_state_change hook so drbd_wait_for_connect() can be woken.
 * NOTE(review): rcu section and error labels between lines are elided. */
672 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
674 int err, sndbuf_size, rcvbuf_size, my_addr_len;
675 struct sockaddr_in6 my_addr;
676 struct socket *s_listen;
681 nc = rcu_dereference(connection->net_conf);
686 sndbuf_size = nc->sndbuf_size;
687 rcvbuf_size = nc->rcvbuf_size;
690 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
691 memcpy(&my_addr, &connection->my_addr, my_addr_len);
693 what = "sock_create_kern";
694 err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
695 SOCK_STREAM, IPPROTO_TCP, &s_listen);
/* allow quick rebinding after a previous connection attempt */
701 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
702 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
704 what = "bind before listen";
705 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
/* install the wake-up hook under the socket callback lock */
709 ad->s_listen = s_listen;
710 write_lock_bh(&s_listen->sk->sk_callback_lock);
711 ad->original_sk_state_change = s_listen->sk->sk_state_change;
712 s_listen->sk->sk_state_change = drbd_incoming_connection;
713 s_listen->sk->sk_user_data = ad;
714 write_unlock_bh(&s_listen->sk->sk_callback_lock);
717 err = s_listen->ops->listen(s_listen, 5);
/* error path: release the socket; benign errors do not force disconnect */
724 sock_release(s_listen);
726 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
727 drbd_err(connection, "%s failed, err = %d\n", what, err);
728 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
/* unregister_state_change() - undo prepare_listen_socket()'s hook install:
 * restore the original sk_state_change and clear sk_user_data. */
735 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
737 write_lock_bh(&sk->sk_callback_lock);
738 sk->sk_state_change = ad->original_sk_state_change;
739 sk->sk_user_data = NULL;
740 write_unlock_bh(&sk->sk_callback_lock);
/* drbd_wait_for_connect() - passive side: wait (with random jitter, so both
 * peers don't retry in lock-step) for the door_bell, then accept().
 * Returns the established socket or NULL. */
743 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
745 int timeo, connect_int, err = 0;
746 struct socket *s_estab = NULL;
750 nc = rcu_dereference(connection->net_conf);
755 connect_int = nc->connect_int;
758 timeo = connect_int * HZ;
759 /* 28.5% random jitter */
760 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
762 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
766 err = kernel_accept(ad->s_listen, &s_estab, 0);
768 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
769 drbd_err(connection, "accept failed, err = %d\n", err);
770 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
/* accepted sockets inherit the hook from the listen socket; remove it */
775 unregister_state_change(s_estab->sk, ad);
780 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
/* send_first_packet() - send the initial handshake packet (@cmd is
 * P_INITIAL_DATA or P_INITIAL_META) identifying which socket this is. */
782 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
783 enum drbd_packet cmd)
785 if (!conn_prepare_command(connection, sock))
787 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
/* receive_first_packet() - read and decode the peer's initial packet on a
 * freshly accepted socket; presumably returns the command id (tail elided). */
790 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
792 unsigned int header_size = drbd_header_size(connection);
793 struct packet_info pi;
796 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
/* a short read of the header is treated as an error */
797 if (err != header_size) {
802 err = decode_header(connection, connection->data.rbuf, &pi);
809 * drbd_socket_okay() - Free the socket if its connection is not okay
810 * @sock: pointer to the pointer to the socket.
/* Probes with a non-blocking MSG_PEEK read; data pending or EAGAIN means
 * the socket is healthy. Release path for dead sockets is elided here. */
812 static int drbd_socket_okay(struct socket **sock)
820 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
822 if (rr > 0 || rr == -EAGAIN) {
830 /* Gets called if a connection is established, or if a new minor gets created
/* drbd_connected() - per-volume handshake after the connection is up:
 * reset sequence counters, pick the right state mutex for the agreed
 * protocol version, and send sync-param/sizes/uuids/state to the peer. */
832 int drbd_connected(struct drbd_peer_device *peer_device)
834 struct drbd_device *device = peer_device->device;
837 atomic_set(&device->packet_seq, 0);
838 device->peer_seq = 0;
/* pre-100 peers have no per-volume state handling: share the cstate mutex */
840 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
841 &peer_device->connection->cstate_mutex :
842 &device->own_state_mutex;
844 err = drbd_send_sync_param(peer_device);
846 err = drbd_send_sizes(peer_device, 0, 0);
848 err = drbd_send_uuids(peer_device);
850 err = drbd_send_current_state(peer_device);
851 clear_bit(USE_DEGR_WFC_T, &device->flags);
852 clear_bit(RESIZE_PENDING, &device->flags);
853 atomic_set(&device->ap_in_flight, 0);
854 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
860 * 1 yes, we have a valid connection
861 * 0 oops, did not work out, please try again
862 * -1 peer talks different language,
863 * no point in trying again, please go standalone.
864 * -2 We do not have a network config...
/* conn_connect() - establish BOTH sockets (data + meta) with the peer,
 * alternating active connect and passive accept, resolve crossed initial
 * packets, negotiate features and (optionally) authenticate, then bring
 * all volumes up via drbd_connected().
 * NOTE(review): this is an elided excerpt; loop structure, rcu sections and
 * several branches between the visible lines are missing. */
866 static int conn_connect(struct drbd_connection *connection)
868 struct drbd_socket sock, msock;
869 struct drbd_peer_device *peer_device;
871 int vnr, timeout, h, ok;
872 bool discard_my_data;
873 enum drbd_state_rv rv;
874 struct accept_wait_data ad = {
875 .connection = connection,
876 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
879 clear_bit(DISCONNECT_SENT, &connection->flags);
880 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
883 mutex_init(&sock.mutex);
884 sock.sbuf = connection->data.sbuf;
885 sock.rbuf = connection->data.rbuf;
887 mutex_init(&msock.mutex);
888 msock.sbuf = connection->meta.sbuf;
889 msock.rbuf = connection->meta.rbuf;
892 /* Assume that the peer only understands protocol 80 until we know better. */
893 connection->agreed_pro_version = 80;
895 if (prepare_listen_socket(connection, &ad))
/* active attempt: the first socket established becomes the data socket,
 * the second the meta socket */
901 s = drbd_try_connect(connection);
905 send_first_packet(connection, &sock, P_INITIAL_DATA);
906 } else if (!msock.socket) {
907 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
909 send_first_packet(connection, &msock, P_INITIAL_META);
911 drbd_err(connection, "Logic error in conn_connect()\n");
912 goto out_release_sockets;
916 if (sock.socket && msock.socket) {
918 nc = rcu_dereference(connection->net_conf);
919 timeout = nc->ping_timeo * HZ / 10;
921 schedule_timeout_interruptible(timeout);
922 ok = drbd_socket_okay(&sock.socket);
923 ok = drbd_socket_okay(&msock.socket) && ok;
/* passive attempt: accept whatever the peer initiated */
929 s = drbd_wait_for_connect(connection, &ad);
931 int fp = receive_first_packet(connection, s);
932 drbd_socket_okay(&sock.socket);
933 drbd_socket_okay(&msock.socket);
/* both sides connected simultaneously: drop our duplicate */
937 drbd_warn(connection, "initial packet S crossed\n");
938 sock_release(sock.socket);
945 set_bit(RESOLVE_CONFLICTS, &connection->flags);
947 drbd_warn(connection, "initial packet M crossed\n");
948 sock_release(msock.socket);
955 drbd_warn(connection, "Error receiving initial packet\n");
/* random back-off so both peers don't keep colliding */
958 if (prandom_u32() & 1)
963 if (connection->cstate <= C_DISCONNECTING)
964 goto out_release_sockets;
965 if (signal_pending(current)) {
966 flush_signals(current);
968 if (get_t_state(&connection->receiver) == EXITING)
969 goto out_release_sockets;
972 ok = drbd_socket_okay(&sock.socket);
973 ok = drbd_socket_okay(&msock.socket) && ok;
977 sock_release(ad.s_listen);
979 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
980 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
982 sock.socket->sk->sk_allocation = GFP_NOIO;
983 msock.socket->sk->sk_allocation = GFP_NOIO;
/* meta socket gets higher priority than bulk data */
985 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
986 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
989 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
990 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
991 * first set it to the P_CONNECTION_FEATURES timeout,
992 * which we set to 4x the configured ping_timeout. */
994 nc = rcu_dereference(connection->net_conf);
996 sock.socket->sk->sk_sndtimeo =
997 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
999 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1000 timeout = nc->timeout * HZ / 10;
1001 discard_my_data = nc->discard_my_data;
1004 msock.socket->sk->sk_sndtimeo = timeout;
1006 /* we don't want delays.
1007 * we use TCP_CORK where appropriate, though */
1008 drbd_tcp_nodelay(sock.socket);
1009 drbd_tcp_nodelay(msock.socket);
1011 connection->data.socket = sock.socket;
1012 connection->meta.socket = msock.socket;
1013 connection->last_received = jiffies;
/* protocol feature negotiation; may retry or give up (see return codes) */
1015 h = drbd_do_features(connection);
1019 if (connection->cram_hmac_tfm) {
1020 /* drbd_request_state(device, NS(conn, WFAuth)); */
1021 switch (drbd_do_auth(connection)) {
1023 drbd_err(connection, "Authentication of peer failed\n");
1026 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1031 connection->data.socket->sk->sk_sndtimeo = timeout;
1032 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1034 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1037 /* Prevent a race between resync-handshake and
1038 * being promoted to Primary.
1040 * Grab and release the state mutex, so we know that any current
1041 * drbd_set_role() is finished, and any incoming drbd_set_role
1042 * will see the STATE_SENT flag, and wait for it to be cleared.
1044 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1045 mutex_lock(peer_device->device->state_mutex);
1047 set_bit(STATE_SENT, &connection->flags);
1049 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1050 mutex_unlock(peer_device->device->state_mutex);
1053 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1054 struct drbd_device *device = peer_device->device;
/* hold a ref across drbd_connected(); device may be removed concurrently */
1055 kref_get(&device->kref);
1058 if (discard_my_data)
1059 set_bit(DISCARD_MY_DATA, &device->flags);
1061 clear_bit(DISCARD_MY_DATA, &device->flags);
1063 drbd_connected(peer_device);
1064 kref_put(&device->kref, drbd_destroy_device);
1069 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1070 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1071 clear_bit(STATE_SENT, &connection->flags);
1075 drbd_thread_start(&connection->asender);
1077 mutex_lock(&connection->resource->conf_update);
1078 /* The discard_my_data flag is a single-shot modifier to the next
1079 * connection attempt, the handshake of which is now well underway.
1080 * No need for rcu style copying of the whole struct
1081 * just to clear a single value. */
1082 connection->net_conf->discard_my_data = 0;
1083 mutex_unlock(&connection->resource->conf_update);
1087 out_release_sockets:
1089 sock_release(ad.s_listen);
1091 sock_release(sock.socket);
1093 sock_release(msock.socket);
/* decode_header() - parse the on-wire packet header into @pi. Supports the
 * three header layouts (h100 / h95 / h80), selected by the negotiated header
 * size and verified by the magic value. pi->data points past the header. */
1097 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1099 unsigned int header_size = drbd_header_size(connection);
1101 if (header_size == sizeof(struct p_header100) &&
1102 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1103 struct p_header100 *h = header;
1105 drbd_err(connection, "Header padding is not zero\n");
/* h100 is the only layout carrying a volume number */
1108 pi->vnr = be16_to_cpu(h->volume);
1109 pi->cmd = be16_to_cpu(h->command);
1110 pi->size = be32_to_cpu(h->length);
1111 } else if (header_size == sizeof(struct p_header95) &&
1112 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1113 struct p_header95 *h = header;
1114 pi->cmd = be16_to_cpu(h->command);
1115 pi->size = be32_to_cpu(h->length);
1117 } else if (header_size == sizeof(struct p_header80) &&
1118 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1119 struct p_header80 *h = header;
1120 pi->cmd = be16_to_cpu(h->command);
/* note: h80 length is only 16 bit wide */
1121 pi->size = be16_to_cpu(h->length);
1124 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1125 be32_to_cpu(*(__be32 *)header),
1126 connection->agreed_pro_version);
1129 pi->data = header + header_size;
/* drbd_recv_header() - receive one full header from the data socket and
 * decode it; refreshes last_received for the connection timeout logic. */
1133 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1135 void *buffer = connection->data.rbuf;
1138 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1142 err = decode_header(connection, buffer, pi);
1143 connection->last_received = jiffies;
/* drbd_flush() - issue a disk cache flush on every attached volume, when the
 * configured write ordering requires it. On flush failure, degrade the
 * connection-wide write ordering to "drain" instead of retrying. */
1148 static void drbd_flush(struct drbd_connection *connection)
1151 struct drbd_peer_device *peer_device;
1154 if (connection->write_ordering >= WO_bdev_flush) {
1156 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1157 struct drbd_device *device = peer_device->device;
/* skip volumes without a local disk */
1159 if (!get_ldev(device))
1161 kref_get(&device->kref);
1164 rv = blkdev_issue_flush(device->ldev->backing_bdev,
1167 drbd_info(device, "local disk flush failed with status %d\n", rv);
1168 /* would rather check on EOPNOTSUPP, but that is not reliable.
1169 * don't try again for ANY return value != 0
1170 * if (rv == -EOPNOTSUPP) */
1171 drbd_bump_write_ordering(connection, WO_drain_io);
1174 kref_put(&device->kref, drbd_destroy_device);
1185 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1186 * @device: DRBD device.
1187 * @epoch: Epoch object.
/* An epoch finishes when it has a size, no active writes, and a barrier
 * number (or we are cleaning up). Finishing sends the barrier ack (outside
 * the epoch_lock) and either unlinks or recycles the epoch object.
 * NOTE(review): several branches between the visible lines are elided. */
1190 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1191 struct drbd_epoch *epoch,
1192 enum epoch_event ev)
1195 struct drbd_epoch *next_epoch;
1196 enum finish_epoch rv = FE_STILL_LIVE;
1198 spin_lock(&connection->epoch_lock);
1202 epoch_size = atomic_read(&epoch->epoch_size);
1204 switch (ev & ~EV_CLEANUP) {
1206 atomic_dec(&epoch->active);
1208 case EV_GOT_BARRIER_NR:
1209 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1211 case EV_BECAME_LAST:
1216 if (epoch_size != 0 &&
1217 atomic_read(&epoch->active) == 0 &&
1218 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1219 if (!(ev & EV_CLEANUP)) {
/* drop the lock while sending: drbd_send_b_ack may sleep */
1220 spin_unlock(&connection->epoch_lock);
1221 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1222 spin_lock(&connection->epoch_lock);
1225 /* FIXME: dec unacked on connection, once we have
1226 * something to count pending connection packets in. */
1227 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1228 dec_unacked(epoch->connection);
/* non-current epochs are unlinked; the successor may now be last */
1231 if (connection->current_epoch != epoch) {
1232 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1233 list_del(&epoch->list);
1234 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1235 connection->epochs--;
1238 if (rv == FE_STILL_LIVE)
/* current epoch is recycled in place instead of being freed */
1242 atomic_set(&epoch->epoch_size, 0);
1243 /* atomic_set(&epoch->active, 0); is already zero */
1244 if (rv == FE_STILL_LIVE)
1255 spin_unlock(&connection->epoch_lock);
1261 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1262 * @connection: DRBD connection.
1263 * @wo: Write ordering method to try.
/* The effective method is constrained by each attached disk's disk_flushes /
 * disk_drain settings; elided lines presumably clamp @wo accordingly. */
1265 void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
1267 struct disk_conf *dc;
1268 struct drbd_peer_device *peer_device;
1269 enum write_ordering_e pwo;
1271 static char *write_ordering_str[] = {
1273 [WO_drain_io] = "drain",
1274 [WO_bdev_flush] = "flush",
1277 pwo = connection->write_ordering;
1280 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1281 struct drbd_device *device = peer_device->device;
1283 if (!get_ldev_if_state(device, D_ATTACHING))
1285 dc = rcu_dereference(device->ldev->disk_conf);
1287 if (wo == WO_bdev_flush && !dc->disk_flushes)
1289 if (wo == WO_drain_io && !dc->disk_drain)
1294 connection->write_ordering = wo;
/* only log when something actually changed (or flush was requested) */
1295 if (pwo != connection->write_ordering || wo == WO_bdev_flush)
1296 drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
1300 * drbd_submit_peer_request()
1301 * @device: DRBD device.
1302 * @peer_req: peer request
1303 * @rw: flag field, see bio->bi_rw
1305 * May spread the pages to multiple bios,
1306 * depending on bio_add_page restrictions.
1308 * Returns 0 if all bios have been submitted,
1309 * -ENOMEM if we could not allocate enough bios,
1310 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1311 * single page to an empty bio (which should never happen and likely indicates
1312 * that the lower level IO stack is in some way broken). This has been observed
1313 * on certain Xen deployments.
1315 /* TODO allocate from our own bio_set. */
1316 int drbd_submit_peer_request(struct drbd_device *device,
1317 struct drbd_peer_request *peer_req,
1318 const unsigned rw, const int fault_type)
/* Submit @peer_req's pages to the local backing device as one or more
 * bios (see the kernel-doc comment above for the return convention).
 * NOTE(review): this extract is missing lines (braces, the "submit"
 * label, error paths, page accounting) -- not the complete function. */
1320 struct bio *bios = NULL;
1322 struct page *page = peer_req->pages;
1323 sector_t sector = peer_req->i.sector;
1324 unsigned ds = peer_req->i.size;	/* remaining byte count to map into bios */
1325 unsigned n_bios = 0;
1326 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
/* Discard on a backend without discard support: emulate by writing
 * zeroes synchronously, then complete the request right here. */
1329 if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1330 /* wait for all pending IO completions, before we start
1331 * zeroing things out. */
1332 conn_wait_active_ee_empty(first_peer_device(device)->connection);
1333 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1334 sector, ds >> 9, GFP_NOIO))
1335 peer_req->flags |= EE_WAS_ERROR;
1336 drbd_endio_write_sec_final(peer_req);
1340 /* Discards don't have any payload.
1341 * But the scsi layer still expects a bio_vec it can use internally,
1342 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1343 if (peer_req->flags & EE_IS_TRIM)
1346 /* In most cases, we will only need one bio. But in case the lower
1347 * level restrictions happen to be different at this offset on this
1348 * side than those of the sending peer, we may need to submit the
1349 * request in more than one bio.
1351 * Plain bio_alloc is good enough here, this is no DRBD internally
1352 * generated bio, but a bio allocated on behalf of the peer.
1355 bio = bio_alloc(GFP_NOIO, nr_pages);
1357 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1360 /* > peer_req->i.sector, unless this is the first bio */
1361 bio->bi_iter.bi_sector = sector;
1362 bio->bi_bdev = device->ldev->backing_bdev;
1364 bio->bi_private = peer_req;
1365 bio->bi_end_io = drbd_peer_request_endio;
/* Thread the new bio onto the single-linked "bios" list for later
 * submission once all bios have been built. */
1367 bio->bi_next = bios;
1371 if (rw & REQ_DISCARD) {
1372 bio->bi_iter.bi_size = ds;
1376 page_chain_for_each(page) {
1377 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1378 if (!bio_add_page(bio, page, len, 0)) {
1379 /* A single page must always be possible!
1380 * But in case it fails anyways,
1381 * we deal with it, and complain (below). */
1382 if (bio->bi_vcnt == 0) {
1384 "bio_add_page failed for len=%u, "
1385 "bi_vcnt=0 (bi_sector=%llu)\n",
1386 len, (uint64_t)bio->bi_iter.bi_sector);
/* All payload must have been consumed by now. */
1396 D_ASSERT(device, ds == 0);
1398 D_ASSERT(device, page == NULL);
/* Completion (drbd_peer_request_endio) fires once per bio; account
 * for all of them before submitting the first one. */
1400 atomic_set(&peer_req->pending_bios, n_bios);
1403 bios = bios->bi_next;
1404 bio->bi_next = NULL;
1406 drbd_generic_make_request(device, fault_type, bio);
/* Error path: walk the remaining list and release the bios. */
1413 bios = bios->bi_next;
1419 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1420 struct drbd_peer_request *peer_req)
/* Remove the peer request's interval from the write_requests tree and
 * wake any waiters.  NOTE(review): callers visible in this file take
 * resource->req_lock around this call (see e_end_block()); presumably
 * that lock is required here -- confirm against all call sites. */
1422 struct drbd_interval *i = &peer_req->i;
1424 drbd_remove_interval(&device->write_requests, i);
1425 drbd_clear_interval(i);
1427 /* Wake up any processes waiting for this peer request to complete. */
1429 wake_up(&device->misc_wait);
1432 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
/* Block until the active_ee list (peer writes currently in flight to the
 * backing device) of every volume on this connection has drained. */
1434 struct drbd_peer_device *peer_device;
1438 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1439 struct drbd_device *device = peer_device->device;
/* Pin the device so it cannot be destroyed while we sleep. */
1441 kref_get(&device->kref);
1443 drbd_wait_ee_list_empty(device, &device->active_ee);
1444 kref_put(&device->kref, drbd_destroy_device);
1450 static struct drbd_peer_device *
1451 conn_peer_device(struct drbd_connection *connection, int volume_number)
1453 return idr_find(&connection->peer_devices, volume_number);
1456 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
/* Handle a P_BARRIER packet: finish the current write epoch, honor the
 * negotiated write ordering (flush/drain as needed), and install a fresh
 * epoch for subsequent writes.
 * NOTE(review): this extract is missing lines (case labels, early
 * returns, error handling) -- not the complete function. */
1459 struct p_barrier *p = pi->data;
1460 struct drbd_epoch *epoch;
1462 /* FIXME these are unacked on connection,
1463 * not a specific (peer)device.
1465 connection->current_epoch->barrier_nr = p->barrier;
1466 connection->current_epoch->connection = connection;
1467 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1469 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1470 * the activity log, which means it would not be resynced in case the
1471 * R_PRIMARY crashes now.
1472 * Therefore we must send the barrier_ack after the barrier request was
1474 switch (connection->write_ordering) {
1476 if (rv == FE_RECYCLED)
1479 /* receiver context, in the writeout path of the other node.
1480 * avoid potential distributed deadlock */
1481 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
/* Allocation failure fallback: drain and flush synchronously instead
 * of opening a new epoch object. */
1485 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1490 conn_wait_active_ee_empty(connection);
1491 drbd_flush(connection);
1493 if (atomic_read(&connection->current_epoch->epoch_size)) {
1494 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1501 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
/* Initialize the freshly allocated epoch and chain it in -- unless the
 * current epoch was emptied (recycled) while we were allocating. */
1506 atomic_set(&epoch->epoch_size, 0);
1507 atomic_set(&epoch->active, 0);
1509 spin_lock(&connection->epoch_lock);
1510 if (atomic_read(&connection->current_epoch->epoch_size)) {
1511 list_add(&epoch->list, &connection->current_epoch->list);
1512 connection->current_epoch = epoch;
1513 connection->epochs++;
1515 /* The current_epoch got recycled while we allocated this one... */
1518 spin_unlock(&connection->epoch_lock);
1523 /* used from receive_RSDataReply (recv_resync_read)
1524 * and from receive_Data */
1525 static struct drbd_peer_request *
1526 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1527 struct packet_info *pi) __must_hold(local)
/* Allocate a peer request and read its payload (and optional integrity
 * digest) from the socket.  Handles P_TRIM packets, which carry no
 * payload but encode their size in struct p_trim.
 * Returns the new peer request, or NULL on error.
 * NOTE(review): this extract is missing lines (braces, early returns,
 * the dgs initialization, the receive-loop bookkeeping). */
1529 struct drbd_device *device = peer_device->device;
1530 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1531 struct drbd_peer_request *peer_req;
1534 int data_size = pi->size;
1535 void *dig_in = peer_device->connection->int_dig_in;
1536 void *dig_vv = peer_device->connection->int_dig_vv;
1537 unsigned long *data;
1538 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
/* An integrity digest precedes the payload when configured (never for
 * trim, which has no payload). */
1541 if (!trim && peer_device->connection->peer_integrity_tfm) {
1542 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1544 * FIXME: Receive the incoming digest into the receive buffer
1545 * here, together with its struct p_data?
1547 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1554 D_ASSERT(peer_device, data_size == 0);
1555 data_size = be32_to_cpu(trim->size);
1558 if (!expect(IS_ALIGNED(data_size, 512)))
1560 /* prepare for larger trim requests. */
1561 if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1564 /* even though we trust our peer,
1565 * we sometimes have to double check. */
1566 if (sector + (data_size>>9) > capacity) {
1567 drbd_err(device, "request from peer beyond end of local disk: "
1568 "capacity: %llus < sector: %llus + size: %u\n",
1569 (unsigned long long)capacity,
1570 (unsigned long long)sector, data_size);
1574 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1575 * "criss-cross" setup, that might cause write-out on some other DRBD,
1576 * which in turn might block on the other node at this very place. */
1577 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
/* Receive the payload page by page into the request's page chain. */
1585 page = peer_req->pages;
1586 page_chain_for_each(page) {
1587 unsigned len = min_t(int, ds, PAGE_SIZE);
1589 err = drbd_recv_all_warn(peer_device->connection, data, len);
1590 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1591 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1592 data[0] = data[0] ^ (unsigned long)-1;
1596 drbd_free_peer_req(device, peer_req);
/* Verify the received data against the digest that preceded it. */
1603 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1604 if (memcmp(dig_in, dig_vv, dgs)) {
1605 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1606 (unsigned long long)sector, data_size);
1607 drbd_free_peer_req(device, peer_req);
1611 device->recv_cnt += data_size>>9;
1615 /* drbd_drain_block() just takes a data block
1616 * out of the socket input buffer, and discards it.
1618 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
/* Consume and discard @data_size bytes of payload from the socket,
 * reusing a single scratch page.  Used when the data cannot be applied
 * locally but the stream must stay in sync.
 * NOTE(review): loop structure and return statements are missing from
 * this extract. */
1627 page = drbd_alloc_pages(peer_device, 1, 1);
1631 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1633 err = drbd_recv_all_warn(peer_device->connection, data, len);
1639 drbd_free_pages(peer_device->device, page, 0);
1643 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1644 sector_t sector, int data_size)
/* Receive a disk-less read reply directly into the pages of the
 * original request's master bio, verifying the optional integrity
 * digest.  Returns 0 on success, negative error otherwise.
 * NOTE(review): braces, dgs initialization and early returns are
 * missing from this extract. */
1646 struct bio_vec bvec;
1647 struct bvec_iter iter;
1649 int dgs, err, expect;
1650 void *dig_in = peer_device->connection->int_dig_in;
1651 void *dig_vv = peer_device->connection->int_dig_vv;
1654 if (peer_device->connection->peer_integrity_tfm) {
1655 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1656 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1662 /* optimistically update recv_cnt. if receiving fails below,
1663 * we disconnect anyways, and counters will be reset. */
1664 peer_device->device->recv_cnt += data_size>>9;
1666 bio = req->master_bio;
1667 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
/* Fill each segment of the master bio straight from the socket. */
1669 bio_for_each_segment(bvec, bio, iter) {
1670 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1671 expect = min_t(int, data_size, bvec.bv_len);
1672 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1673 kunmap(bvec.bv_page);
1676 data_size -= expect;
1680 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1681 if (memcmp(dig_in, dig_vv, dgs)) {
1682 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
/* Everything the peer announced must have been consumed. */
1687 D_ASSERT(peer_device->device, data_size == 0);
1692 * e_end_resync_block() is called in asender context via
1693 * drbd_finish_peer_reqs().
1695 static int e_end_resync_block(struct drbd_work *w, int unused)
/* Completion callback for a resync write: mark the range in sync and
 * ack it, or record the failure and send a negative ack.  Drops the
 * unacked reference taken when the request was queued. */
1697 struct drbd_peer_request *peer_req =
1698 container_of(w, struct drbd_peer_request, w);
1699 struct drbd_peer_device *peer_device = peer_req->peer_device;
1700 struct drbd_device *device = peer_device->device;
1701 sector_t sector = peer_req->i.sector;
/* Resync requests are never in the conflict interval tree. */
1704 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1706 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1707 drbd_set_in_sync(device, sector, peer_req->i.size);
1708 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1710 /* Record failure to sync */
1711 drbd_rs_failed_io(device, sector, peer_req->i.size);
1713 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1715 dec_unacked(device);
1720 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1721 struct packet_info *pi) __releases(local)
/* Read a resync data block from the socket and submit it to the local
 * disk.  On success, completion continues in e_end_resync_block() /
 * drbd_peer_request_endio(); on failure the request is torn down here.
 * NOTE(review): some lines (early returns, put_ldev on error) are
 * missing from this extract. */
1723 struct drbd_device *device = peer_device->device;
1724 struct drbd_peer_request *peer_req;
1726 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1730 dec_rs_pending(device);
1732 inc_unacked(device);
1733 /* corresponding dec_unacked() in e_end_resync_block()
1734 * respective _drbd_clear_done_ee */
1736 peer_req->w.cb = e_end_resync_block;
1738 spin_lock_irq(&device->resource->req_lock);
1739 list_add(&peer_req->w.list, &device->sync_ee);
1740 spin_unlock_irq(&device->resource->req_lock);
/* Account the incoming resync traffic for throttling decisions. */
1742 atomic_add(pi->size >> 9, &device->rs_sect_ev);
1743 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1746 /* don't care for the reason here */
1747 drbd_err(device, "submit failed, triggering re-connect\n");
1748 spin_lock_irq(&device->resource->req_lock);
1749 list_del(&peer_req->w.list);
1750 spin_unlock_irq(&device->resource->req_lock);
1752 drbd_free_peer_req(device, peer_req);
1758 static struct drbd_request *
1759 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1760 sector_t sector, bool missing_ok, const char *func)
/* Recover the drbd_request whose address we previously handed to the
 * peer as @id, validating it against interval tree @root (and the
 * expected @sector) before trusting it.  Logs via @func and, per the
 * missing_ok parameter, presumably tolerates absence silently when
 * @missing_ok is true -- the branch is not visible in this extract. */
1762 struct drbd_request *req;
1764 /* Request object according to our peer */
1765 req = (struct drbd_request *)(unsigned long)id;
/* Only trust the pointer if the tree actually contains this interval. */
1766 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1769 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1770 (unsigned long)id, (unsigned long long)sector);
1775 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
/* Handle P_DATA_REPLY: match the reply to our pending read request and
 * receive the payload into its master bio.
 * NOTE(review): braces and the error-return lines are missing from this
 * extract. */
1777 struct drbd_peer_device *peer_device;
1778 struct drbd_device *device;
1779 struct drbd_request *req;
1782 struct p_data *p = pi->data;
1784 peer_device = conn_peer_device(connection, pi->vnr);
1787 device = peer_device->device;
1789 sector = be64_to_cpu(p->sector);
1791 spin_lock_irq(&device->resource->req_lock);
1792 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1793 spin_unlock_irq(&device->resource->req_lock);
1797 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1798 * special casing it there for the various failure cases.
1799 * still no race with drbd_fail_pending_reads */
1800 err = recv_dless_read(peer_device, req, sector, pi->size);
1802 req_mod(req, DATA_RECEIVED);
1803 /* else: nothing. handled from drbd_disconnect...
1804 * I don't think we may complete this just yet
1805 * in case we are "on-disconnect: freeze" */
1810 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
/* Handle P_RS_DATA_REPLY: write the received resync block to the local
 * disk, or drain and negatively ack it when no disk is available.
 * NOTE(review): braces and error-return lines are missing from this
 * extract. */
1812 struct drbd_peer_device *peer_device;
1813 struct drbd_device *device;
1816 struct p_data *p = pi->data;
1818 peer_device = conn_peer_device(connection, pi->vnr);
1821 device = peer_device->device;
1823 sector = be64_to_cpu(p->sector);
1824 D_ASSERT(device, p->block_id == ID_SYNCER);
1826 if (get_ldev(device)) {
1827 /* data is submitted to disk within recv_resync_read.
1828 * corresponding put_ldev done below on error,
1829 * or in drbd_peer_request_endio. */
1830 err = recv_resync_read(peer_device, sector, pi);
1832 if (__ratelimit(&drbd_ratelimit_state))
1833 drbd_err(device, "Can not write resync data to local disk.\n");
/* No local disk: swallow the payload to keep the stream in sync. */
1835 err = drbd_drain_block(peer_device, pi->size);
1837 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
/* Either way this counts as resync traffic received. */
1840 atomic_add(pi->size >> 9, &device->rs_sect_in);
1845 static void restart_conflicting_writes(struct drbd_device *device,
1846 sector_t sector, int size)
/* Requeue all postponed local writes that overlap [sector, sector+size).
 * NOTE(review): continue/skip lines of the loop are missing from this
 * extract; callers hold resource->req_lock (see e_end_block()). */
1848 struct drbd_interval *i;
1849 struct drbd_request *req;
1851 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1854 req = container_of(i, struct drbd_request, i);
/* Only fully-completed-locally, postponed requests are restarted. */
1855 if (req->rq_state & RQ_LOCAL_PENDING ||
1856 !(req->rq_state & RQ_POSTPONED))
1858 /* as it is RQ_POSTPONED, this will cause it to
1859 * be queued on the retry workqueue. */
1860 __req_mod(req, CONFLICT_RESOLVED, NULL);
1865 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1867 static int e_end_block(struct drbd_work *w, int cancel)
/* Completion callback for a mirrored write from the peer: send the
 * appropriate (negative) ack, remove the request from the conflict
 * interval tree, restart postponed conflicting writes, and let the
 * epoch accounting proceed.
 * NOTE(review): some lines (err init, pcmd declaration, return) are
 * missing from this extract. */
1869 struct drbd_peer_request *peer_req =
1870 container_of(w, struct drbd_peer_request, w);
1871 struct drbd_peer_device *peer_device = peer_req->peer_device;
1872 struct drbd_device *device = peer_device->device;
1873 sector_t sector = peer_req->i.sector;
1876 if (peer_req->flags & EE_SEND_WRITE_ACK) {
1877 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
/* During resync states, a successful write may double as a
 * "this range is now in sync" notification. */
1878 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1879 device->state.conn <= C_PAUSED_SYNC_T &&
1880 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1881 P_RS_WRITE_ACK : P_WRITE_ACK;
1882 err = drbd_send_ack(peer_device, pcmd, peer_req);
1883 if (pcmd == P_RS_WRITE_ACK)
1884 drbd_set_in_sync(device, sector, peer_req->i.size);
1886 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1887 /* we expect it to be marked out of sync anyways...
1888 * maybe assert this? */
1890 dec_unacked(device);
1892 /* we delete from the conflict detection hash _after_ we sent out the
1893 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1894 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1895 spin_lock_irq(&device->resource->req_lock);
1896 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1897 drbd_remove_epoch_entry_interval(device, peer_req);
1898 if (peer_req->flags & EE_RESTART_REQUESTS)
1899 restart_conflicting_writes(device, sector, peer_req->i.size);
1900 spin_unlock_irq(&device->resource->req_lock);
1902 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1904 drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1909 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1911 struct drbd_peer_request *peer_req =
1912 container_of(w, struct drbd_peer_request, w);
1913 struct drbd_peer_device *peer_device = peer_req->peer_device;
1916 err = drbd_send_ack(peer_device, ack, peer_req);
1917 dec_unacked(peer_device->device);
1922 static int e_send_superseded(struct drbd_work *w, int unused)
1924 return e_send_ack(w, P_SUPERSEDED);
1927 static int e_send_retry_write(struct drbd_work *w, int unused)
1929 struct drbd_peer_request *peer_req =
1930 container_of(w, struct drbd_peer_request, w);
1931 struct drbd_connection *connection = peer_req->peer_device->connection;
1933 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1934 P_RETRY_WRITE : P_SUPERSEDED);
1937 static bool seq_greater(u32 a, u32 b)
1940 * We assume 32-bit wrap-around here.
1941 * For 24-bit wrap-around, we would have to shift:
1944 return (s32)a - (s32)b > 0;
1947 static u32 seq_max(u32 a, u32 b)
1949 return seq_greater(a, b) ? a : b;
1952 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
/* Advance device->peer_seq to @peer_seq if it is newer, and wake waiters
 * in wait_for_and_update_peer_seq().  Only relevant when conflict
 * resolution (dual-primary) is in effect. */
1954 struct drbd_device *device = peer_device->device;
1955 unsigned int newest_peer_seq;
1957 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1958 spin_lock(&device->peer_seq_lock);
1959 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1960 device->peer_seq = newest_peer_seq;
1961 spin_unlock(&device->peer_seq_lock);
1962 /* wake up only if we actually changed device->peer_seq */
1963 if (peer_seq == newest_peer_seq)
1964 wake_up(&device->seq_wait);
1968 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1970 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1973 /* maybe change sync_ee into interval trees as well? */
1974 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
/* Does @peer_req overlap any resync write currently queued on sync_ee?
 * Linear scan under req_lock.  NOTE(review): the lines that record the
 * hit and break out of the loop are missing from this extract. */
1976 struct drbd_peer_request *rs_req;
1979 spin_lock_irq(&device->resource->req_lock);
1980 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1981 if (overlaps(peer_req->i.sector, peer_req->i.size,
1982 rs_req->i.sector, rs_req->i.size)) {
1987 spin_unlock_irq(&device->resource->req_lock);
1992 /* Called from receive_Data.
1993 * Synchronize packets on sock with packets on msock.
1995 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1996 * packet traveling on msock, they are still processed in the order they have
1999 * Note: we don't care for Ack packets overtaking P_DATA packets.
2001 * In case packet_seq is larger than device->peer_seq number, there are
2002 * outstanding packets on the msock. We wait for them to arrive.
2003 * In case we are the logically next packet, we update device->peer_seq
2004 * ourselves. Correctly handles 32bit wrap around.
2006 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2007 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2008 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2009 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2011 * returns 0 if we may process the packet,
2012 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2013 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
/* Serialize against acks still in flight on the msock (see the long
 * comment above): sleep until @peer_seq is the logically next sequence
 * number, then record it.  Returns 0 when the packet may be processed,
 * -ERESTARTSYS on signal.
 * NOTE(review): several lines (loop braces, break statements, timeout
 * handling branches) are missing from this extract. */
2015 struct drbd_device *device = peer_device->device;
/* Nothing to serialize unless dual-primary conflict resolution is on. */
2020 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2023 spin_lock(&device->peer_seq_lock);
2025 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2026 device->peer_seq = seq_max(device->peer_seq, peer_seq);
2030 if (signal_pending(current)) {
2036 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2042 /* Only need to wait if two_primaries is enabled */
2043 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2044 spin_unlock(&device->peer_seq_lock);
/* Bounded sleep: give up after the configured ping timeout. */
2046 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2048 timeout = schedule_timeout(timeout);
2049 spin_lock(&device->peer_seq_lock);
2052 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2056 spin_unlock(&device->peer_seq_lock);
2057 finish_wait(&device->seq_wait, &wait);
2061 /* see also bio_flags_to_wire()
2062 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2063 * flags and back. We may replicate to other kernel versions. */
2064 static unsigned long wire_flags_to_bio(u32 dpf)
2066 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2067 (dpf & DP_FUA ? REQ_FUA : 0) |
2068 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2069 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2072 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
/* Fail (NEG_ACK) every postponed request overlapping [sector, sector+size).
 * Because the req_lock is dropped to complete the master bio, the
 * overlap walk is restarted after each completion.
 * NOTE(review): the size parameter line, loop braces and the restart
 * jump are missing from this extract; caller presumably holds
 * resource->req_lock -- confirm against call sites. */
2075 struct drbd_interval *i;
2078 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2079 struct drbd_request *req;
2080 struct bio_and_error m;
2084 req = container_of(i, struct drbd_request, i);
2085 if (!(req->rq_state & RQ_POSTPONED))
2087 req->rq_state &= ~RQ_POSTPONED;
2088 __req_mod(req, NEG_ACKED, &m);
/* complete_master_bio() may sleep/IO-complete: drop the lock for it. */
2089 spin_unlock_irq(&device->resource->req_lock);
2091 complete_master_bio(device, &m);
2092 spin_lock_irq(&device->resource->req_lock);
2097 static int handle_write_conflicts(struct drbd_device *device,
2098 struct drbd_peer_request *peer_req)
/* Resolve a peer write against overlapping local requests: insert the
 * peer request into the write_requests interval tree, then for each
 * overlap either supersede/retry it (when this node resolves conflicts)
 * or wait for the local request to settle.
 * Caller holds resource->req_lock (see receive_Data()).
 * NOTE(review): multiple lines (repeat label, error returns, braces)
 * are missing from this extract. */
2100 struct drbd_connection *connection = peer_req->peer_device->connection;
2101 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2102 sector_t sector = peer_req->i.sector;
2103 const unsigned int size = peer_req->i.size;
2104 struct drbd_interval *i;
2109 * Inserting the peer request into the write_requests tree will prevent
2110 * new conflicting local requests from being added.
2112 drbd_insert_interval(&device->write_requests, &peer_req->i);
2115 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2116 if (i == &peer_req->i)
2121 * Our peer has sent a conflicting remote request; this
2122 * should not happen in a two-node setup. Wait for the
2123 * earlier peer request to complete.
2125 err = drbd_wait_misc(device, i);
2131 equal = i->sector == sector && i->size == size;
2132 if (resolve_conflicts) {
2134 * If the peer request is fully contained within the
2135 * overlapping request, it can be considered overwritten
2136 * and thus superseded; otherwise, it will be retried
2137 * once all overlapping requests have completed.
2139 bool superseded = i->sector <= sector && i->sector +
2140 (i->size >> 9) >= sector + (size >> 9);
2143 drbd_alert(device, "Concurrent writes detected: "
2144 "local=%llus +%u, remote=%llus +%u, "
2145 "assuming %s came first\n",
2146 (unsigned long long)i->sector, i->size,
2147 (unsigned long long)sector, size,
2148 superseded ? "local" : "remote");
/* Queue the decision for the asender to transmit; the
 * matching dec_unacked() happens in e_send_ack(). */
2150 inc_unacked(device);
2151 peer_req->w.cb = superseded ? e_send_superseded :
2153 list_add_tail(&peer_req->w.list, &device->done_ee);
2154 wake_asender(connection);
2159 struct drbd_request *req =
2160 container_of(i, struct drbd_request, i);
2163 drbd_alert(device, "Concurrent writes detected: "
2164 "local=%llus +%u, remote=%llus +%u\n",
2165 (unsigned long long)i->sector, i->size,
2166 (unsigned long long)sector, size);
2168 if (req->rq_state & RQ_LOCAL_PENDING ||
2169 !(req->rq_state & RQ_POSTPONED)) {
2171 * Wait for the node with the discard flag to
2172 * decide if this request has been superseded
2173 * or needs to be retried.
2174 * Requests that have been superseded will
2175 * disappear from the write_requests tree.
2177 * In addition, wait for the conflicting
2178 * request to finish locally before submitting
2179 * the conflicting peer request.
2181 err = drbd_wait_misc(device, &req->i);
/* Wait failed: give up on this connection entirely. */
2183 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2184 fail_postponed_requests(device, sector, size);
2190 * Remember to restart the conflicting requests after
2191 * the new peer request has completed.
2193 peer_req->flags |= EE_RESTART_REQUESTS;
/* Error path: undo the interval-tree insertion from above. */
2200 drbd_remove_epoch_entry_interval(device, peer_req);
2204 /* mirrored write */
2205 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
/* Handle a mirrored write (P_DATA / P_TRIM) from the peer: read the
 * payload, attach it to the current epoch, resolve conflicts with local
 * writes when running dual-primary, send the protocol-appropriate ack,
 * and submit the write to the backing device.
 * NOTE(review): many lines (braces, early returns, several labels and
 * the `rw = WRITE` initialization) are missing from this extract. */
2207 struct drbd_peer_device *peer_device;
2208 struct drbd_device *device;
2210 struct drbd_peer_request *peer_req;
2211 struct p_data *p = pi->data;
2212 u32 peer_seq = be32_to_cpu(p->seq_num);
2217 peer_device = conn_peer_device(connection, pi->vnr);
2220 device = peer_device->device;
/* No local disk: still consume the payload and negatively ack it so the
 * data stream and the epoch accounting stay consistent. */
2222 if (!get_ldev(device)) {
2225 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2226 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2227 atomic_inc(&connection->current_epoch->epoch_size);
2228 err2 = drbd_drain_block(peer_device, pi->size);
2235 * Corresponding put_ldev done either below (on various errors), or in
2236 * drbd_peer_request_endio, if we successfully submit the data at the
2237 * end of this function.
2240 sector = be64_to_cpu(p->sector);
2241 peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2247 peer_req->w.cb = e_end_block;
2249 dp_flags = be32_to_cpu(p->dp_flags);
2250 rw |= wire_flags_to_bio(dp_flags);
2251 if (pi->cmd == P_TRIM) {
2252 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2253 peer_req->flags |= EE_IS_TRIM;
/* Backend cannot discard: fall back to writing zeroes. */
2254 if (!blk_queue_discard(q))
2255 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2256 D_ASSERT(peer_device, peer_req->i.size > 0);
2257 D_ASSERT(peer_device, rw & REQ_DISCARD);
2258 D_ASSERT(peer_device, peer_req->pages == NULL);
2259 } else if (peer_req->pages == NULL) {
2260 D_ASSERT(device, peer_req->i.size == 0);
2261 D_ASSERT(device, dp_flags & DP_FLUSH);
2264 if (dp_flags & DP_MAY_SET_IN_SYNC)
2265 peer_req->flags |= EE_MAY_SET_IN_SYNC;
/* Attach this write to the currently open epoch. */
2267 spin_lock(&connection->epoch_lock);
2268 peer_req->epoch = connection->current_epoch;
2269 atomic_inc(&peer_req->epoch->epoch_size);
2270 atomic_inc(&peer_req->epoch->active);
2271 spin_unlock(&connection->epoch_lock);
/* Dual-primary: serialize on the peer sequence number and resolve
 * conflicts with overlapping local writes. */
2274 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2277 peer_req->flags |= EE_IN_INTERVAL_TREE;
2278 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2280 goto out_interrupted;
2281 spin_lock_irq(&device->resource->req_lock);
2282 err = handle_write_conflicts(device, peer_req);
2284 spin_unlock_irq(&device->resource->req_lock);
2285 if (err == -ENOENT) {
2289 goto out_interrupted;
2292 update_peer_seq(peer_device, peer_seq);
2293 spin_lock_irq(&device->resource->req_lock);
2295 /* if we use the zeroout fallback code, we process synchronously
2296 * and we wait for all pending requests, respectively wait for
2297 * active_ee to become empty in drbd_submit_peer_request();
2298 * better not add ourselves here. */
2299 if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2300 list_add(&peer_req->w.list, &device->active_ee);
2301 spin_unlock_irq(&device->resource->req_lock);
2303 if (device->state.conn == C_SYNC_TARGET)
2304 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
/* Pre-100 peers derive the ack mode from the wire protocol (A/B/C). */
2306 if (peer_device->connection->agreed_pro_version < 100) {
2308 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2310 dp_flags |= DP_SEND_WRITE_ACK;
2313 dp_flags |= DP_SEND_RECEIVE_ACK;
2319 if (dp_flags & DP_SEND_WRITE_ACK) {
2320 peer_req->flags |= EE_SEND_WRITE_ACK;
2321 inc_unacked(device);
2322 /* corresponding dec_unacked() in e_end_block()
2323 * respective _drbd_clear_done_ee */
2326 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2327 /* I really don't like it that the receiver thread
2328 * sends on the msock, but anyways */
2329 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2332 if (device->state.pdsk < D_INCONSISTENT) {
2333 /* In case we have the only disk of the cluster, */
2334 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2335 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2336 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2337 drbd_al_begin_io(device, &peer_req->i, true);
2340 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2344 /* don't care for the reason here */
2345 drbd_err(device, "submit failed, triggering re-connect\n");
/* Submit failed: unwind list membership, interval tree, AL and epoch
 * accounting before freeing the request. */
2346 spin_lock_irq(&device->resource->req_lock);
2347 list_del(&peer_req->w.list);
2348 drbd_remove_epoch_entry_interval(device, peer_req);
2349 spin_unlock_irq(&device->resource->req_lock);
2350 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2351 drbd_al_complete_io(device, &peer_req->i);
2354 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2356 drbd_free_peer_req(device, peer_req);
2360 /* We may throttle resync, if the lower device seems to be busy,
2361 * and current sync rate is above c_min_rate.
2363 * To decide whether or not the lower device is busy, we use a scheme similar
2364 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2365 * (more than 64 sectors) of activity we cannot account for with our own resync
2366 * activity, it obviously is "busy".
2368 * The current sync rate used here uses only the most recent two step marks,
2369 * to have a short time average so we can react faster.
2371 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
/* Should resync IO for @sector be throttled?  Combines the c-min-rate
 * check with a per-extent exception: never throttle an extent that
 * application IO is already waiting for (BME_PRIORITY).
 * NOTE(review): braces and the return statement are missing from this
 * extract. */
2373 struct lc_element *tmp;
2374 bool throttle = true;
2376 if (!drbd_rs_c_min_rate_throttle(device))
2379 spin_lock_irq(&device->al_lock);
2380 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2382 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2383 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2385 /* Do not slow down if app IO is already waiting for this extent */
2387 spin_unlock_irq(&device->al_lock);
2392 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
/* Is the backing device "busy" with non-resync IO while the short-term
 * resync rate exceeds c-min-rate?  See the comment block above for the
 * is_mddev_idle()-style scheme.
 * NOTE(review): braces, rcu locking, return statements and (presumably)
 * the dt==0 guard before the division below are missing from this
 * extract -- confirm against the full source. */
2394 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2395 unsigned long db, dt, dbdt;
2396 unsigned int c_min_rate;
2400 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2403 /* feature disabled? */
2404 if (c_min_rate == 0)
/* Total sectors moved through part0 (reads + writes) minus what we
 * accounted to resync ourselves = "foreign" application activity. */
2407 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2408 (int)part_stat_read(&disk->part0, sectors[1]) -
2409 atomic_read(&device->rs_sect_ev);
2410 if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2411 unsigned long rs_left;
2414 device->rs_last_events = curr_events;
2416 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2418 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2420 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2421 rs_left = device->ov_left;
2423 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2425 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2428 db = device->rs_mark_left[i] - rs_left;
2429 dbdt = Bit2KB(db/dt);
2431 if (dbdt > c_min_rate)
/*
 * receive_DataRequest() - service a read request from the peer.
 *
 * Handles P_DATA_REQUEST (application read), P_RS_DATA_REQUEST and
 * P_CSUM_RS_REQUEST (resync reads), and P_OV_REQUEST / P_OV_REPLY
 * (online verify), as visible in the switch cases below.
 * Validates sector/size, allocates a peer request, queues it on
 * device->read_ee and submits the local READ.  If no up-to-date local
 * data is available, a negative ack is sent and any payload drained.
 * Returns 0 on success, non-zero to trigger a re-connect.
 *
 * NOTE(review): several lines of the original function are elided in
 * this excerpt (error returns, some case labels, closing braces).
 */
2437 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2439 struct drbd_peer_device *peer_device;
2440 struct drbd_device *device;
2443 struct drbd_peer_request *peer_req;
2444 struct digest_info *di = NULL;
2446 unsigned int fault_type;
2447 struct p_block_req *p = pi->data;
2449 peer_device = conn_peer_device(connection, pi->vnr);
2452 device = peer_device->device;
2453 capacity = drbd_get_capacity(device->this_bdev);
/* wire fields arrive in network byte order */
2455 sector = be64_to_cpu(p->sector);
2456 size = be32_to_cpu(p->blksize);
/* sanity checks: positive multiple of 512, bounded by DRBD_MAX_BIO_SIZE,
 * and not reaching past the local capacity */
2458 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2459 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2460 (unsigned long long)sector, size);
2463 if (sector + (size>>9) > capacity) {
2464 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2465 (unsigned long long)sector, size);
/* no usable local data: answer with the matching negative ack per
 * request type, then drain the payload instead of reading from disk */
2469 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2472 case P_DATA_REQUEST:
2473 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2475 case P_RS_DATA_REQUEST:
2476 case P_CSUM_RS_REQUEST:
2478 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2482 dec_rs_pending(device);
2483 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2488 if (verb && __ratelimit(&drbd_ratelimit_state))
2489 drbd_err(device, "Can not satisfy peer's read request, "
2490 "no local data.\n");
2492 /* drain the possibly remaining payload */
2493 return drbd_drain_block(peer_device, pi->size);
2496 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2497 * "criss-cross" setup, that might cause write-out on some other DRBD,
2498 * which in turn might block on the other node at this very place. */
2499 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2500 true /* has real payload */, GFP_NOIO);
/* pick completion callback and fault-injection type per request kind */
2507 case P_DATA_REQUEST:
2508 peer_req->w.cb = w_e_end_data_req;
2509 fault_type = DRBD_FAULT_DT_RD;
2510 /* application IO, don't drbd_rs_begin_io */
2513 case P_RS_DATA_REQUEST:
2514 peer_req->w.cb = w_e_end_rsdata_req;
2515 fault_type = DRBD_FAULT_RS_RD;
2516 /* used in the sector offset progress display */
2517 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
/* P_CSUM_RS_REQUEST and P_OV_REPLY carry a digest as payload; the
 * digest_info and its digest buffer share one allocation */
2521 case P_CSUM_RS_REQUEST:
2522 fault_type = DRBD_FAULT_RS_RD;
2523 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2527 di->digest_size = pi->size;
2528 di->digest = (((char *)di)+sizeof(struct digest_info));
/* peer_req takes ownership of di; EE_HAS_DIGEST marks it for cleanup */
2530 peer_req->digest = di;
2531 peer_req->flags |= EE_HAS_DIGEST;
2533 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2536 if (pi->cmd == P_CSUM_RS_REQUEST) {
2537 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2538 peer_req->w.cb = w_e_end_csum_rs_req;
2539 /* used in the sector offset progress display */
2540 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2541 } else if (pi->cmd == P_OV_REPLY) {
2542 /* track progress, we may need to throttle */
2543 atomic_add(size >> 9, &device->rs_sect_in);
2544 peer_req->w.cb = w_e_end_ov_reply;
2545 dec_rs_pending(device);
2546 /* drbd_rs_begin_io done when we sent this request,
2547 * but accounting still needs to be done. */
2548 goto submit_for_resync;
/* first verify request with proto >= 90: initialize online-verify state
 * (start sector, remaining bits, rate-sampling marks) */
2553 if (device->ov_start_sector == ~(sector_t)0 &&
2554 peer_device->connection->agreed_pro_version >= 90) {
2555 unsigned long now = jiffies;
2557 device->ov_start_sector = sector;
2558 device->ov_position = sector;
2559 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2560 device->rs_total = device->ov_left;
2561 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2562 device->rs_mark_left[i] = device->ov_left;
2563 device->rs_mark_time[i] = now;
2565 drbd_info(device, "Online Verify start sector: %llu\n",
2566 (unsigned long long)sector);
2568 peer_req->w.cb = w_e_end_ov_req;
2569 fault_type = DRBD_FAULT_RS_RD;
2576 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2577 * wrt the receiver, but it is not as straightforward as it may seem.
2578 * Various places in the resync start and stop logic assume resync
2579 * requests are processed in order, requeuing this on the worker thread
2580 * introduces a bunch of new code for synchronization between threads.
2582 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2583 * "forever", throttling after drbd_rs_begin_io will lock that extent
2584 * for application writes for the same time. For now, just throttle
2585 * here, where the rest of the code expects the receiver to sleep for
2589 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2590 * this defers syncer requests for some time, before letting at least
2591 * one request through. The resync controller on the receiving side
2592 * will adapt to the incoming rate accordingly.
2594 * We cannot throttle here if remote is Primary/SyncTarget:
2595 * we would also throttle its application reads.
2596 * In that case, throttling is done on the SyncTarget only.
2598 if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2599 schedule_timeout_uninterruptible(HZ/10);
2600 if (drbd_rs_begin_io(device, sector))
/* account the resync read in the sector-event counter */
2604 atomic_add(size >> 9, &device->rs_sect_ev);
/* expect an ack from the peer; queue the request and submit the read */
2607 inc_unacked(device);
2608 spin_lock_irq(&device->resource->req_lock);
2609 list_add_tail(&peer_req->w.list, &device->read_ee);
2610 spin_unlock_irq(&device->resource->req_lock);
2612 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2615 /* don't care for the reason here */
2616 drbd_err(device, "submit failed, triggering re-connect\n");
2617 spin_lock_irq(&device->resource->req_lock);
2618 list_del(&peer_req->w.list);
2619 spin_unlock_irq(&device->resource->req_lock);
2620 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2624 drbd_free_peer_req(device, peer_req);
/*
2629 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
 *
 * Decide, according to net_conf->after_sb_0p, whose data survives.
 * Return convention (as interpreted by the callers in this file):
 *   rv > 0    : we keep our data, the peer's is discarded
 *   rv < 0    : we discard our data
 *   rv == -100: no automatic decision possible (unresolved split-brain)
 */
2631 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2633 struct drbd_device *device = peer_device->device;
2634 int self, peer, rv = -100;
2635 unsigned long ch_self, ch_peer;
2636 enum drbd_after_sb_p after_sb_0p;
/* lowest bit of the bitmap UUID flags "node was primary" */
2638 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2639 peer = device->p_uuid[UI_BITMAP] & 1;
/* amount of changed data: peer-reported vs. locally counted bits */
2641 ch_peer = device->p_uuid[UI_SIZE];
2642 ch_self = device->comm_bm_set;
2645 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2647 switch (after_sb_0p) {
/* these policies presume at least one primary; reject them here */
2649 case ASB_DISCARD_SECONDARY:
2650 case ASB_CALL_HELPER:
2652 drbd_err(device, "Configuration error.\n");
2654 case ASB_DISCONNECT:
2656 case ASB_DISCARD_YOUNGER_PRI:
2657 if (self == 0 && peer == 1) {
2661 if (self == 1 && peer == 0) {
2665 /* Else fall through to one of the other strategies... */
2666 case ASB_DISCARD_OLDER_PRI:
2667 if (self == 0 && peer == 1) {
2671 if (self == 1 && peer == 0) {
2675 /* Else fall through to one of the other strategies... */
2676 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2677 "Using discard-least-changes instead\n");
2678 case ASB_DISCARD_ZERO_CHG:
/* neither side changed anything: break the tie via RESOLVE_CONFLICTS */
2679 if (ch_peer == 0 && ch_self == 0) {
2680 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2684 if (ch_peer == 0) { rv = 1; break; }
2685 if (ch_self == 0) { rv = -1; break; }
2687 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2689 case ASB_DISCARD_LEAST_CHG:
2690 if (ch_self < ch_peer)
2692 else if (ch_self > ch_peer)
2694 else /* ( ch_self == ch_peer ) */
2695 /* Well, then use something else. */
2696 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2699 case ASB_DISCARD_LOCAL:
2702 case ASB_DISCARD_REMOTE:
/*
2710 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
 *
 * Same return convention as drbd_asb_recover_0p().  Policies that only
 * make sense with zero primaries are rejected as configuration errors.
 */
2712 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2714 struct drbd_device *device = peer_device->device;
2716 enum drbd_after_sb_p after_sb_1p;
2719 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2721 switch (after_sb_1p) {
2722 case ASB_DISCARD_YOUNGER_PRI:
2723 case ASB_DISCARD_OLDER_PRI:
2724 case ASB_DISCARD_LEAST_CHG:
2725 case ASB_DISCARD_LOCAL:
2726 case ASB_DISCARD_REMOTE:
2727 case ASB_DISCARD_ZERO_CHG:
2728 drbd_err(device, "Configuration error.\n");
2730 case ASB_DISCONNECT:
/* accept the 0p verdict only if it agrees with the current roles
 * (presumably rv = hg in the elided branches — TODO confirm) */
2733 hg = drbd_asb_recover_0p(peer_device);
2734 if (hg == -1 && device->state.role == R_SECONDARY)
2736 if (hg == 1 && device->state.role == R_PRIMARY)
2740 rv = drbd_asb_recover_0p(peer_device);
2742 case ASB_DISCARD_SECONDARY:
2743 return device->state.role == R_PRIMARY ? 1 : -1;
2744 case ASB_CALL_HELPER:
2745 hg = drbd_asb_recover_0p(peer_device);
/* if we would lose our data while primary, try to demote; if that
 * fails, invoke the pri-lost-after-sb helper */
2746 if (hg == -1 && device->state.role == R_PRIMARY) {
2747 enum drbd_state_rv rv2;
2749 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2750 * we might be here in C_WF_REPORT_PARAMS which is transient.
2751 * we do not need to wait for the after state change work either. */
2752 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2753 if (rv2 != SS_SUCCESS) {
2754 drbd_khelper(device, "pri-lost-after-sb");
2756 drbd_warn(device, "Successfully gave up primary role.\n");
/*
2767 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
 *
 * Same return convention as drbd_asb_recover_0p().  With two primaries
 * most discard policies are invalid; the surviving options fall back to
 * the 0p decision, disconnect, or the call-helper path below.
 */
2769 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2771 struct drbd_device *device = peer_device->device;
2773 enum drbd_after_sb_p after_sb_2p;
2776 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2778 switch (after_sb_2p) {
/* policies that require at most one primary: configuration errors here */
2779 case ASB_DISCARD_YOUNGER_PRI:
2780 case ASB_DISCARD_OLDER_PRI:
2781 case ASB_DISCARD_LEAST_CHG:
2782 case ASB_DISCARD_LOCAL:
2783 case ASB_DISCARD_REMOTE:
2785 case ASB_DISCARD_SECONDARY:
2786 case ASB_DISCARD_ZERO_CHG:
2787 drbd_err(device, "Configuration error.\n")&#59;
2790 rv = drbd_asb_recover_0p(peer_device);
2792 case ASB_DISCONNECT:
2794 case ASB_CALL_HELPER:
2795 hg = drbd_asb_recover_0p(peer_device);
/* losing side is primary by definition here: try to demote, else call
 * the pri-lost-after-sb helper */
2797 enum drbd_state_rv rv2;
2799 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2800 * we might be here in C_WF_REPORT_PARAMS which is transient.
2801 * we do not need to wait for the after state change work either. */
2802 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2803 if (rv2 != SS_SUCCESS) {
2804 drbd_khelper(device, "pri-lost-after-sb");
2806 drbd_warn(device, "Successfully gave up primary role.\n");
/*
 * drbd_uuid_dump() - log one UUID set (@text is "self" or "peer") as
 * current:bitmap:history-start:history-end plus bitmap bits and flags.
 * If @uuid vanished concurrently, only a note is logged (the NULL check
 * guarding the first message is elided in this excerpt).
 */
2816 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2817 u64 bits, u64 flags)
2820 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2823 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2825 (unsigned long long)uuid[UI_CURRENT],
2826 (unsigned long long)uuid[UI_BITMAP],
2827 (unsigned long long)uuid[UI_HISTORY_START],
2828 (unsigned long long)uuid[UI_HISTORY_END],
2829 (unsigned long long)bits,
2830 (unsigned long long)flags);
/*
 * drbd_uuid_compare() - compare our UUID set against the peer's to
 * decide the outcome of the sync handshake; *rule_nr reports which
 * rule made the decision.  Return values:
 *
2834 100 after split brain try auto recover
2835 2 C_SYNC_SOURCE set BitMap
2836 1 C_SYNC_SOURCE use BitMap
2838 -1 C_SYNC_TARGET use BitMap
2839 -2 C_SYNC_TARGET set BitMap
2840 -100 after split brain, disconnect
2841 -1000 unrelated data
2842 -1091 requires proto 91
2843 -1096 requires proto 96
 */
2845 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
/* strip the "was primary" bit before comparing current UUIDs */
2850 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2851 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
/* both sides freshly created: nothing to sync */
2854 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
/* only we are fresh/empty: full sync target */
2858 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2859 peer != UUID_JUST_CREATED)
/* only the peer is fresh/empty: full sync source */
2863 if (self != UUID_JUST_CREATED &&
2864 (peer == UUID_JUST_CREATED || peer == (u64)0))
/* current UUIDs match: check for a missed resync-finished event and
 * for a common crash while (one of us was) primary */
2868 int rct, dc; /* roles at crash time */
/* we were SyncSource and the peer missed the resync-finished event:
 * roll our own bitmap UUID into history to correct ourselves */
2870 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2872 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2875 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2876 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2877 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2878 drbd_uuid_move_history(device);
2879 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2880 device->ldev->md.uuid[UI_BITMAP] = 0;
2882 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2883 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2886 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
/* mirror image: we were SyncTarget and missed writing the sync UUID;
 * correct the peer's in-memory view of its UUIDs */
2893 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2895 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2898 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2899 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2900 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2902 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2903 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2904 device->p_uuid[UI_BITMAP] = 0UL;
2906 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2909 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2916 /* Common power [off|failure] */
2917 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2918 (device->p_uuid[UI_FLAGS] & 2);
2919 /* lowest bit is set when we were primary,
2920 * next bit (weight 2) is set when peer was primary */
2924 case 0: /* !self_pri && !peer_pri */ return 0;
2925 case 1: /* self_pri && !peer_pri */ return 1;
2926 case 2: /* !self_pri && peer_pri */ return -1;
2927 case 3: /* self_pri && peer_pri */
/* both crashed while primary: arbitrate via RESOLVE_CONFLICTS */
2928 dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
/* our current UUID matches the peer's bitmap UUID: peer is ahead */
2934 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2939 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
/* detect a lost P_SYNC_UUID packet: undo the peer-side UUID rotation
 * that accompanied the last resync start (proto-version dependent) */
2941 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2942 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2943 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2944 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2945 /* The last P_SYNC_UUID did not get through. Undo the last start of
2946 resync as sync source modifications of the peer's UUIDs. */
2948 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2951 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2952 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2954 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2955 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
/* our current UUID found in the peer's history: peer is ahead of us */
2962 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2963 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2964 peer = device->p_uuid[i] & ~((u64)1);
/* our bitmap UUID matches the peer's current: we are ahead */
2970 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2971 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2976 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
/* same lost-P_SYNC_UUID detection, this time on our own UUIDs */
2978 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2979 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2980 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2981 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2982 /* The last P_SYNC_UUID did not get through. Undo the last start of
2983 resync as sync source modifications of our UUIDs. */
2985 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2988 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2989 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2991 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2992 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2993 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
/* the peer's current UUID found in our history: we are ahead */
3001 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3002 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3003 self = device->ldev->md.uuid[i] & ~((u64)1);
/* matching non-zero bitmap UUIDs: split brain with common ancestor */
3009 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3010 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3011 if (self == peer && self != ((u64)0))
/* any common ancestor in both histories: split brain */
3015 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3016 self = device->ldev->md.uuid[i] & ~((u64)1);
3017 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3018 peer = device->p_uuid[j] & ~((u64)1);
3027 /* drbd_sync_handshake() returns the new conn state on success, or
3028 C_MASK (-1) on failure.
 *
 * Compares UUID sets (drbd_uuid_compare), applies the configured
 * after-split-brain policies and the configured rr-conflict / dry-run
 * handling, and decides whether to resync and in which direction.
 */
3030 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3031 enum drbd_role peer_role,
3032 enum drbd_disk_state peer_disk) __must_hold(local)
3034 struct drbd_device *device = peer_device->device;
3035 enum drbd_conns rv = C_MASK;
3036 enum drbd_disk_state mydisk;
3037 struct net_conf *nc;
3038 int hg, rule_nr, rr_conflict, tentative;
/* while negotiating, use the disk state we are about to take */
3040 mydisk = device->state.disk;
3041 if (mydisk == D_NEGOTIATING)
3042 mydisk = device->new_state_tmp.disk;
3044 drbd_info(device, "drbd_sync_handshake:\n");
/* uuid_lock guards both the dump and the comparison */
3046 spin_lock_irq(&device->ldev->md.uuid_lock);
3047 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3048 drbd_uuid_dump(device, "peer", device->p_uuid,
3049 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3051 hg = drbd_uuid_compare(device, &rule_nr);
3052 spin_unlock_irq(&device->ldev->md.uuid_lock);
3054 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3057 drbd_alert(device, "Unrelated data, aborting!\n");
/* hg < -1000 encodes "requires protocol version (-hg - 1000)" */
3061 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
/* exactly one side is Inconsistent: disk states override the UUID
 * verdict; remember (f) whether a full sync was called for */
3065 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3066 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3067 int f = (hg == -100) || abs(hg) == 2;
3068 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3071 drbd_info(device, "Becoming sync %s due to disk states.\n",
3072 hg > 0 ? "source" : "target");
3076 drbd_khelper(device, "initial-split-brain");
3079 nc = rcu_dereference(peer_device->connection->net_conf);
/* try the automatic after-split-brain policies, chosen by the number
 * of primaries involved */
3081 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3082 int pcount = (device->state.role == R_PRIMARY)
3083 + (peer_role == R_PRIMARY);
3084 int forced = (hg == -100);
3088 hg = drbd_asb_recover_0p(peer_device);
3091 hg = drbd_asb_recover_1p(peer_device);
3094 hg = drbd_asb_recover_2p(peer_device);
3097 if (abs(hg) < 100) {
3098 drbd_warn(device, "Split-Brain detected, %d primaries, "
3099 "automatically solved. Sync from %s node\n",
3100 pcount, (hg < 0) ? "peer" : "this");
3102 drbd_warn(device, "Doing a full sync, since"
3103 " UUIDs where ambiguous.\n");
/* manual resolution: exactly one side must have discard-my-data set */
3110 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3112 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3116 drbd_warn(device, "Split-Brain detected, manually solved. "
3117 "Sync from %s node\n",
3118 (hg < 0) ? "peer" : "this");
3120 rr_conflict = nc->rr_conflict;
3121 tentative = nc->tentative;
3125 /* FIXME this log message is not correct if we end up here
3126 * after an attempted attach on a diskless node.
3127 * We just refuse to attach -- well, we drop the "connection"
3128 * to that disk, in a way... */
3129 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3130 drbd_khelper(device, "split-brain");
3134 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3135 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
/* becoming SyncTarget while primary: honour the rr-conflict policy */
3139 if (hg < 0 && /* by intention we do not use mydisk here. */
3140 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3141 switch (rr_conflict) {
3142 case ASB_CALL_HELPER:
3143 drbd_khelper(device, "pri-lost");
3145 case ASB_DISCONNECT:
3146 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3149 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
/* dry-run: report what would happen, do not actually resync */
3154 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3156 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3158 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3159 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3160 abs(hg) >= 2 ? "full" : "bit-map based");
/* |hg| >= 2 means full sync: set all bits in the bitmap first */
3165 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3166 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3167 BM_LOCKED_SET_ALLOWED))
3171 if (hg > 0) { /* become sync source. */
3173 } else if (hg < 0) { /* become sync target */
/* hg == 0 but bits set in the bitmap deserves a diagnostic */
3177 if (drbd_bm_total_weight(device)) {
3178 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3179 drbd_bm_total_weight(device));
3186 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3188 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3189 if (peer == ASB_DISCARD_REMOTE)
3190 return ASB_DISCARD_LOCAL;
3192 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3193 if (peer == ASB_DISCARD_LOCAL)
3194 return ASB_DISCARD_REMOTE;
3196 /* everything else is valid if they are equal on both sides. */
/*
 * receive_protocol() - handle P_PROTOCOL / P_PROTOCOL_UPDATE.
 *
 * Verifies that the peer's connection settings (wire protocol,
 * after-sb policies, two-primaries, discard-my-data, integrity alg)
 * are compatible with ours; on P_PROTOCOL_UPDATE it instead adopts the
 * new settings, including a replacement peer data integrity transform.
 * Incompatible settings lead to C_DISCONNECTING.
 */
3200 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3202 struct p_protocol *p = pi->data;
3203 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3204 int p_proto, p_discard_my_data, p_two_primaries, cf;
3205 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3206 char integrity_alg[SHARED_SECRET_MAX] = "";
3207 struct crypto_hash *peer_integrity_tfm = NULL;
3208 void *int_dig_in = NULL, *int_dig_vv = NULL;
3210 p_proto = be32_to_cpu(p->protocol);
3211 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3212 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3213 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3214 p_two_primaries = be32_to_cpu(p->two_primaries);
3215 cf = be32_to_cpu(p->conn_flags);
3216 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
/* since protocol 87 the integrity alg name travels as packet payload */
3218 if (connection->agreed_pro_version >= 87) {
3221 if (pi->size > sizeof(integrity_alg))
3223 err = drbd_recv_all(connection, integrity_alg, pi->size);
3226 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
/* plain P_PROTOCOL: compatibility check against our net_conf */
3229 if (pi->cmd != P_PROTOCOL_UPDATE) {
3230 clear_bit(CONN_DRY_RUN, &connection->flags);
3232 if (cf & CF_DRY_RUN)
3233 set_bit(CONN_DRY_RUN, &connection->flags);
3236 nc = rcu_dereference(connection->net_conf);
3238 if (p_proto != nc->wire_protocol) {
3239 drbd_err(connection, "incompatible %s settings\n", "protocol");
3240 goto disconnect_rcu_unlock;
/* after-sb policies are compared from the peer's point of view */
3243 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3244 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3245 goto disconnect_rcu_unlock;
3248 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3249 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3250 goto disconnect_rcu_unlock;
3253 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3254 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3255 goto disconnect_rcu_unlock;
/* discard-my-data on both sides is a contradiction */
3258 if (p_discard_my_data && nc->discard_my_data) {
3259 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3260 goto disconnect_rcu_unlock;
3263 if (p_two_primaries != nc->two_primaries) {
3264 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3265 goto disconnect_rcu_unlock;
3268 if (strcmp(integrity_alg, nc->integrity_alg)) {
3269 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3270 goto disconnect_rcu_unlock;
/* P_PROTOCOL_UPDATE path: allocate the new peer integrity transform
 * and its digest buffers before taking any locks */
3276 if (integrity_alg[0]) {
3280 * We can only change the peer data integrity algorithm
3281 * here. Changing our own data integrity algorithm
3282 * requires that we send a P_PROTOCOL_UPDATE packet at
3283 * the same time; otherwise, the peer has no way to
3284 * tell between which packets the algorithm should
3288 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3289 if (!peer_integrity_tfm) {
3290 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3295 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3296 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3297 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3298 if (!(int_dig_in && int_dig_vv)) {
3299 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3304 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3305 if (!new_net_conf) {
3306 drbd_err(connection, "Allocation of new net_conf failed\n");
/* commit: swap in the new net_conf under data.mutex + conf_update,
 * publish via RCU, then free the replaced objects */
3310 mutex_lock(&connection->data.mutex);
3311 mutex_lock(&connection->resource->conf_update);
3312 old_net_conf = connection->net_conf;
3313 *new_net_conf = *old_net_conf;
3315 new_net_conf->wire_protocol = p_proto;
3316 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3317 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3318 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3319 new_net_conf->two_primaries = p_two_primaries;
3321 rcu_assign_pointer(connection->net_conf, new_net_conf);
3322 mutex_unlock(&connection->resource->conf_update);
3323 mutex_unlock(&connection->data.mutex);
3325 crypto_free_hash(connection->peer_integrity_tfm);
3326 kfree(connection->int_dig_in);
3327 kfree(connection->int_dig_vv);
3328 connection->peer_integrity_tfm = peer_integrity_tfm;
3329 connection->int_dig_in = int_dig_in;
3330 connection->int_dig_vv = int_dig_vv;
3332 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3333 drbd_info(connection, "peer data-integrity-alg: %s\n",
3334 integrity_alg[0] ? integrity_alg : "(none)");
3337 kfree(old_net_conf);
/* error paths: release anything allocated above, then disconnect */
3340 disconnect_rcu_unlock:
3343 crypto_free_hash(peer_integrity_tfm);
3346 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
/* drbd_crypto_alloc_digest_safe() - allocate a synchronous hash
 * transform by name, logging a descriptive error on failure.
3351 * input: alg name, feature name
3352 * return: NULL (alg name was "")
3353 * ERR_PTR(error) if something goes wrong
3354 * or the crypto hash ptr, if it worked out ok. */
3356 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3357 const char *alg, const char *name)
3359 struct crypto_hash *tfm;
3364 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3366 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3367 alg, name, PTR_ERR(tfm));
/*
 * ignore_remaining_packet() - read and discard pi->size bytes of
 * payload from the data socket, DRBD_SOCKET_BUFFER_SIZE at a time,
 * reusing the connection's receive buffer as scratch space.
 */
3373 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3375 void *buffer = connection->data.rbuf;
3376 int size = pi->size;
/* consume at most one buffer's worth per drbd_recv() call */
3379 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3380 s = drbd_recv(connection, buffer, s);
/*
3394 * config_unknown_volume - device configuration command for unknown volume
 *
3396 * When a device is added to an existing connection, the node on which the
3397 * device is added first will send configuration commands to its peer but the
3398 * peer will not know about the device yet. It will warn and ignore these
3399 * commands. Once the device is added on the second node, the second node will
3400 * send the same device configuration commands, but in the other direction.
 *
3402 * (We can also end up here if drbd is misconfigured.)
 *
 * Returns the result of draining the remaining packet payload.
 */
3404 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3406 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3407 cmdname(pi->cmd), pi->vnr);
3408 return ignore_remaining_packet(connection, pi);
/*
 * receive_SyncParam() - handle P_SYNC_PARAM / P_SYNC_PARAM89 / _95.
 *
 * Parses the resync rate, the verify-alg / csums-alg names and (for
 * agreed protocol version > 94) the resync controller parameters.
 * Replacement net_conf / disk_conf / fifo plan objects are allocated,
 * filled, and published under conf_update via RCU; the old objects are
 * freed afterwards.  Mismatching algorithms while in C_WF_REPORT_PARAMS
 * or any allocation failure leads to C_DISCONNECTING.
 *
 * NOTE(review): some lines (error gotos, closing braces) are elided in
 * this excerpt.
 */
3411 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3413 struct drbd_peer_device *peer_device;
3414 struct drbd_device *device;
3415 struct p_rs_param_95 *p;
3416 unsigned int header_size, data_size, exp_max_sz;
3417 struct crypto_hash *verify_tfm = NULL;
3418 struct crypto_hash *csums_tfm = NULL;
3419 struct net_conf *old_net_conf, *new_net_conf = NULL;
3420 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3421 const int apv = connection->agreed_pro_version;
3422 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3426 peer_device = conn_peer_device(connection, pi->vnr);
3428 return config_unknown_volume(connection, pi);
3429 device = peer_device->device;
/* the packet layout grew over protocol versions; pick the upper bound */
3431 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3432 : apv == 88 ? sizeof(struct p_rs_param)
3434 : apv <= 94 ? sizeof(struct p_rs_param_89)
3435 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3437 if (pi->size > exp_max_sz) {
3438 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3439 pi->size, exp_max_sz);
/* split the fixed header from the (apv-dependent) trailing alg names */
3444 header_size = sizeof(struct p_rs_param);
3445 data_size = pi->size - header_size;
3446 } else if (apv <= 94) {
3447 header_size = sizeof(struct p_rs_param_89);
3448 data_size = pi->size - header_size;
3449 D_ASSERT(device, data_size == 0);
3451 header_size = sizeof(struct p_rs_param_95);
3452 data_size = pi->size - header_size;
3453 D_ASSERT(device, data_size == 0);
3456 /* initialize verify_alg and csums_alg */
3458 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3460 err = drbd_recv_all(peer_device->connection, p, header_size);
/* everything below mutates shared config; serialize via conf_update */
3464 mutex_lock(&connection->resource->conf_update);
3465 old_net_conf = peer_device->connection->net_conf;
3466 if (get_ldev(device)) {
3467 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3468 if (!new_disk_conf) {
3470 mutex_unlock(&connection->resource->conf_update);
3471 drbd_err(device, "Allocation of new disk_conf failed\n");
3475 old_disk_conf = device->ldev->disk_conf;
3476 *new_disk_conf = *old_disk_conf;
3478 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
/* apv 88: verify-alg is sent as a separate NUL-terminated string */
3483 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3484 drbd_err(device, "verify-alg of wrong size, "
3485 "peer wants %u, accepting only up to %u byte\n",
3486 data_size, SHARED_SECRET_MAX);
3491 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3494 /* we expect NUL terminated string */
3495 /* but just in case someone tries to be evil */
3496 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3497 p->verify_alg[data_size-1] = 0;
3499 } else /* apv >= 89 */ {
3500 /* we still expect NUL terminated strings */
3501 /* but just in case someone tries to be evil */
3502 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3503 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3504 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3505 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
/* changing algs is only allowed outside C_WF_REPORT_PARAMS */
3508 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3509 if (device->state.conn == C_WF_REPORT_PARAMS) {
3510 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3511 old_net_conf->verify_alg, p->verify_alg);
3514 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3515 p->verify_alg, "verify-alg");
3516 if (IS_ERR(verify_tfm)) {
3522 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3523 if (device->state.conn == C_WF_REPORT_PARAMS) {
3524 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3525 old_net_conf->csums_alg, p->csums_alg);
3528 csums_tfm = drbd_crypto_alloc_digest_safe(device,
3529 p->csums_alg, "csums-alg");
3530 if (IS_ERR(csums_tfm)) {
/* apv >= 95: resync controller settings; resize the fifo plan if the
 * plan-ahead value changed */
3536 if (apv > 94 && new_disk_conf) {
3537 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3538 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3539 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3540 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3542 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3543 if (fifo_size != device->rs_plan_s->size) {
3544 new_plan = fifo_alloc(fifo_size);
3546 drbd_err(device, "kmalloc of fifo_buffer failed");
/* commit phase: install new transforms and configs, publish via RCU */
3553 if (verify_tfm || csums_tfm) {
3554 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3555 if (!new_net_conf) {
3556 drbd_err(device, "Allocation of new net_conf failed\n");
3560 *new_net_conf = *old_net_conf;
3563 strcpy(new_net_conf->verify_alg, p->verify_alg);
3564 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3565 crypto_free_hash(peer_device->connection->verify_tfm);
3566 peer_device->connection->verify_tfm = verify_tfm;
3567 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3570 strcpy(new_net_conf->csums_alg, p->csums_alg);
3571 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3572 crypto_free_hash(peer_device->connection->csums_tfm);
3573 peer_device->connection->csums_tfm = csums_tfm;
3574 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3576 rcu_assign_pointer(connection->net_conf, new_net_conf);
3580 if (new_disk_conf) {
3581 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3586 old_plan = device->rs_plan_s;
3587 rcu_assign_pointer(device->rs_plan_s, new_plan);
3590 mutex_unlock(&connection->resource->conf_update);
/* old objects may still be read by RCU readers; freed after grace period */
3593 kfree(old_net_conf);
3594 kfree(old_disk_conf);
/* error paths: undo partial allocations, then disconnect */
3600 if (new_disk_conf) {
3602 kfree(new_disk_conf);
3604 mutex_unlock(&connection->resource->conf_update);
3609 if (new_disk_conf) {
3611 kfree(new_disk_conf);
3613 mutex_unlock(&connection->resource->conf_update);
3614 /* just for completeness: actually not needed,
3615 * as this is not reached if csums_tfm was ok. */
3616 crypto_free_hash(csums_tfm);
3617 /* but free the verify_tfm again, if csums_tfm did not work out */
3618 crypto_free_hash(verify_tfm);
3619 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3623 /* warn if the arguments differ by more than 12.5% */
3624 static void warn_if_differ_considerably(struct drbd_device *device,
3625 const char *s, sector_t a, sector_t b)
3628 if (a == 0 || b == 0)
3630 d = (a > b) ? (a - b) : (b - a);
3631 if (d > (a>>3) || d > (b>>3))
3632 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3633 (unsigned long long)a, (unsigned long long)b);
/*
 * receive_sizes() - handle a P_SIZES packet from the peer.
 *
 * Records the peer's backing-device size and requested user size,
 * negotiates the usable device size, updates peer_max_bio_size, and
 * triggers resize/resync follow-up actions where the visible code
 * requires them.  Returns 0 on success, negative error code otherwise.
 */
3636 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3638 struct drbd_peer_device *peer_device;
3639 struct drbd_device *device;
3640 struct p_sizes *p = pi->data;
3641 enum determine_dev_size dd = DS_UNCHANGED;
3642 sector_t p_size, p_usize, my_usize;
3643 int ldsc = 0; /* local disk size changed */
3644 enum dds_flags ddsf;
3646 peer_device = conn_peer_device(connection, pi->vnr);
/* Unknown volume number: consume the packet without acting on it. */
3648 return config_unknown_volume(connection, pi);
3649 device = peer_device->device;
/* All on-the-wire fields are big endian. */
3651 p_size = be64_to_cpu(p->d_size);
3652 p_usize = be64_to_cpu(p->u_size);
3654 /* just store the peer's disk size for now.
3655 * we still need to figure out whether we accept that. */
3656 device->p_size = p_size;
3658 if (get_ldev(device)) {
3660 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3663 warn_if_differ_considerably(device, "lower level device sizes",
3664 p_size, drbd_get_max_capacity(device->ldev));
3665 warn_if_differ_considerably(device, "user requested size",
3668 /* if this is the first connect, or an otherwise expected
3669 * param exchange, choose the minimum */
3670 if (device->state.conn == C_WF_REPORT_PARAMS)
3671 p_usize = min_not_zero(my_usize, p_usize);
3673 /* Never shrink a device with usable data during connect.
3674 But allow online shrinking if we are connected. */
3675 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3676 drbd_get_capacity(device->this_bdev) &&
3677 device->state.disk >= D_OUTDATED &&
3678 device->state.conn < C_CONNECTED) {
3679 drbd_err(device, "The peer's disk size is too small!\n");
3680 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
/* Peer changed the requested user size: publish a new disk_conf via RCU
 * under conf_update, then free the old copy. */
3685 if (my_usize != p_usize) {
3686 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3688 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3689 if (!new_disk_conf) {
3690 drbd_err(device, "Allocation of new disk_conf failed\n");
3695 mutex_lock(&connection->resource->conf_update);
3696 old_disk_conf = device->ldev->disk_conf;
3697 *new_disk_conf = *old_disk_conf;
3698 new_disk_conf->disk_size = p_usize;
3700 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3701 mutex_unlock(&connection->resource->conf_update);
3703 kfree(old_disk_conf);
3705 drbd_info(device, "Peer sets u_size to %lu sectors\n",
3706 (unsigned long)my_usize);
3712 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3713 drbd_reconsider_max_bio_size(device);
3714 /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3715 In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3716 drbd_reconsider_max_bio_size(), we can be sure that after
3717 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3719 ddsf = be16_to_cpu(p->dds_flags);
3720 if (get_ldev(device)) {
3721 dd = drbd_determine_dev_size(device, ddsf, NULL);
3725 drbd_md_sync(device);
3727 /* I am diskless, need to accept the peer's size. */
3728 drbd_set_my_capacity(device, p_size);
/* Remember the backing device size so a later change is noticed (ldsc). */
3731 if (get_ldev(device)) {
3732 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3733 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3740 if (device->state.conn > C_WF_REPORT_PARAMS) {
3741 if (be64_to_cpu(p->c_size) !=
3742 drbd_get_capacity(device->this_bdev) || ldsc) {
3743 /* we have different sizes, probably peer
3744 * needs to know my new size... */
3745 drbd_send_sizes(peer_device, 0, ddsf);
3747 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3748 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3749 if (device->state.pdsk >= D_INCONSISTENT &&
3750 device->state.disk >= D_INCONSISTENT) {
3751 if (ddsf & DDSF_NO_RESYNC)
3752 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n")
3754 resync_after_online_grow(device);
3756 set_bit(RESYNC_AFTER_NEG, &device->flags);
/*
 * receive_uuids() - handle a P_UUIDS packet from the peer.
 *
 * Copies the peer's UUID set into device->p_uuid, rejects connects to
 * data with a different current UUID while we are a degraded primary,
 * and handles the "skip initial sync" shortcut for just-created devices.
 * Returns 0 on success, negative error code otherwise.
 */
3763 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3765 struct drbd_peer_device *peer_device;
3766 struct drbd_device *device;
3767 struct p_uuids *p = pi->data;
3769 int i, updated_uuids = 0;
3771 peer_device = conn_peer_device(connection, pi->vnr);
3773 return config_unknown_volume(connection, pi);
3774 device = peer_device->device;
3776 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3778 drbd_err(device, "kmalloc of p_uuid failed\n");
/* Convert the whole on-the-wire UUID array from big endian. */
3782 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3783 p_uuid[i] = be64_to_cpu(p->uuid[i]);
/* Replace any previously stored peer UUID set. */
3785 kfree(device->p_uuid);
3786 device->p_uuid = p_uuid;
/* Degraded (diskless-ish) primary: only accept a peer whose current UUID
 * matches our exposed-data UUID, ignoring the lowest bit. */
3788 if (device->state.conn < C_CONNECTED &&
3789 device->state.disk < D_INCONSISTENT &&
3790 device->state.role == R_PRIMARY &&
3791 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3792 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3793 (unsigned long long)device->ed_uuid);
3794 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3798 if (get_ldev(device)) {
/* Skip the initial full sync only when all four conditions hold; the
 * "& 8" tests a peer flag bit (presumably "peer wants skip-initial-sync"
 * — semantics defined by the protocol header, not visible here). */
3799 int skip_initial_sync =
3800 device->state.conn == C_CONNECTED &&
3801 peer_device->connection->agreed_pro_version >= 90 &&
3802 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3803 (p_uuid[UI_FLAGS] & 8);
3804 if (skip_initial_sync) {
3805 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3806 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3807 "clear_n_write from receive_uuids",
3808 BM_LOCKED_TEST_ALLOWED);
3809 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3810 _drbd_uuid_set(device, UI_BITMAP, 0);
3811 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3813 drbd_md_sync(device);
3817 } else if (device->state.disk < D_INCONSISTENT &&
3818 device->state.role == R_PRIMARY) {
3819 /* I am a diskless primary, the peer just created a new current UUID
3821 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3824 /* Before we test for the disk state, we should wait until a possibly
3825 ongoing cluster wide state change is finished. That is important if
3826 we are primary and are detaching from our disk. We need to see the
3827 new disk state... */
3828 mutex_lock(device->state_mutex);
3829 mutex_unlock(device->state_mutex);
3830 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3831 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3834 drbd_print_uuids(device, "receiver updated UUIDs to");
3840 * convert_state() - Converts the peer's view of the cluster state to our point of view
3841 * @ps: The state as seen by the peer.
3843 static union drbd_state convert_state(union drbd_state ps)
3845 union drbd_state ms;
/* Mirror table for connection states: symmetric states map to themselves,
 * asymmetric ones (StartingSync S/T, Verify S/T) swap sides. */
3847 static enum drbd_conns c_tab[] = {
3848 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3849 [C_CONNECTED] = C_CONNECTED,
3851 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3852 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3853 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3854 [C_VERIFY_S] = C_VERIFY_T,
3860 ms.conn = c_tab[ps.conn];
/* Either I/O-suspend reason on the peer's side shows up as peer_isp here. */
3865 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
/*
 * receive_req_state() - handle P_STATE_CHG_REQ: the peer asks us to
 * change this device's state.  The mask/val pair is converted from the
 * peer's point of view, applied, and the result sent back.
 * Returns 0 on success, negative error code otherwise.
 */
3870 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3872 struct drbd_peer_device *peer_device;
3873 struct drbd_device *device;
3874 struct p_req_state *p = pi->data;
3875 union drbd_state mask, val;
3876 enum drbd_state_rv rv;
3878 peer_device = conn_peer_device(connection, pi->vnr);
3881 device = peer_device->device;
3883 mask.i = be32_to_cpu(p->mask);
3884 val.i = be32_to_cpu(p->val);
/* If we are the conflict-resolving side and a local state change is in
 * flight (state_mutex held), refuse the concurrent remote request. */
3886 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3887 mutex_is_locked(device->state_mutex)) {
3888 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
/* Translate the peer's view of the state into our own before applying. */
3892 mask = convert_state(mask);
3893 val = convert_state(val);
3895 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3896 drbd_send_sr_reply(peer_device, rv);
3898 drbd_md_sync(device);
/*
 * receive_req_conn_state() - handle P_CONN_ST_CHG_REQ: connection-level
 * counterpart of receive_req_state().  Converts the peer's mask/val,
 * applies the connection state change, and replies with the result.
 */
3903 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3905 struct p_req_state *p = pi->data;
3906 union drbd_state mask, val;
3907 enum drbd_state_rv rv;
3909 mask.i = be32_to_cpu(p->mask);
3910 val.i = be32_to_cpu(p->val);
/* Reject a concurrent remote request while our own cstate change runs. */
3912 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3913 mutex_is_locked(&connection->cstate_mutex)) {
3914 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3918 mask = convert_state(mask);
3919 val = convert_state(val);
3921 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3922 conn_send_sr_reply(connection, rv);
/*
 * receive_state() - handle a P_STATE packet carrying the peer's view of
 * its own state.
 *
 * Reconciles the peer's reported state with our local state: decides
 * whether a resync handshake is needed, detects end-of-sync and
 * end-of-verify notifications, and finally commits the merged state
 * under the request lock.  Returns 0 on success, negative error code
 * otherwise; on irreconcilable states the connection is torn down.
 */
3927 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3929 struct drbd_peer_device *peer_device;
3930 struct drbd_device *device;
3931 struct p_state *p = pi->data;
3932 union drbd_state os, ns, peer_state;
3933 enum drbd_disk_state real_peer_disk;
3934 enum chg_state_flags cs_flags;
3937 peer_device = conn_peer_device(connection, pi->vnr);
3939 return config_unknown_volume(connection, pi);
3940 device = peer_device->device;
3942 peer_state.i = be32_to_cpu(p->state);
/* A peer still in D_NEGOTIATING really is either Inconsistent or
 * Consistent, depending on a flag bit in its UUID set. */
3944 real_peer_disk = peer_state.disk;
3945 if (peer_state.disk == D_NEGOTIATING) {
3946 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3947 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
/* Snapshot our current state; os stays the "old state", ns is mutated
 * below into the proposed new state. */
3950 spin_lock_irq(&device->resource->req_lock);
3952 os = ns = drbd_read_state(device);
3953 spin_unlock_irq(&device->resource->req_lock);
3955 /* If some other part of the code (asender thread, timeout)
3956 * already decided to close the connection again,
3957 * we must not "re-establish" it here. */
3958 if (os.conn <= C_TEAR_DOWN)
3961 /* If this is the "end of sync" confirmation, usually the peer disk
3962 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3963 * set) resync started in PausedSyncT, or if the timing of pause-/
3964 * unpause-sync events has been "just right", the peer disk may
3965 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3967 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3968 real_peer_disk == D_UP_TO_DATE &&
3969 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3970 /* If we are (becoming) SyncSource, but peer is still in sync
3971 * preparation, ignore its uptodate-ness to avoid flapping, it
3972 * will change to inconsistent once the peer reaches active
3974 * It may have changed syncer-paused flags, however, so we
3975 * cannot ignore this completely. */
3976 if (peer_state.conn > C_CONNECTED &&
3977 peer_state.conn < C_SYNC_SOURCE)
3978 real_peer_disk = D_INCONSISTENT;
3980 /* if peer_state changes to connected at the same time,
3981 * it explicitly notifies us that it finished resync.
3982 * Maybe we should finish it up, too? */
3983 else if (os.conn >= C_SYNC_SOURCE &&
3984 peer_state.conn == C_CONNECTED) {
3985 if (drbd_bm_total_weight(device) <= device->rs_failed)
3986 drbd_resync_finished(device);
3991 /* explicit verify finished notification, stop sector reached. */
3992 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3993 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3994 ov_out_of_sync_print(device);
3995 drbd_resync_finished(device);
3999 /* peer says his disk is inconsistent, while we think it is uptodate,
4000 * and this happens while the peer still thinks we have a sync going on,
4001 * but we think we are already done with the sync.
4002 * We ignore this to avoid flapping pdsk.
4003 * This should not happen, if the peer is a recent version of drbd. */
4004 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4005 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4006 real_peer_disk = D_UP_TO_DATE;
4008 if (ns.conn == C_WF_REPORT_PARAMS)
4009 ns.conn = C_CONNECTED;
4011 if (peer_state.conn == C_AHEAD)
/* Decide whether a resync handshake is warranted (cr = consider resync). */
4014 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4015 get_ldev_if_state(device, D_NEGOTIATING)) {
4016 int cr; /* consider resync */
4018 /* if we established a new connection */
4019 cr = (os.conn < C_CONNECTED);
4020 /* if we had an established connection
4021 * and one of the nodes newly attaches a disk */
4022 cr |= (os.conn == C_CONNECTED &&
4023 (peer_state.disk == D_NEGOTIATING ||
4024 os.disk == D_NEGOTIATING));
4025 /* if we have both been inconsistent, and the peer has been
4026 * forced to be UpToDate with --overwrite-data */
4027 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4028 /* if we had been plain connected, and the admin requested to
4029 * start a sync by "invalidate" or "invalidate-remote" */
4030 cr |= (os.conn == C_CONNECTED &&
4031 (peer_state.conn >= C_STARTING_SYNC_S &&
4032 peer_state.conn <= C_WF_BITMAP_T));
4035 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
/* C_MASK from the handshake means "no agreement possible": degrade or
 * disconnect depending on which side was still negotiating its disk. */
4038 if (ns.conn == C_MASK) {
4039 ns.conn = C_CONNECTED;
4040 if (device->state.disk == D_NEGOTIATING) {
4041 drbd_force_state(device, NS(disk, D_FAILED));
4042 } else if (peer_state.disk == D_NEGOTIATING) {
4043 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4044 peer_state.disk = D_DISKLESS;
4045 real_peer_disk = D_DISKLESS;
4047 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4049 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4050 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
/* Commit the merged state under the request lock; bail out and retry if
 * the state changed underneath us since the snapshot above. */
4056 spin_lock_irq(&device->resource->req_lock);
4057 if (os.i != drbd_read_state(device).i)
4059 clear_bit(CONSIDER_RESYNC, &device->flags);
4060 ns.peer = peer_state.role;
4061 ns.pdsk = real_peer_disk;
4062 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4063 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4064 ns.disk = device->new_state_tmp.disk;
4065 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4066 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4067 test_bit(NEW_CUR_UUID, &device->flags)) {
4068 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4069 for temporal network outages! */
4070 spin_unlock_irq(&device->resource->req_lock);
4071 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4072 tl_clear(peer_device->connection);
4073 drbd_uuid_new_current(device);
4074 clear_bit(NEW_CUR_UUID, &device->flags);
4075 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4078 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4079 ns = drbd_read_state(device);
4080 spin_unlock_irq(&device->resource->req_lock);
4082 if (rv < SS_SUCCESS) {
4083 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4087 if (os.conn > C_WF_REPORT_PARAMS) {
4088 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4089 peer_state.disk != D_NEGOTIATING ) {
4090 /* we want resync, peer has not yet decided to sync... */
4091 /* Nowadays only used when forcing a node into primary role and
4092 setting its disk to UpToDate with that */
4093 drbd_send_uuids(peer_device);
4094 drbd_send_current_state(peer_device);
4098 clear_bit(DISCARD_MY_DATA, &device->flags);
4100 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
/*
 * receive_sync_uuid() - handle a P_SYNC_UUID packet.
 *
 * Waits until the device reaches a state where the sync UUID may be
 * applied, then installs the peer's UUID as our current UUID (without
 * rotating history) and starts the resync as SyncTarget.
 */
4105 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4107 struct drbd_peer_device *peer_device;
4108 struct drbd_device *device;
4109 struct p_rs_uuid *p = pi->data;
4111 peer_device = conn_peer_device(connection, pi->vnr);
4114 device = peer_device->device;
/* Block until one of the accepting (or aborting) states is reached. */
4116 wait_event(device->misc_wait,
4117 device->state.conn == C_WF_SYNC_UUID ||
4118 device->state.conn == C_BEHIND ||
4119 device->state.conn < C_CONNECTED ||
4120 device->state.disk < D_NEGOTIATING);
4122 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4124 /* Here the _drbd_uuid_ functions are right, current should
4125 _not_ be rotated into the history */
4126 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4127 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4128 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4130 drbd_print_uuids(device, "updated sync uuid");
4131 drbd_start_resync(device, C_SYNC_TARGET);
/* No local disk (or wrong state): the packet cannot be applied. */
4135 drbd_err(device, "Ignoring SyncUUID packet!\n");
4141 * receive_bitmap_plain
4143 * Return 0 when done, 1 when another iteration is needed, and a negative error
4144 * code upon failure.
4147 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4148 unsigned long *p, struct bm_xfer_ctx *c)
/* Payload capacity of one socket buffer, and how many bitmap words of
 * the remaining transfer fit into it. */
4150 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4151 drbd_header_size(peer_device->connection);
4152 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4153 c->bm_words - c->word_offset);
4154 unsigned int want = num_words * sizeof(*p);
/* The sender must deliver exactly the number of bytes we computed. */
4158 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4163 err = drbd_recv_all(peer_device->connection, p, want);
/* Merge the received little-endian words into our bitmap. */
4167 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
/* Advance the transfer context; clamp bit_offset at the bitmap end. */
4169 c->word_offset += num_words;
4170 c->bit_offset = c->word_offset * BITS_PER_LONG;
4171 if (c->bit_offset > c->bm_bits)
4172 c->bit_offset = c->bm_bits;
/* Low nibble of the encoding byte selects the bitmap compression code. */
4177 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4179 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
/* Bit 7 of the encoding byte: toggle value of the first RLE run. */
4182 static int dcbp_get_start(struct p_compressed_bm *p)
4184 return (p->encoding & 0x80) != 0;
/* Bits 4-6 of the encoding byte: number of pad bits in the bit stream. */
4187 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4189 return (p->encoding >> 4) & 0x7;
/* recv_bm_rle_bits() - decode one VLI/RLE-compressed bitmap packet into
 * the local bitmap, alternating runs of clear/set bits.
 *
4195 * Return 0 when done, 1 when another iteration is needed, and a negative error
4196 * code upon failure.
4199 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4200 struct p_compressed_bm *p,
4201 struct bm_xfer_ctx *c,
4204 struct bitstream bs;
4208 unsigned long s = c->bit_offset;
4210 int toggle = dcbp_get_start(p);
4214 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
/* Prime the 64-bit look-ahead window from the bit stream. */
4216 bits = bitstream_get_bits(&bs, &look_ahead, 64);
/* Each iteration decodes one run length rl; "toggle" alternates between
 * runs of clear and set bits, starting with the packet's start bit. */
4220 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4221 bits = vli_decode_bits(&rl, look_ahead);
/* Reject runs that would extend past the end of the bitmap. */
4227 if (e >= c->bm_bits) {
4228 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4231 _drbd_bm_set_bits(peer_device->device, s, e);
4235 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4236 have, bits, look_ahead,
4237 (unsigned int)(bs.cur.b - p->code),
4238 (unsigned int)bs.buf_len);
4241 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4242 if (likely(bits < 64))
4243 look_ahead >>= bits;
/* Refill the look-ahead window with fresh bits from the stream. */
4248 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4251 look_ahead |= tmp << have;
4256 bm_xfer_ctx_bit_to_word_offset(c);
/* Non-zero (another iteration needed) until the whole bitmap is covered. */
4258 return (s != c->bm_bits);
/* decode_bitmap_c() - dispatch on the compressed-bitmap encoding.
 *
4264 * Return 0 when done, 1 when another iteration is needed, and a negative error
4265 * code upon failure.
4268 decode_bitmap_c(struct drbd_peer_device *peer_device,
4269 struct p_compressed_bm *p,
4270 struct bm_xfer_ctx *c,
/* RLE_VLI_Bits is the only encoding in use; the payload starts after the
 * p_compressed_bm header, hence "len - sizeof(*p)". */
4273 if (dcbp_get_code(p) == RLE_VLI_Bits)
4274 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4276 /* other variants had been implemented for evaluation,
4277 * but have been dropped as this one turned out to be "best"
4278 * during all our tests. */
/* Unknown encoding is a protocol violation: complain and tear down. */
4280 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4281 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
/*
 * INFO_bm_xfer_stats() - log bitmap transfer statistics, comparing the
 * actual bytes sent/received against what an uncompressed ("plaintext")
 * transfer would have cost.
 */
4285 void INFO_bm_xfer_stats(struct drbd_device *device,
4286 const char *direction, struct bm_xfer_ctx *c)
4288 /* what would it take to transfer it "plaintext" */
4289 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4290 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4291 unsigned int plain =
4292 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4293 c->bm_words * sizeof(unsigned long);
4294 unsigned int total = c->bytes[0] + c->bytes[1];
4297 /* total can not be zero. but just in case: */
4301 /* don't report if not compressed */
4305 /* total < plain. check for overflow, still */
/* r is the compression ratio in tenths of a percent (0..1000). */
4306 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4307 : (1000 * total / plain);
4313 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4314 "total %u; compression: %u.%u%%\n",
4316 c->bytes[1], c->packets[1],
4317 c->bytes[0], c->packets[0],
4318 total, r/10, r % 10);
4321 /* Since we are processing the bitfield from lower addresses to higher,
4322 it does not matter if we process it in 32 bit chunks or 64 bit
4323 chunks as long as it is little endian. (Understand it as byte stream,
4324 beginning with the lowest byte...) If we would use big endian
4325 we would need to process it from the highest address to the lowest,
4326 in order to be agnostic to the 32 vs 64 bits issue.
4328 returns 0 on failure, 1 if we successfully received it. */
4329 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4331 struct drbd_peer_device *peer_device;
4332 struct drbd_device *device;
4333 struct bm_xfer_ctx c;
4336 peer_device = conn_peer_device(connection, pi->vnr);
4339 device = peer_device->device;
4341 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4342 /* you are supposed to send additional out-of-sync information
4343 * if you actually set bits during this phase */
4345 c = (struct bm_xfer_ctx) {
4346 .bm_bits = drbd_bm_bits(device),
4347 .bm_words = drbd_bm_words(device),
/* Loop over packets until the whole bitmap has been transferred; plain
 * and compressed packets may be mixed within one transfer. */
4351 if (pi->cmd == P_BITMAP)
4352 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4353 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4354 /* MAYBE: sanity check that we speak proto >= 90,
4355 * and the feature is enabled! */
4356 struct p_compressed_bm *p = pi->data;
/* Bound-check the compressed payload before receiving it. */
4358 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4359 drbd_err(device, "ReportCBitmap packet too large\n");
4363 if (pi->size <= sizeof(*p)) {
4364 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4368 err = drbd_recv_all(peer_device->connection, p, pi->size);
4371 err = decode_bitmap_c(peer_device, p, &c, pi->size);
4373 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
/* Account the packet for the compression statistics; index 1 counts
 * plain P_BITMAP packets, index 0 counts compressed ones. */
4378 c.packets[pi->cmd == P_BITMAP]++;
4379 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4386 err = drbd_recv_header(peer_device->connection, pi);
4391 INFO_bm_xfer_stats(device, "receive", &c);
4393 if (device->state.conn == C_WF_BITMAP_T) {
4394 enum drbd_state_rv rv;
/* As sync target: answer with our own bitmap, then go to WFSyncUUID. */
4396 err = drbd_send_bitmap(device);
4399 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4400 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4401 D_ASSERT(device, rv == SS_SUCCESS);
4402 } else if (device->state.conn != C_WF_BITMAP_S) {
4403 /* admin may have requested C_DISCONNECTING,
4404 * other threads may have noticed network errors */
4405 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4406 drbd_conn_str(device->state.conn));
4411 drbd_bm_unlock(device);
/* As sync source: with the peer's bitmap merged in, start the resync. */
4412 if (!err && device->state.conn == C_WF_BITMAP_S)
4413 drbd_start_resync(device, C_SYNC_SOURCE);
/* Log and drain an unknown-but-optional packet without acting on it. */
4417 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4419 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4422 return ignore_remaining_packet(connection, pi);
/* Handle P_UNPLUG_REMOTE: nothing to process, just ack pending TCP data. */
4425 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4427 /* Make sure we've acked all the TCP data associated
4428 * with the data requests being unplugged */
4429 drbd_tcp_quickack(connection->data.socket);
/*
 * receive_out_of_sync() - handle P_OUT_OF_SYNC: the peer marks a block
 * range as out of sync; record it in our bitmap.  Only expected in the
 * WFSyncUUID/WFBitMapT/Behind states (asserted below).
 */
4434 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4436 struct drbd_peer_device *peer_device;
4437 struct drbd_device *device;
4438 struct p_block_desc *p = pi->data;
4440 peer_device = conn_peer_device(connection, pi->vnr);
4443 device = peer_device->device;
4445 switch (device->state.conn) {
4446 case C_WF_SYNC_UUID:
4451 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4452 drbd_conn_str(device->state.conn));
4455 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4463 int (*fn)(struct drbd_connection *, struct packet_info *);
/* Dispatch table for the receiver thread, indexed by packet command.
 * Each entry presumably gives: may-carry-payload flag, fixed (sub-)header
 * size, and the handler function — field names are defined in struct
 * data_cmd above (partially outside this view); verify against it. */
4466 static struct data_cmd drbd_cmd_handler[] = {
4467 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4468 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4469 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4470 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4471 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4472 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4473 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4474 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4475 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4476 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4477 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4478 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4479 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4480 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4481 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4482 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4483 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4484 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4485 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4486 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4487 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4488 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4489 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4490 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4491 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data },
/*
 * drbdd() - main receive loop of the receiver thread.
 *
 * Reads packet headers, validates the command against drbd_cmd_handler[],
 * receives the fixed sub-header, and dispatches to the per-command
 * handler until the thread is told to stop or a protocol error occurs.
 */
4494 static void drbdd(struct drbd_connection *connection)
4496 struct packet_info pi;
4497 size_t shs; /* sub header size */
4500 while (get_t_state(&connection->receiver) == RUNNING) {
4501 struct data_cmd *cmd;
4503 drbd_thread_current_set_cpu(&connection->receiver);
4504 if (drbd_recv_header(connection, &pi))
/* NOTE(review): cmd points into the table before the bounds check below;
 * it is only dereferenced after pi.cmd has been validated. */
4507 cmd = &drbd_cmd_handler[pi.cmd];
4508 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4509 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4510 cmdname(pi.cmd), pi.cmd);
/* Reject payload on commands that declare expect_payload == 0. */
4514 shs = cmd->pkt_size;
4515 if (pi.size > shs && !cmd->expect_payload) {
4516 drbd_err(connection, "No payload expected %s l:%d\n",
4517 cmdname(pi.cmd), pi.size);
4522 err = drbd_recv_all_warn(connection, pi.data, shs);
4528 err = cmd->fn(connection, &pi);
4530 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4531 cmdname(pi.cmd), err, pi.size);
/* Any error path above lands here: declare a protocol error. */
4538 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
/*
 * conn_disconnect() - tear down a connection after connection loss.
 *
 * Stops the asender, closes the socket, runs per-volume cleanup via
 * drbd_disconnected(), resets the current epoch, possibly fences the
 * peer, and finally moves the connection to Unconnected/StandAlone.
 */
4541 static void conn_disconnect(struct drbd_connection *connection)
4543 struct drbd_peer_device *peer_device;
4547 if (connection->cstate == C_STANDALONE)
4550 /* We are about to start the cleanup after connection loss.
4551 * Make sure drbd_make_request knows about that.
4552 * Usually we should be in some network failure state already,
4553 * but just in case we are not, we fix it up here.
4555 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4557 /* asender does not clean up anything. it must not interfere, either */
4558 drbd_thread_stop(&connection->asender);
4559 drbd_free_sock(connection);
/* Per-volume cleanup; hold a device ref across drbd_disconnected(). */
4562 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4563 struct drbd_device *device = peer_device->device;
4564 kref_get(&device->kref);
4566 drbd_disconnected(peer_device);
4567 kref_put(&device->kref, drbd_destroy_device);
4572 if (!list_empty(&connection->current_epoch->list))
4573 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4574 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4575 atomic_set(&connection->current_epoch->epoch_size, 0);
4576 connection->send.seen_any_write_yet = false;
4578 drbd_info(connection, "Connection closed\n");
/* Fence the peer if we are primary and its disk state is unknown. */
4580 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4581 conn_try_outdate_peer_async(connection);
4583 spin_lock_irq(&connection->resource->req_lock);
4584 oc = connection->cstate;
4585 if (oc >= C_UNCONNECTED)
4586 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4588 spin_unlock_irq(&connection->resource->req_lock);
4590 if (oc == C_DISCONNECTING)
4591 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
/*
 * drbd_disconnected() - per-volume cleanup after connection loss.
 *
 * Waits for in-flight peer requests, cancels resync bookkeeping, flushes
 * the worker queue, drops the peer's UUIDs, clears the transfer log, and
 * asserts that all peer-request lists are empty afterwards.
 */
4594 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4596 struct drbd_device *device = peer_device->device;
4599 /* wait for current activity to cease. */
4600 spin_lock_irq(&device->resource->req_lock);
4601 _drbd_wait_ee_list_empty(device, &device->active_ee);
4602 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4603 _drbd_wait_ee_list_empty(device, &device->read_ee);
4604 spin_unlock_irq(&device->resource->req_lock);
4606 /* We do not have data structures that would allow us to
4607 * get the rs_pending_cnt down to 0 again.
4608 * * On C_SYNC_TARGET we do not have any data structures describing
4609 * the pending RSDataRequest's we have sent.
4610 * * On C_SYNC_SOURCE there is no data structure that tracks
4611 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4612 * And no, it is not the sum of the reference counts in the
4613 * resync_LRU. The resync_LRU tracks the whole operation including
4614 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4616 drbd_rs_cancel_all(device);
4617 device->rs_total = 0;
4618 device->rs_failed = 0;
4619 atomic_set(&device->rs_pending_cnt, 0);
4620 wake_up(&device->misc_wait);
/* Stop the resync timer, then run its handler once synchronously so any
 * pending timer work is completed rather than lost. */
4622 del_timer_sync(&device->resync_timer);
4623 resync_timer_fn((unsigned long)device);
4625 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4626 * w_make_resync_request etc. which may still be on the worker queue
4627 * to be "canceled" */
4628 drbd_flush_workqueue(&peer_device->connection->sender_work);
4630 drbd_finish_peer_reqs(device);
4632 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4633 might have issued a work again. The one before drbd_finish_peer_reqs() is
4634 necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4635 drbd_flush_workqueue(&peer_device->connection->sender_work);
4637 /* need to do it again, drbd_finish_peer_reqs() may have populated it
4638 * again via drbd_try_clear_on_disk_bm(). */
4639 drbd_rs_cancel_all(device);
/* The stored peer UUIDs are stale once the peer is gone. */
4641 kfree(device->p_uuid);
4642 device->p_uuid = NULL;
4644 if (!drbd_suspended(device))
4645 tl_clear(peer_device->connection);
4647 drbd_md_sync(device);
4649 /* serialize with bitmap writeout triggered by the state change,
4651 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4653 /* tcp_close and release of sendpage pages can be deferred. I don't
4654 * want to use SO_LINGER, because apparently it can be deferred for
4655 * more than 20 seconds (longest time I checked).
4657 * Actually we don't care for exactly when the network stack does its
4658 * put_page(), but release our reference on these pages right here.
4660 i = drbd_free_peer_reqs(device, &device->net_ee);
4662 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4663 i = atomic_read(&device->pp_in_use_by_net);
4665 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4666 i = atomic_read(&device->pp_in_use);
4668 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4670 D_ASSERT(device, list_empty(&device->read_ee));
4671 D_ASSERT(device, list_empty(&device->active_ee));
4672 D_ASSERT(device, list_empty(&device->sync_ee));
4673 D_ASSERT(device, list_empty(&device->done_ee));
/* drbd_send_features() - send our P_CONNECTION_FEATURES handshake packet.
 *
4679 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4680 * we can agree on is stored in agreed_pro_version.
4682 * feature flags and the reserved array should be enough room for future
4683 * enhancements of the handshake protocol, and possible plugins...
4685 * for now, they are expected to be zero, but ignored.
4687 static int drbd_send_features(struct drbd_connection *connection)
4689 struct drbd_socket *sock;
4690 struct p_connection_features *p;
4692 sock = &connection->data;
4693 p = conn_prepare_command(connection, sock);
/* Zero the whole packet so reserved fields go out as zero as promised. */
4696 memset(p, 0, sizeof(*p));
4697 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4698 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4699 p->feature_flags = cpu_to_be32(PRO_FEATURES);
4700 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4705 * 1 yes, we have a valid connection
4706 * 0 oops, did not work out, please try again
4707 * -1 peer talks different language,
4708 * no point in trying again, please go standalone.
/*
 * drbd_do_features() - perform the protocol-version handshake with the peer.
 *
 * Sends our features packet, then expects the peer's P_CONNECTION_FEATURES
 * reply of exactly the expected size.  Per the comment block above:
 * returns 1 on a valid connection, 0 on a transient failure (try again),
 * -1 if the peer speaks an incompatible protocol (go standalone).
 *
 * NOTE(review): this listing elides some lines (braces, early returns,
 * the "incompat:" error label); comments below describe the visible flow.
 */
4710 static int drbd_do_features(struct drbd_connection *connection)
4712 /* ASSERT current == connection->receiver ... */
4713 struct p_connection_features *p;
4714 const int expect = sizeof(struct p_connection_features);
4715 struct packet_info pi;
4718 err = drbd_send_features(connection);
4722 err = drbd_recv_header(connection, &pi);
/* Reject anything that is not the features packet, or a wrong-sized one. */
4726 if (pi.cmd != P_CONNECTION_FEATURES) {
4727 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4728 cmdname(pi.cmd), pi.cmd);
4732 if (pi.size != expect) {
4733 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4739 err = drbd_recv_all_warn(connection, p, expect);
/* Convert the peer's advertised version range to host byte order in place. */
4743 p->protocol_min = be32_to_cpu(p->protocol_min);
4744 p->protocol_max = be32_to_cpu(p->protocol_max);
/* protocol_max == 0: presumably an old peer that reports only a single
 * version — treat its range as [min, min].  TODO(review): confirm against
 * the protocol documentation. */
4745 if (p->protocol_max == 0)
4746 p->protocol_max = p->protocol_min;
/* No overlap between our supported range and the peer's -> incompatible. */
4748 if (PRO_VERSION_MAX < p->protocol_min ||
4749 PRO_VERSION_MIN > p->protocol_max)
/* Agree on the highest mutually supported version; feature flags are the
 * intersection (bitwise AND) of both sides' advertised features. */
4752 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4753 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4755 drbd_info(connection, "Handshake successful: "
4756 "Agreed network protocol version %d\n", connection->agreed_pro_version);
4758 drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4759 connection->agreed_features & FF_TRIM ? " " : " not ");
/* Incompatible-dialect error path (its label is elided in this extract). */
4764 drbd_err(connection, "incompatible DRBD dialects: "
4765 "I support %d-%d, peer supports %d-%d\n",
4766 PRO_VERSION_MIN, PRO_VERSION_MAX,
4767 p->protocol_min, p->protocol_max);
4771 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4772 static int drbd_do_auth(struct drbd_connection *connection)
4774 drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4775 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4779 #define CHALLENGE_LEN 64
4783 0 - failed, try again (network error),
4784 -1 - auth failed, don't try again.
/*
 * drbd_do_auth() - CRAM-HMAC challenge/response authentication with the peer.
 *
 * Both sides exchange random challenges and HMAC them with the shared
 * secret configured in net_conf.  Per the comment above: returns 1 on
 * success, 0 on a transient/network failure (try again), -1 on
 * authentication failure (don't try again).
 *
 * NOTE(review): this listing elides many lines (braces, rcu_read_lock/unlock,
 * "goto fail" error paths, remaining kfree()s); comments describe the
 * visible flow only.
 */
4787 static int drbd_do_auth(struct drbd_connection *connection)
4789 struct drbd_socket *sock;
4790 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4791 struct scatterlist sg;
4792 char *response = NULL;
4793 char *right_response = NULL;
4794 char *peers_ch = NULL;
4795 unsigned int key_len;
4796 char secret[SHARED_SECRET_MAX]; /* 64 byte */
4797 unsigned int resp_size;
4798 struct hash_desc desc;
4799 struct packet_info pi;
4800 struct net_conf *nc;
4803 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
/* Copy the shared secret out of the RCU-protected net_conf so it can be
 * used after the (elided) rcu_read_unlock(). */
4806 nc = rcu_dereference(connection->net_conf);
4807 key_len = strlen(nc->shared_secret);
4808 memcpy(secret, nc->shared_secret, key_len);
/* Key the preallocated HMAC transform with the shared secret. */
4811 desc.tfm = connection->cram_hmac_tfm;
4814 rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4816 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
/* Our challenge: CHALLENGE_LEN random bytes, sent unencrypted. */
4821 get_random_bytes(my_challenge, CHALLENGE_LEN);
4823 sock = &connection->data;
4824 if (!conn_prepare_command(connection, sock)) {
4828 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4829 my_challenge, CHALLENGE_LEN);
/* Receive the peer's challenge; sanity-check packet type and size bounds. */
4833 err = drbd_recv_header(connection, &pi);
4839 if (pi.cmd != P_AUTH_CHALLENGE) {
4840 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4841 cmdname(pi.cmd), pi.cmd);
4846 if (pi.size > CHALLENGE_LEN * 2) {
4847 drbd_err(connection, "expected AuthChallenge payload too big.\n");
4852 if (pi.size < CHALLENGE_LEN) {
4853 drbd_err(connection, "AuthChallenge payload too small.\n");
4858 peers_ch = kmalloc(pi.size, GFP_NOIO);
4859 if (peers_ch == NULL) {
4860 drbd_err(connection, "kmalloc of peers_ch failed\n");
4865 err = drbd_recv_all_warn(connection, peers_ch, pi.size);
/* Reflection-attack defense: refuse if the peer echoed our own challenge. */
4871 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4872 drbd_err(connection, "Peer presented the same challenge!\n");
/* Compute our response = HMAC(secret, peer's challenge) and send it. */
4877 resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4878 response = kmalloc(resp_size, GFP_NOIO);
4879 if (response == NULL) {
4880 drbd_err(connection, "kmalloc of response failed\n");
4885 sg_init_table(&sg, 1);
4886 sg_set_buf(&sg, peers_ch, pi.size);
4888 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4890 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4895 if (!conn_prepare_command(connection, sock)) {
4899 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4900 response, resp_size);
/* Receive the peer's response to OUR challenge and verify type and size. */
4904 err = drbd_recv_header(connection, &pi);
4910 if (pi.cmd != P_AUTH_RESPONSE) {
4911 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4912 cmdname(pi.cmd), pi.cmd);
4917 if (pi.size != resp_size) {
4918 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
4923 err = drbd_recv_all_warn(connection, response , resp_size);
/* Compute what the correct response to our challenge would be and
 * compare.  rv != 0 means the peer knows the shared secret. */
4929 right_response = kmalloc(resp_size, GFP_NOIO);
4930 if (right_response == NULL) {
4931 drbd_err(connection, "kmalloc of right_response failed\n");
4936 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4938 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4940 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4945 rv = !memcmp(response, right_response, resp_size);
4948 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
/* Cleanup path (other kfree()s elided in this extract). */
4956 kfree(right_response);
/*
 * drbd_receiver() - main entry point of the receiver thread.
 *
 * Repeatedly tries to establish a connection via conn_connect(); the
 * (elided) loop structure retries on transient failure after a 1 s sleep,
 * and gives up — discarding the network configuration and going to
 * C_DISCONNECTING — on a permanent failure (presumably h < 0; confirm
 * against full source).  On success the receive loop runs until
 * disconnect.
 */
4962 int drbd_receiver(struct drbd_thread *thi)
4964 struct drbd_connection *connection = thi->connection;
4967 drbd_info(connection, "receiver (re)started\n");
4970 h = conn_connect(connection);
/* Transient connect failure: tear down and retry after ~1 second. */
4972 conn_disconnect(connection);
4973 schedule_timeout_interruptible(HZ);
/* Permanent failure path: drop the network config entirely. */
4976 drbd_warn(connection, "Discarding network configuration.\n");
4977 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4984 conn_disconnect(connection);
4986 drbd_info(connection, "receiver terminated\n");
4990 /* ********* acknowledge sender ******** */
/*
 * got_conn_RqSReply() - handle P_CONN_ST_CHG_REPLY from the peer.
 *
 * Records the peer's verdict on a connection-wide state-change request in
 * the CONN_WD_ST_CHG_OKAY/FAIL flags and wakes whoever is waiting on
 * ping_wait for the answer.
 */
4992 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4994 struct p_req_state_reply *p = pi->data;
4995 int retcode = be32_to_cpu(p->retcode);
/* retcode >= SS_SUCCESS means the peer accepted the state change. */
4997 if (retcode >= SS_SUCCESS) {
4998 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5000 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5001 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5002 drbd_set_st_err_str(retcode), retcode);
5004 wake_up(&connection->ping_wait);
/*
 * got_RqSReply() - handle P_STATE_CHG_REPLY (per-device state change reply).
 *
 * Records success/failure in the device's CL_ST_CHG_SUCCESS/FAIL flags and
 * wakes waiters on device->state_wait.  If a connection-wide state change
 * is pending (CONN_WD_ST_CHG_REQ), the reply is routed to
 * got_conn_RqSReply() instead — only expected with pre-100 protocol peers
 * (see the D_ASSERT).
 */
5009 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5011 struct drbd_peer_device *peer_device;
5012 struct drbd_device *device;
5013 struct p_req_state_reply *p = pi->data;
5014 int retcode = be32_to_cpu(p->retcode);
5016 peer_device = conn_peer_device(connection, pi->vnr);
5019 device = peer_device->device;
5021 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5022 D_ASSERT(device, connection->agreed_pro_version < 100);
5023 return got_conn_RqSReply(connection, pi);
5026 if (retcode >= SS_SUCCESS) {
5027 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5029 set_bit(CL_ST_CHG_FAIL, &device->flags);
5030 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5031 drbd_set_st_err_str(retcode), retcode);
5033 wake_up(&device->state_wait);
/* got_Ping() - handle P_PING by immediately answering with a ping-ack. */
5038 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5040 return drbd_send_ping_ack(connection);
/*
 * got_PingAck() - handle P_PING_ACK: the peer is alive.
 *
 * Restores the meta socket's normal idle receive timeout (it was shortened
 * while waiting for the ack) and wakes ping_wait waiters once.
 * NOTE(review): net_conf is read here without rcu_dereference(), unlike
 * other readers in this file — confirm against full source whether that is
 * intentional.
 */
5044 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5046 /* restore idle timeout */
5047 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5048 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5049 wake_up(&connection->ping_wait);
/*
 * got_IsInSync() - handle P_RS_IS_IN_SYNC: peer reports a block already in
 * sync (checksum-based resync, protocol >= 89 per the D_ASSERT).
 *
 * Marks the range in sync in the bitmap, accounts it as a same-checksum
 * hit, and updates resync bookkeeping.
 */
5054 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5056 struct drbd_peer_device *peer_device;
5057 struct drbd_device *device;
5058 struct p_block_ack *p = pi->data;
5059 sector_t sector = be64_to_cpu(p->sector);
5060 int blksize = be32_to_cpu(p->blksize);
5062 peer_device = conn_peer_device(connection, pi->vnr);
5065 device = peer_device->device;
5067 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5069 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
/* Only touch the bitmap while holding a local-disk reference. */
5071 if (get_ldev(device)) {
5072 drbd_rs_complete_io(device, sector);
5073 drbd_set_in_sync(device, sector, blksize);
5074 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5075 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5078 dec_rs_pending(device);
/* blksize >> 9 converts bytes to 512-byte sectors. */
5079 atomic_add(blksize >> 9, &device->rs_sect_in);
/*
 * validate_req_change_req_state() - look up a request and apply a state
 * transition to it.
 *
 * Under resource->req_lock, finds the request identified by @id/@sector in
 * @root (read_requests or write_requests tree), applies @what via
 * __req_mod(), then completes the master bio (if any) outside the lock.
 * @missing_ok controls whether a lookup miss is tolerated by
 * find_request().  (Return-type line and returns are elided in this
 * extract.)
 */
5085 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5086 struct rb_root *root, const char *func,
5087 enum drbd_req_event what, bool missing_ok)
5089 struct drbd_request *req;
5090 struct bio_and_error m;
5092 spin_lock_irq(&device->resource->req_lock);
5093 req = find_request(device, root, id, sector, missing_ok, func);
5094 if (unlikely(!req)) {
5095 spin_unlock_irq(&device->resource->req_lock);
5098 __req_mod(req, what, &m);
5099 spin_unlock_irq(&device->resource->req_lock);
/* Complete the upper-layer bio only after dropping the request lock. */
5102 complete_master_bio(device, &m);
/*
 * got_BlockAck() - handle the family of write acknowledgements
 * (P_RECV_ACK, P_WRITE_ACK, P_RS_WRITE_ACK, P_SUPERSEDED, P_RETRY_WRITE).
 *
 * ID_SYNCER acks belong to resync traffic: mark in sync and drop the
 * pending-resync count.  Otherwise map the packet type to a request event
 * and apply it to the matching entry in the write_requests tree.
 * NOTE(review): the switch statement header, some case labels, and the
 * final argument of validate_req_change_req_state() are elided in this
 * extract.
 */
5106 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5108 struct drbd_peer_device *peer_device;
5109 struct drbd_device *device;
5110 struct p_block_ack *p = pi->data;
5111 sector_t sector = be64_to_cpu(p->sector);
5112 int blksize = be32_to_cpu(p->blksize);
5113 enum drbd_req_event what;
5115 peer_device = conn_peer_device(connection, pi->vnr);
5118 device = peer_device->device;
5120 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
/* Resync write acknowledged: no request tree entry to update. */
5122 if (p->block_id == ID_SYNCER) {
5123 drbd_set_in_sync(device, sector, blksize);
5124 dec_rs_pending(device);
/* Map packet type -> request event (switch dispatch, partially elided). */
5128 case P_RS_WRITE_ACK:
5129 what = WRITE_ACKED_BY_PEER_AND_SIS;
5132 what = WRITE_ACKED_BY_PEER;
5135 what = RECV_ACKED_BY_PEER;
5138 what = CONFLICT_RESOLVED;
5141 what = POSTPONE_WRITE;
5147 return validate_req_change_req_state(device, p->block_id, sector,
5148 &device->write_requests, __func__,
/*
 * got_NegAck() - handle P_NEG_ACK: the peer failed to apply a write.
 *
 * For resync writes (ID_SYNCER) account the failed range; for application
 * writes apply a negative-ack event to the request and, if the request was
 * already gone (legal in protocols A/B, see comments below), just mark the
 * range out of sync.
 */
5152 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5154 struct drbd_peer_device *peer_device;
5155 struct drbd_device *device;
5156 struct p_block_ack *p = pi->data;
5157 sector_t sector = be64_to_cpu(p->sector);
5158 int size = be32_to_cpu(p->blksize);
5161 peer_device = conn_peer_device(connection, pi->vnr);
5164 device = peer_device->device;
5166 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5168 if (p->block_id == ID_SYNCER) {
5169 dec_rs_pending(device);
5170 drbd_rs_failed_io(device, sector, size);
5174 err = validate_req_change_req_state(device, p->block_id, sector,
5175 &device->write_requests, __func__,
5178 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5179 The master bio might already be completed, therefore the
5180 request is no longer in the collision hash. */
5181 /* In Protocol B we might already have got a P_RECV_ACK
5182 but then get a P_NEG_ACK afterwards. */
5183 drbd_set_out_of_sync(device, sector, size);
/*
 * got_NegDReply() - handle P_NEG_DREPLY: the peer could not serve a read.
 *
 * Logs the failed sector/length and applies the corresponding negative
 * event to the entry in the read_requests tree (the event argument is
 * elided in this extract).
 */
5188 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5190 struct drbd_peer_device *peer_device;
5191 struct drbd_device *device;
5192 struct p_block_ack *p = pi->data;
5193 sector_t sector = be64_to_cpu(p->sector);
5195 peer_device = conn_peer_device(connection, pi->vnr);
5198 device = peer_device->device;
5200 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5202 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5203 (unsigned long long)sector, be32_to_cpu(p->blksize));
5205 return validate_req_change_req_state(device, p->block_id, sector,
5206 &device->read_requests, __func__,
/*
 * got_NegRSDReply() - handle P_NEG_RS_DREPLY / P_RS_CANCEL (both routed
 * here via asender_tbl): the peer could not serve, or cancelled, a resync
 * read request.
 *
 * Drops the pending-resync count and, if the local disk is still at least
 * D_FAILED, completes the resync I/O; for P_NEG_RS_DREPLY the range is
 * accounted as failed resync I/O (switch on pi->cmd, partially elided).
 */
5210 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5212 struct drbd_peer_device *peer_device;
5213 struct drbd_device *device;
5216 struct p_block_ack *p = pi->data;
5218 peer_device = conn_peer_device(connection, pi->vnr);
5221 device = peer_device->device;
5223 sector = be64_to_cpu(p->sector);
5224 size = be32_to_cpu(p->blksize);
5226 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5228 dec_rs_pending(device);
5230 if (get_ldev_if_state(device, D_FAILED)) {
5231 drbd_rs_complete_io(device, sector);
5233 case P_NEG_RS_DREPLY:
5234 drbd_rs_failed_io(device, sector, size);
/*
 * got_BarrierAck() - handle P_BARRIER_ACK: the peer finished an epoch.
 *
 * Releases the acknowledged section of the transfer log, then — for every
 * device that is in Ahead mode with no application I/O in flight — arms
 * the start_resync timer (one second) to transition back towards
 * SyncSource.  The AHEAD_TO_SYNC_SOURCE test_and_set_bit ensures the timer
 * is armed only once.
 */
5246 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5248 struct p_barrier_ack *p = pi->data;
5249 struct drbd_peer_device *peer_device;
5252 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5255 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5256 struct drbd_device *device = peer_device->device;
5258 if (device->state.conn == C_AHEAD &&
5259 atomic_read(&device->ap_in_flight) == 0 &&
5260 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5261 device->start_resync_timer.expires = jiffies + HZ;
5262 add_timer(&device->start_resync_timer);
/*
 * got_OVResult() - handle P_OV_RESULT during online verify.
 *
 * Records an out-of-sync finding if the peer reported one, updates
 * progress marks, and when the last verify block has been processed
 * queues w_ov_finished on the sender work queue (falling back to a
 * synchronous finish if the work-item allocation fails).
 */
5270 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5272 struct drbd_peer_device *peer_device;
5273 struct drbd_device *device;
5274 struct p_block_ack *p = pi->data;
5275 struct drbd_device_work *dw;
5279 peer_device = conn_peer_device(connection, pi->vnr);
5282 device = peer_device->device;
5284 sector = be64_to_cpu(p->sector);
5285 size = be32_to_cpu(p->blksize);
5287 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
/* block_id == ID_OUT_OF_SYNC flags a verify mismatch at this range. */
5289 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5290 drbd_ov_out_of_sync_found(device, sector, size);
5292 ov_out_of_sync_print(device);
5294 if (!get_ldev(device))
5297 drbd_rs_complete_io(device, sector);
5298 dec_rs_pending(device);
5302 /* let's advance progress step marks only for every other megabyte */
5303 if ((device->ov_left & 0x200) == 0x200)
5304 drbd_advance_rs_marks(device, device->ov_left);
/* Last verify block done: hand the finish-up to the sender work queue. */
5306 if (device->ov_left == 0) {
5307 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5309 dw->w.cb = w_ov_finished;
5310 dw->device = device;
5311 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
/* Allocation-failure fallback: finish the resync synchronously here. */
5313 drbd_err(device, "kmalloc(dw) failed.");
5314 ov_out_of_sync_print(device);
5315 drbd_resync_finished(device);
/* got_skip() - handler for meta packets that are deliberately ignored
 * (registered for P_DELAY_PROBE in asender_tbl); body elided in this
 * extract — presumably just returns 0. */
5322 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
/*
 * connection_finish_peer_reqs() - drain done_ee on all devices of this
 * connection.
 *
 * Loops (enclosing do { } while, opening brace elided) until no device has
 * entries left on its done_ee list: temporarily clears SIGNAL_ASENDER and
 * flushes signals so drbd_finish_peer_reqs() is not interrupted, processes
 * each device under a kref, then re-checks emptiness of all done_ee lists
 * under req_lock.
 */
5327 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5329 struct drbd_peer_device *peer_device;
5330 int vnr, not_empty = 0;
5333 clear_bit(SIGNAL_ASENDER, &connection->flags);
5334 flush_signals(current);
/* Hold a device reference across drbd_finish_peer_reqs(); an error here
 * aborts (early return path elided). */
5337 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5338 struct drbd_device *device = peer_device->device;
5339 kref_get(&device->kref);
5341 if (drbd_finish_peer_reqs(device)) {
5342 kref_put(&device->kref, drbd_destroy_device);
5345 kref_put(&device->kref, drbd_destroy_device);
5348 set_bit(SIGNAL_ASENDER, &connection->flags);
/* Re-check under the lock whether any done_ee list refilled meanwhile. */
5350 spin_lock_irq(&connection->resource->req_lock);
5351 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5352 struct drbd_device *device = peer_device->device;
5353 not_empty = !list_empty(&device->done_ee);
5357 spin_unlock_irq(&connection->resource->req_lock);
5359 } while (not_empty);
/* Dispatch-table entry for meta-socket (asender) packets: expected payload
 * size plus handler function.  (The pkt_size member's line is elided in
 * this extract — see the { size, fn } initializers in asender_tbl below.) */
5364 struct asender_cmd {
5366 int (*fn)(struct drbd_connection *connection, struct packet_info *);
/* Dispatch table for the asender: indexed by packet command code; each
 * entry gives the fixed payload size drbd_asender() must receive before
 * calling the handler.  Unlisted command codes leave fn NULL and are
 * rejected as unexpected. */
5369 static struct asender_cmd asender_tbl[] = {
5370 [P_PING] = { 0, got_Ping },
5371 [P_PING_ACK] = { 0, got_PingAck },
5372 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5373 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5374 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5375 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
5376 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5377 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5378 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5379 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5380 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5381 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5382 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5383 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5384 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5385 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5386 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
/*
 * drbd_asender() - main loop of the acknowledge-sender thread.
 *
 * Services the meta socket: sends pings when requested, flushes completed
 * peer requests (acks) with TCP corking around the batch, then receives
 * and dispatches incoming meta packets through asender_tbl.  Exits the
 * loop on protocol or network errors, requesting C_NETWORK_FAILURE (or
 * C_DISCONNECTING on the reconnect path; labels elided in this extract).
 */
5389 int drbd_asender(struct drbd_thread *thi)
5391 struct drbd_connection *connection = thi->connection;
5392 struct asender_cmd *cmd = NULL;
5393 struct packet_info pi;
5395 void *buf = connection->meta.rbuf;
5397 unsigned int header_size = drbd_header_size(connection);
5398 int expect = header_size;
5399 bool ping_timeout_active = false;
5400 struct net_conf *nc;
5401 int ping_timeo, tcp_cork, ping_int;
/* Soft-realtime priority so acks are not starved by regular I/O work. */
5402 struct sched_param param = { .sched_priority = 2 };
/* NOTE(review): "¶m" below is mojibake for "&param" — fix the encoding. */
5404 rv = sched_setscheduler(current, SCHED_RR, ¶m);
5406 drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5408 while (get_t_state(thi) == RUNNING) {
5409 drbd_thread_current_set_cpu(thi);
/* Snapshot tunables from net_conf (rcu_read_lock/unlock elided). */
5412 nc = rcu_dereference(connection->net_conf);
5413 ping_timeo = nc->ping_timeo;
5414 tcp_cork = nc->tcp_cork;
5415 ping_int = nc->ping_int;
/* Send a ping if requested, and shorten the receive timeout while we
 * wait for the ack (ping_timeo is in deciseconds, hence HZ / 10). */
5418 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5419 if (drbd_send_ping(connection)) {
5420 drbd_err(connection, "drbd_send_ping has failed\n");
5423 connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5424 ping_timeout_active = true;
5427 /* TODO: conditionally cork; it may hurt latency if we cork without
5430 drbd_tcp_cork(connection->meta.socket);
5431 if (connection_finish_peer_reqs(connection)) {
5432 drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5435 /* but unconditionally uncork unless disabled */
5437 drbd_tcp_uncork(connection->meta.socket);
5439 /* short circuit, recv_msg would return EINTR anyways. */
5440 if (signal_pending(current))
/* Receive the remainder of the current header or payload. */
5443 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5444 clear_bit(SIGNAL_ASENDER, &connection->flags);
5446 flush_signals(current);
5449 * -EINTR (on meta) we got a signal
5450 * -EAGAIN (on meta) rcvtimeo expired
5451 * -ECONNRESET other side closed the connection
5452 * -ERESTARTSYS (on data) we got a signal
5453 * rv < 0 other than above: unexpected error!
5454 * rv == expected: full header or command
5455 * rv < expected: "woken" by signal during receive
5456 * rv == 0 : "connection shut down by peer"
5458 if (likely(rv > 0)) {
5461 } else if (rv == 0) {
/* Orderly shutdown: if we sent the disconnect, wait briefly for the
 * state machine to leave C_WF_REPORT_PARAMS before giving up. */
5462 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5465 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5468 t = wait_event_timeout(connection->ping_wait,
5469 connection->cstate < C_WF_REPORT_PARAMS,
5474 drbd_err(connection, "meta connection shut down by peer.\n");
5476 } else if (rv == -EAGAIN) {
5477 /* If the data socket received something meanwhile,
5478 * that is good enough: peer is still alive. */
5479 if (time_after(connection->last_received,
5480 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5482 if (ping_timeout_active) {
5483 drbd_err(connection, "PingAck did not arrive in time.\n");
/* Plain idle timeout: trigger another ping on the next iteration. */
5486 set_bit(SEND_PING, &connection->flags);
5488 } else if (rv == -EINTR) {
5491 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
/* Full header received: decode it and look up the handler; then extend
 * "expect" to cover the handler's declared payload size. */
5495 if (received == expect && cmd == NULL) {
5496 if (decode_header(connection, connection->meta.rbuf, &pi))
/* NOTE(review): the table element's address is formed before the bounds
 * check on pi.cmd in the next line; the short-circuit keeps !cmd->fn from
 * dereferencing out of range, but the address computation itself precedes
 * validation — confirm against full source / upstream. */
5498 cmd = &asender_tbl[pi.cmd];
5499 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5500 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5501 cmdname(pi.cmd), pi.cmd);
5504 expect = header_size + cmd->pkt_size;
5505 if (pi.size != expect - header_size) {
5506 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
/* Full packet received: dispatch to the handler. */
5511 if (received == expect) {
5514 err = cmd->fn(connection, &pi);
5516 drbd_err(connection, "%pf failed\n", cmd->fn);
5520 connection->last_received = jiffies;
5522 if (cmd == &asender_tbl[P_PING_ACK]) {
5523 /* restore idle timeout */
5524 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5525 ping_timeout_active = false;
/* Reset receive state for the next packet header. */
5528 buf = connection->meta.rbuf;
5530 expect = header_size;
/* Error exit paths (labels elided): force network failure, sync metadata. */
5537 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5538 conn_md_sync(connection);
5542 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5544 clear_bit(SIGNAL_ASENDER, &connection->flags);
5546 drbd_info(connection, "asender terminated\n");