4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
39 #include "drbd_protocol.h"
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
46 * drbd_md_endio (defined here)
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
49 * drbd_bm_endio (defined in drbd_bitmap.c)
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
58 /* used for synchronous meta data and bitmap IO
59 * submitted by drbd_md_sync_page_io()
61 void drbd_md_endio(struct bio *bio)
63 struct drbd_device *device;
65 device = bio->bi_private;
66 device->md_io.error = bio->bi_error;
68 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69 * to timeout on the lower level device, and eventually detach from it.
70 * If this io completion runs after that timeout expired, this
71 * drbd_md_put_buffer() may allow us to finally try and re-attach.
72 * During normal operation, this only puts that extra reference
74 * Make sure we first drop the reference, and only then signal
75 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76 * next drbd_md_sync_page_io(), that we trigger the
77 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
79 drbd_md_put_buffer(device);
80 device->md_io.done = 1;
81 wake_up(&device->misc_wait);
83 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
87 /* reads on behalf of the partner,
88 * "submitted" by the receiver
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
92 unsigned long flags = 0;
93 struct drbd_peer_device *peer_device = peer_req->peer_device;
94 struct drbd_device *device = peer_device->device;
96 spin_lock_irqsave(&device->resource->req_lock, flags);
97 device->read_cnt += peer_req->i.size >> 9;
98 list_del(&peer_req->w.list);
99 if (list_empty(&device->read_ee))
100 wake_up(&device->ee_wait);
101 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102 __drbd_chk_io_error(device, DRBD_READ_ERROR);
103 spin_unlock_irqrestore(&device->resource->req_lock, flags);
105 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
109 /* writes on behalf of the partner, or resync writes,
110 * "submitted" by the receiver, final stage. */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
113 unsigned long flags = 0;
114 struct drbd_peer_device *peer_device = peer_req->peer_device;
115 struct drbd_device *device = peer_device->device;
116 struct drbd_connection *connection = peer_device->connection;
117 struct drbd_interval i;
120 int do_al_complete_io;
122 /* after we moved peer_req to done_ee,
123 * we may no longer access it,
124 * it may be freed/reused already!
125 * (as soon as we release the req_lock) */
127 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128 block_id = peer_req->block_id;
129 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
131 spin_lock_irqsave(&device->resource->req_lock, flags);
132 device->writ_cnt += peer_req->i.size >> 9;
133 list_move_tail(&peer_req->w.list, &device->done_ee);
136 * Do not remove from the write_requests tree here: we did not send the
137 * Ack yet and did not wake possibly waiting conflicting requests.
138 Removal from the tree happens in "drbd_process_done_ee" within the
139 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
140 * _drbd_clear_done_ee.
143 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
145 /* FIXME do we want to detach for failed REQ_DISCARD?
146 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147 if (peer_req->flags & EE_WAS_ERROR)
148 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
150 if (connection->cstate >= C_WF_REPORT_PARAMS) {
151 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153 kref_put(&device->kref, drbd_destroy_device);
155 spin_unlock_irqrestore(&device->resource->req_lock, flags);
157 if (block_id == ID_SYNCER)
158 drbd_rs_complete_io(device, i.sector);
161 wake_up(&device->ee_wait);
163 if (do_al_complete_io)
164 drbd_al_complete_io(device, &i);
169 /* writes on behalf of the partner, or resync writes,
170 * "submitted" by the receiver.
172 void drbd_peer_request_endio(struct bio *bio)
174 struct drbd_peer_request *peer_req = bio->bi_private;
175 struct drbd_device *device = peer_req->peer_device->device;
176 int is_write = bio_data_dir(bio) == WRITE;
177 int is_discard = !!(bio->bi_rw & REQ_DISCARD);
179 if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
180 drbd_warn(device, "%s: error=%d s=%llus\n",
181 is_write ? (is_discard ? "discard" : "write")
182 : "read", bio->bi_error,
183 (unsigned long long)peer_req->i.sector);
186 set_bit(__EE_WAS_ERROR, &peer_req->flags);
188 bio_put(bio); /* no need for the bio anymore */
189 if (atomic_dec_and_test(&peer_req->pending_bios)) {
191 drbd_endio_write_sec_final(peer_req);
193 drbd_endio_read_sec_final(peer_req);
197 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
200 device->minor, device->resource->name, device->vnr);
203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205 void drbd_request_endio(struct bio *bio)
208 struct drbd_request *req = bio->bi_private;
209 struct drbd_device *device = req->device;
210 struct bio_and_error m;
211 enum drbd_req_event what;
213 /* If this request was aborted locally before,
214 * but now was completed "successfully",
215 * chances are that this caused arbitrary data corruption.
217 * "aborting" requests, or force-detaching the disk, is intended for
218 * completely blocked/hung local backing devices which no longer
219 * complete requests at all, not even with error completions. In this
220 * situation, usually a hard-reset and failover is the only way out.
222 * By "aborting", basically faking a local error-completion,
223 * we allow for a more graceful switchover by cleanly migrating services.
224 * Still the affected node has to be rebooted "soon".
226 * By completing these requests, we allow the upper layers to re-use
227 * the associated data pages.
229 * If later the local backing device "recovers", and now DMAs some data
230 * from disk into the original request pages, in the best case it will
231 * just put random data into unused pages; but typically it will corrupt
232 data that is by now completely unrelated, causing all sorts of damage.
234 * Which means delayed successful completion,
235 * especially for READ requests,
236 * is a reason to panic().
238 * We assume that a delayed *error* completion is OK,
239 * though we still will complain noisily about it.
241 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
242 if (__ratelimit(&drbd_ratelimit_state))
243 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
246 drbd_panic_after_delayed_completion_of_aborted_request(device);
249 /* to avoid recursion in __req_mod */
250 if (unlikely(bio->bi_error)) {
251 if (bio->bi_rw & REQ_DISCARD)
252 what = (bio->bi_error == -EOPNOTSUPP)
253 ? DISCARD_COMPLETED_NOTSUPP
254 : DISCARD_COMPLETED_WITH_ERROR;
256 what = (bio_data_dir(bio) == WRITE)
257 ? WRITE_COMPLETED_WITH_ERROR
258 : (bio_rw(bio) == READ)
259 ? READ_COMPLETED_WITH_ERROR
260 : READ_AHEAD_COMPLETED_WITH_ERROR;
264 bio_put(req->private_bio);
265 req->private_bio = ERR_PTR(bio->bi_error);
267 /* not req_mod(), we need irqsave here! */
268 spin_lock_irqsave(&device->resource->req_lock, flags);
269 __req_mod(req, what, &m);
270 spin_unlock_irqrestore(&device->resource->req_lock, flags);
274 complete_master_bio(device, &m);
277 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
279 struct hash_desc desc;
280 struct scatterlist sg;
281 struct page *page = peer_req->pages;
288 sg_init_table(&sg, 1);
289 crypto_hash_init(&desc);
291 while ((tmp = page_chain_next(page))) {
292 /* all but the last page will be fully used */
293 sg_set_page(&sg, page, PAGE_SIZE, 0);
294 crypto_hash_update(&desc, &sg, sg.length);
297 /* and now the last, possibly only partially used page */
298 len = peer_req->i.size & (PAGE_SIZE - 1);
299 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
300 crypto_hash_update(&desc, &sg, sg.length);
301 crypto_hash_final(&desc, digest);
304 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
306 struct hash_desc desc;
307 struct scatterlist sg;
309 struct bvec_iter iter;
314 sg_init_table(&sg, 1);
315 crypto_hash_init(&desc);
317 bio_for_each_segment(bvec, bio, iter) {
318 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
319 crypto_hash_update(&desc, &sg, sg.length);
321 crypto_hash_final(&desc, digest);
324 /* MAYBE merge common code with w_e_end_ov_req */
325 static int w_e_send_csum(struct drbd_work *w, int cancel)
327 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
328 struct drbd_peer_device *peer_device = peer_req->peer_device;
329 struct drbd_device *device = peer_device->device;
334 if (unlikely(cancel))
337 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
340 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
341 digest = kmalloc(digest_size, GFP_NOIO);
343 sector_t sector = peer_req->i.sector;
344 unsigned int size = peer_req->i.size;
345 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
346 /* Free peer_req and pages before send.
347 * In case we block on congestion, we could otherwise run into
348 * some distributed deadlock, if the other side blocks on
349 * congestion as well, because our receiver blocks in
350 * drbd_alloc_pages due to pp_in_use > max_buffers. */
351 drbd_free_peer_req(device, peer_req);
353 inc_rs_pending(device);
354 err = drbd_send_drequest_csum(peer_device, sector, size,
359 drbd_err(device, "kmalloc() of digest failed.\n");
365 drbd_free_peer_req(device, peer_req);
368 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
372 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
374 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
376 struct drbd_device *device = peer_device->device;
377 struct drbd_peer_request *peer_req;
379 if (!get_ldev(device))
382 /* GFP_TRY, because if there is no memory available right now, this may
383 * be rescheduled for later. It is "only" background resync, after all. */
384 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
385 size, true /* has real payload */, GFP_TRY);
389 peer_req->w.cb = w_e_send_csum;
390 spin_lock_irq(&device->resource->req_lock);
391 list_add_tail(&peer_req->w.list, &device->read_ee);
392 spin_unlock_irq(&device->resource->req_lock);
394 atomic_add(size >> 9, &device->rs_sect_ev);
395 if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
398 /* If it failed because of ENOMEM, retry should help. If it failed
399 * because bio_add_page failed (probably broken lower level driver),
400 * retry may or may not help.
401 * If it does not, you may need to force disconnect. */
402 spin_lock_irq(&device->resource->req_lock);
403 list_del(&peer_req->w.list);
404 spin_unlock_irq(&device->resource->req_lock);
406 drbd_free_peer_req(device, peer_req);
412 int w_resync_timer(struct drbd_work *w, int cancel)
414 struct drbd_device *device =
415 container_of(w, struct drbd_device, resync_work);
417 switch (device->state.conn) {
419 make_ov_request(device, cancel);
422 make_resync_request(device, cancel);
429 void resync_timer_fn(unsigned long data)
431 struct drbd_device *device = (struct drbd_device *) data;
433 drbd_queue_work_if_unqueued(
434 &first_peer_device(device)->connection->sender_work,
435 &device->resync_work);
438 static void fifo_set(struct fifo_buffer *fb, int value)
442 for (i = 0; i < fb->size; i++)
443 fb->values[i] = value;
446 static int fifo_push(struct fifo_buffer *fb, int value)
450 ov = fb->values[fb->head_index];
451 fb->values[fb->head_index++] = value;
453 if (fb->head_index >= fb->size)
459 static void fifo_add_val(struct fifo_buffer *fb, int value)
463 for (i = 0; i < fb->size; i++)
464 fb->values[i] += value;
467 struct fifo_buffer *fifo_alloc(int fifo_size)
469 struct fifo_buffer *fb;
471 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
476 fb->size = fifo_size;
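/* The fifo_buffer is the resync controller's "plan": roughly one slot per
 * planned SLEEP_TIME step.  fifo_push() stores a value at head and returns
 * the slot it overwrites (the oldest entry), so pushing 0 each step pops
 * the correction that was planned for "now"; fifo_add_val() spreads a
 * correction evenly over all planned steps.  Illustrative example: with
 * size 4 and values {5, 5, 5, 5}, fifo_add_val(fb, 2) gives {7, 7, 7, 7},
 * and a following fifo_push(fb, 0) returns 7 and leaves {0, 7, 7, 7} with
 * head advanced by one slot. */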
482 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
484 struct disk_conf *dc;
485 unsigned int want; /* The number of sectors we want in-flight */
486 int req_sect; /* Number of sectors to request in this turn */
487 int correction; /* Number of sectors more we need in-flight */
488 int cps; /* correction per invocation of drbd_rs_controller() */
489 int steps; /* Number of time steps to plan ahead */
492 struct fifo_buffer *plan;
494 dc = rcu_dereference(device->ldev->disk_conf);
495 plan = rcu_dereference(device->rs_plan_s);
497 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
499 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
500 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
501 } else { /* normal path */
502 want = dc->c_fill_target ? dc->c_fill_target :
503 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
506 correction = want - device->rs_in_flight - plan->total;
509 cps = correction / steps;
510 fifo_add_val(plan, cps);
511 plan->total += cps * steps;
513 /* What we do in this step */
514 curr_corr = fifo_push(plan, 0);
515 plan->total -= curr_corr;
517 req_sect = sect_in + curr_corr;
521 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
522 if (req_sect > max_sect)
526 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
527 sect_in, device->rs_in_flight, want, correction,
528 steps, cps, device->rs_planed, curr_corr, req_sect);
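/* Worked example with made-up numbers: suppose c_fill_target is 1000
 * sectors, rs_in_flight is 600, plan->total is 200 and steps is 20.
 * Then correction = 1000 - 600 - 200 = 200 sectors, cps = 200 / 20 = 10,
 * every plan slot is raised by 10 and plan->total becomes 400.  The slot
 * popped for this step (curr_corr) is taken out of plan->total again, and
 * req_sect = sect_in + curr_corr, capped at max_sect.  The net effect is
 * a simple feedback controller that tries to keep c_fill_target sectors
 * in flight, spreading corrections over the planned steps (c_plan_ahead)
 * instead of applying them all at once. */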
534 static int drbd_rs_number_requests(struct drbd_device *device)
536 unsigned int sect_in; /* Number of sectors that came in since the last turn */
539 sect_in = atomic_xchg(&device->rs_sect_in, 0);
540 device->rs_in_flight -= sect_in;
543 mxb = drbd_get_max_buffers(device) / 2;
544 if (rcu_dereference(device->rs_plan_s)->size) {
545 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
546 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
548 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
549 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
553 /* Don't have more than "max-buffers"/2 in-flight.
554 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
555 * potentially causing a distributed deadlock on congestion during
556 * online-verify or (checksum-based) resync, if max-buffers,
557 * socket buffer sizes and resync rate settings are mis-configured. */
559 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
560 * mxb (as used here, and in drbd_alloc_pages on the peer) is
561 * "number of pages" (typically also 4k),
562 * but "rs_in_flight" is in "sectors" (512 Byte). */
563 if (mxb - device->rs_in_flight/8 < number)
564 number = mxb - device->rs_in_flight/8;
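/* Example of the unit juggling above, assuming SLEEP_TIME is 100ms (HZ/10)
 * as elsewhere in DRBD: with a fixed resync_rate of 10240 KiB/s and no
 * plan configured, number = (HZ/10) * 10240 / (4 * HZ) = 256 requests of
 * BM_BLOCK_SIZE (4 KiB) per 100ms, i.e. 1 MiB per call and 10 MiB/s
 * overall.  The mxb clamp then further limits this by what the peer can
 * buffer: rs_in_flight/8 converts 512-byte sectors back into 4 KiB pages. */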
569 static int make_resync_request(struct drbd_device *const device, int cancel)
571 struct drbd_peer_device *const peer_device = first_peer_device(device);
572 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
575 const sector_t capacity = drbd_get_capacity(device->this_bdev);
577 int number, rollback_i, size;
578 int align, requeue = 0;
581 if (unlikely(cancel))
584 if (device->rs_total == 0) {
586 drbd_resync_finished(device);
590 if (!get_ldev(device)) {
591 /* Since we only need to access device->resync a
592 get_ldev_if_state(device, D_FAILED) would be sufficient, but
593 to continue resync with a broken disk makes no sense at
595 drbd_err(device, "Disk broke down during resync!\n");
599 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
600 number = drbd_rs_number_requests(device);
604 for (i = 0; i < number; i++) {
605 /* Stop generating RS requests when half of the send buffer is filled,
606 * but notify TCP that we'd like to have more space. */
607 mutex_lock(&connection->data.mutex);
608 if (connection->data.socket) {
609 struct sock *sk = connection->data.socket->sk;
610 int queued = sk->sk_wmem_queued;
611 int sndbuf = sk->sk_sndbuf;
612 if (queued > sndbuf / 2) {
615 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
619 mutex_unlock(&connection->data.mutex);
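		/* Illustrative numbers: with a 256 KiB sk_sndbuf, we requeue the
		 * resync work once more than 128 KiB is still sitting in the
		 * socket's send queue, and set SOCK_NOSPACE to tell TCP that we
		 * would like more send space; that way application traffic keeps
		 * priority over background resync requests. */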
624 size = BM_BLOCK_SIZE;
625 bit = drbd_bm_find_next(device, device->bm_resync_fo);
627 if (bit == DRBD_END_OF_BITMAP) {
628 device->bm_resync_fo = drbd_bm_bits(device);
633 sector = BM_BIT_TO_SECT(bit);
635 if (drbd_try_rs_begin_io(device, sector)) {
636 device->bm_resync_fo = bit;
639 device->bm_resync_fo = bit + 1;
641 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
642 drbd_rs_complete_io(device, sector);
646 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
647 /* try to find some adjacent bits.
648 * we stop if we already have the maximum req size.
650 * Additionally always align bigger requests, in order to
651 * be prepared for all stripe sizes of software RAIDs.
656 if (size + BM_BLOCK_SIZE > max_bio_size)
659 /* Be always aligned */
660 if (sector & ((1<<(align+3))-1))
663 /* do not cross extent boundaries */
664 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
666 /* now, is it actually dirty, after all?
667 * caution, drbd_bm_test_bit is tri-state for some
668 * obscure reason; ( b == 0 ) would get the out-of-band
669 * only accidentally right because of the "oddly sized"
670 * adjustment below */
671 if (drbd_bm_test_bit(device, bit+1) != 1)
674 size += BM_BLOCK_SIZE;
675 if ((BM_BLOCK_SIZE << align) <= size)
679 /* if we merged some,
680 * reset the offset to start the next drbd_bm_find_next from */
681 if (size > BM_BLOCK_SIZE)
682 device->bm_resync_fo = bit + 1;
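		/* Illustrative merge, assuming a 1 MiB DRBD_MAX_BIO_SIZE: starting
		 * at a dirty bit whose sector is 4 KiB aligned, adjacent dirty bits
		 * are folded in one BM_BLOCK_SIZE step at a time; whenever the
		 * request reaches its current alignment (4k, 8k, 16k, ...) the
		 * alignment is doubled, so a long dirty run becomes one naturally
		 * aligned 1 MiB request instead of 256 separate 4 KiB requests.
		 * Runs stop early at resync extent boundaries and at bits that turn
		 * out to be clean after all. */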
685 /* adjust very last sectors, in case we are oddly sized */
686 if (sector + (size>>9) > capacity)
687 size = (capacity-sector)<<9;
689 if (device->use_csums) {
690 switch (read_for_csum(peer_device, sector, size)) {
691 case -EIO: /* Disk failure */
694 case -EAGAIN: /* allocation failed, or ldev busy */
695 drbd_rs_complete_io(device, sector);
696 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
708 inc_rs_pending(device);
709 err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
710 sector, size, ID_SYNCER);
712 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
713 dec_rs_pending(device);
720 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
721 /* last syncer _request_ was sent,
722 * but the P_RS_DATA_REPLY not yet received. sync will end (and
723 * next sync group will resume), as soon as we receive the last
724 * resync data block, and the last bit is cleared.
725 * until then resync "work" is "inactive" ...
732 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
733 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
738 static int make_ov_request(struct drbd_device *device, int cancel)
742 const sector_t capacity = drbd_get_capacity(device->this_bdev);
743 bool stop_sector_reached = false;
745 if (unlikely(cancel))
748 number = drbd_rs_number_requests(device);
750 sector = device->ov_position;
751 for (i = 0; i < number; i++) {
752 if (sector >= capacity)
755 /* We check for "finished" only in the reply path:
756 * w_e_end_ov_reply().
757 * We need to send at least one request out. */
758 stop_sector_reached = i > 0
759 && verify_can_do_stop_sector(device)
760 && sector >= device->ov_stop_sector;
761 if (stop_sector_reached)
764 size = BM_BLOCK_SIZE;
766 if (drbd_try_rs_begin_io(device, sector)) {
767 device->ov_position = sector;
771 if (sector + (size>>9) > capacity)
772 size = (capacity-sector)<<9;
774 inc_rs_pending(device);
775 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
776 dec_rs_pending(device);
779 sector += BM_SECT_PER_BIT;
781 device->ov_position = sector;
784 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
785 if (i == 0 || !stop_sector_reached)
786 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
790 int w_ov_finished(struct drbd_work *w, int cancel)
792 struct drbd_device_work *dw =
793 container_of(w, struct drbd_device_work, w);
794 struct drbd_device *device = dw->device;
796 ov_out_of_sync_print(device);
797 drbd_resync_finished(device);
802 static int w_resync_finished(struct drbd_work *w, int cancel)
804 struct drbd_device_work *dw =
805 container_of(w, struct drbd_device_work, w);
806 struct drbd_device *device = dw->device;
809 drbd_resync_finished(device);
814 static void ping_peer(struct drbd_device *device)
816 struct drbd_connection *connection = first_peer_device(device)->connection;
818 clear_bit(GOT_PING_ACK, &connection->flags);
819 request_ping(connection);
820 wait_event(connection->ping_wait,
821 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
824 int drbd_resync_finished(struct drbd_device *device)
826 unsigned long db, dt, dbdt;
828 union drbd_state os, ns;
829 struct drbd_device_work *dw;
830 char *khelper_cmd = NULL;
833 /* Remove all elements from the resync LRU. Future actions
834 * might set bits in the (main) bitmap, which would make the entries
835 * in the resync LRU wrong. */
836 if (drbd_rs_del_all(device)) {
837 /* In case this is not possible now, most probably because
838 * there are P_RS_DATA_REPLY Packets lingering on the worker's
839 * queue (or even the read operations for those packets
840 have not finished yet). Retry in 100ms.
842 schedule_timeout_interruptible(HZ / 10);
843 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
845 dw->w.cb = w_resync_finished;
847 drbd_queue_work(&first_peer_device(device)->connection->sender_work,
851 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
854 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
858 db = device->rs_total;
859 /* adjust for verify start and stop sectors, or rather the position reached */
860 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
861 db -= device->ov_left;
863 dbdt = Bit2KB(db/dt);
864 device->rs_paused /= HZ;
866 if (!get_ldev(device))
871 spin_lock_irq(&device->resource->req_lock);
872 os = drbd_read_state(device);
874 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
876 /* This protects us against multiple calls (that can happen in the presence
877 of application IO), and against connectivity loss just before we arrive here. */
878 if (os.conn <= C_CONNECTED)
882 ns.conn = C_CONNECTED;
884 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
885 verify_done ? "Online verify" : "Resync",
886 dt + device->rs_paused, device->rs_paused, dbdt);
888 n_oos = drbd_bm_total_weight(device);
890 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
892 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
894 khelper_cmd = "out-of-sync";
897 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
899 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
900 khelper_cmd = "after-resync-target";
902 if (device->use_csums && device->rs_total) {
903 const unsigned long s = device->rs_same_csum;
904 const unsigned long t = device->rs_total;
907 (t < 100000) ? ((s*100)/t) : (s/(t/100));
908 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
909 "transferred %luK total %luK\n",
911 Bit2KB(device->rs_same_csum),
912 Bit2KB(device->rs_total - device->rs_same_csum),
913 Bit2KB(device->rs_total));
917 if (device->rs_failed) {
918 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
920 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
921 ns.disk = D_INCONSISTENT;
922 ns.pdsk = D_UP_TO_DATE;
924 ns.disk = D_UP_TO_DATE;
925 ns.pdsk = D_INCONSISTENT;
928 ns.disk = D_UP_TO_DATE;
929 ns.pdsk = D_UP_TO_DATE;
931 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
932 if (device->p_uuid) {
934 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
935 _drbd_uuid_set(device, i, device->p_uuid[i]);
936 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
937 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
939 drbd_err(device, "device->p_uuid is NULL! BUG\n");
943 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
944 /* for verify runs, we don't update uuids here,
945 * so there would be nothing to report. */
946 drbd_uuid_set_bm(device, 0UL);
947 drbd_print_uuids(device, "updated UUIDs");
948 if (device->p_uuid) {
949 /* Now the two UUID sets are equal, update what we
950 * know of the peer. */
952 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
953 device->p_uuid[i] = device->ldev->md.uuid[i];
958 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
960 spin_unlock_irq(&device->resource->req_lock);
963 device->rs_total = 0;
964 device->rs_failed = 0;
965 device->rs_paused = 0;
967 /* reset start sector, if we reached end of device */
968 if (verify_done && device->ov_left == 0)
969 device->ov_start_sector = 0;
971 drbd_md_sync(device);
974 drbd_khelper(device, khelper_cmd);
980 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
982 if (drbd_peer_req_has_active_page(peer_req)) {
983 /* This might happen if sendpage() has not finished */
984 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
985 atomic_add(i, &device->pp_in_use_by_net);
986 atomic_sub(i, &device->pp_in_use);
987 spin_lock_irq(&device->resource->req_lock);
988 list_add_tail(&peer_req->w.list, &device->net_ee);
989 spin_unlock_irq(&device->resource->req_lock);
990 wake_up(&drbd_pp_wait);
992 drbd_free_peer_req(device, peer_req);
996 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
997 * @device: DRBD device.
999 * @cancel: The connection will be closed anyways
1001 int w_e_end_data_req(struct drbd_work *w, int cancel)
1003 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1004 struct drbd_peer_device *peer_device = peer_req->peer_device;
1005 struct drbd_device *device = peer_device->device;
1008 if (unlikely(cancel)) {
1009 drbd_free_peer_req(device, peer_req);
1010 dec_unacked(device);
1014 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1015 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1017 if (__ratelimit(&drbd_ratelimit_state))
1018 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1019 (unsigned long long)peer_req->i.sector);
1021 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1024 dec_unacked(device);
1026 move_to_net_ee_or_free(device, peer_req);
1029 drbd_err(device, "drbd_send_block() failed\n");
1034 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1036 * @cancel: The connection will be closed anyways
1038 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1040 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1041 struct drbd_peer_device *peer_device = peer_req->peer_device;
1042 struct drbd_device *device = peer_device->device;
1045 if (unlikely(cancel)) {
1046 drbd_free_peer_req(device, peer_req);
1047 dec_unacked(device);
1051 if (get_ldev_if_state(device, D_FAILED)) {
1052 drbd_rs_complete_io(device, peer_req->i.sector);
1056 if (device->state.conn == C_AHEAD) {
1057 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1058 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1059 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1060 inc_rs_pending(device);
1061 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1063 if (__ratelimit(&drbd_ratelimit_state))
1064 drbd_err(device, "Not sending RSDataReply, "
1065 "partner DISKLESS!\n");
1069 if (__ratelimit(&drbd_ratelimit_state))
1070 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1071 (unsigned long long)peer_req->i.sector);
1073 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1075 /* update resync data with failure */
1076 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1079 dec_unacked(device);
1081 move_to_net_ee_or_free(device, peer_req);
1084 drbd_err(device, "drbd_send_block() failed\n");
1088 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1090 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1091 struct drbd_peer_device *peer_device = peer_req->peer_device;
1092 struct drbd_device *device = peer_device->device;
1093 struct digest_info *di;
1095 void *digest = NULL;
1098 if (unlikely(cancel)) {
1099 drbd_free_peer_req(device, peer_req);
1100 dec_unacked(device);
1104 if (get_ldev(device)) {
1105 drbd_rs_complete_io(device, peer_req->i.sector);
1109 di = peer_req->digest;
1111 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1112 /* quick hack to try to avoid a race against reconfiguration.
1113 * a real fix would be much more involved,
1114 * introducing more locking mechanisms */
1115 if (peer_device->connection->csums_tfm) {
1116 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1117 D_ASSERT(device, digest_size == di->digest_size);
1118 digest = kmalloc(digest_size, GFP_NOIO);
1121 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1122 eq = !memcmp(digest, di->digest, digest_size);
1127 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1128 /* rs_same_csums unit is BM_BLOCK_SIZE */
1129 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1130 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1132 inc_rs_pending(device);
1133 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1134 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1136 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1139 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1140 if (__ratelimit(&drbd_ratelimit_state))
1141 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1144 dec_unacked(device);
1145 move_to_net_ee_or_free(device, peer_req);
1148 drbd_err(device, "drbd_send_block/ack() failed\n");
1152 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1154 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1155 struct drbd_peer_device *peer_device = peer_req->peer_device;
1156 struct drbd_device *device = peer_device->device;
1157 sector_t sector = peer_req->i.sector;
1158 unsigned int size = peer_req->i.size;
1163 if (unlikely(cancel))
1166 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1167 digest = kmalloc(digest_size, GFP_NOIO);
1169 err = 1; /* terminate the connection in case the allocation failed */
1173 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1174 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1176 memset(digest, 0, digest_size);
1178 /* Free e and pages before send.
1179 * In case we block on congestion, we could otherwise run into
1180 * some distributed deadlock, if the other side blocks on
1181 * congestion as well, because our receiver blocks in
1182 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1183 drbd_free_peer_req(device, peer_req);
1185 inc_rs_pending(device);
1186 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1188 dec_rs_pending(device);
1193 drbd_free_peer_req(device, peer_req);
1194 dec_unacked(device);
1198 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1200 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1201 device->ov_last_oos_size += size>>9;
1203 device->ov_last_oos_start = sector;
1204 device->ov_last_oos_size = size>>9;
1206 drbd_set_out_of_sync(device, sector, size);
1209 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1211 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1212 struct drbd_peer_device *peer_device = peer_req->peer_device;
1213 struct drbd_device *device = peer_device->device;
1214 struct digest_info *di;
1216 sector_t sector = peer_req->i.sector;
1217 unsigned int size = peer_req->i.size;
1220 bool stop_sector_reached = false;
1222 if (unlikely(cancel)) {
1223 drbd_free_peer_req(device, peer_req);
1224 dec_unacked(device);
1228 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1229 * the resync lru has been cleaned up already */
1230 if (get_ldev(device)) {
1231 drbd_rs_complete_io(device, peer_req->i.sector);
1235 di = peer_req->digest;
1237 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1238 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1239 digest = kmalloc(digest_size, GFP_NOIO);
1241 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1243 D_ASSERT(device, digest_size == di->digest_size);
1244 eq = !memcmp(digest, di->digest, digest_size);
1249 /* Free peer_req and pages before send.
1250 * In case we block on congestion, we could otherwise run into
1251 * some distributed deadlock, if the other side blocks on
1252 * congestion as well, because our receiver blocks in
1253 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1254 drbd_free_peer_req(device, peer_req);
1256 drbd_ov_out_of_sync_found(device, sector, size);
1258 ov_out_of_sync_print(device);
1260 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1261 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1263 dec_unacked(device);
1267 /* let's advance progress step marks only for every other megabyte */
1268 if ((device->ov_left & 0x200) == 0x200)
1269 drbd_advance_rs_marks(device, device->ov_left);
1271 stop_sector_reached = verify_can_do_stop_sector(device) &&
1272 (sector + (size>>9)) >= device->ov_stop_sector;
1274 if (device->ov_left == 0 || stop_sector_reached) {
1275 ov_out_of_sync_print(device);
1276 drbd_resync_finished(device);
1283 * We need to track the number of pending barrier acks,
1284 * and to be able to wait for them.
1285 * See also comment in drbd_adm_attach before drbd_suspend_io.
1287 static int drbd_send_barrier(struct drbd_connection *connection)
1289 struct p_barrier *p;
1290 struct drbd_socket *sock;
1292 sock = &connection->data;
1293 p = conn_prepare_command(connection, sock);
1296 p->barrier = connection->send.current_epoch_nr;
1298 connection->send.current_epoch_writes = 0;
1299 connection->send.last_sent_barrier_jif = jiffies;
1301 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1304 int w_send_write_hint(struct drbd_work *w, int cancel)
1306 struct drbd_device *device =
1307 container_of(w, struct drbd_device, unplug_work);
1308 struct drbd_socket *sock;
1312 sock = &first_peer_device(device)->connection->data;
1313 if (!drbd_prepare_command(first_peer_device(device), sock))
1315 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1318 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1320 if (!connection->send.seen_any_write_yet) {
1321 connection->send.seen_any_write_yet = true;
1322 connection->send.current_epoch_nr = epoch;
1323 connection->send.current_epoch_writes = 0;
1324 connection->send.last_sent_barrier_jif = jiffies;
1328 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1330 /* re-init if first write on this connection */
1331 if (!connection->send.seen_any_write_yet)
1333 if (connection->send.current_epoch_nr != epoch) {
1334 if (connection->send.current_epoch_writes)
1335 drbd_send_barrier(connection);
1336 connection->send.current_epoch_nr = epoch;
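/* Hypothetical sequence to illustrate the barrier logic: a number of writes
 * are sent while current_epoch_nr is 7; the first request carrying epoch 8
 * goes through maybe_send_barrier(), which notices the epoch change and,
 * because epoch 7 saw writes, emits a P_BARRIER with barrier number 7
 * first.  The peer may thus rely on all epoch-7 writes being ordered before
 * anything from epoch 8, and its barrier ack is what drbd_send_barrier()'s
 * caller tracks as "pending barrier acks" (see the comment above that
 * function). */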
1340 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1342 struct drbd_request *req = container_of(w, struct drbd_request, w);
1343 struct drbd_device *device = req->device;
1344 struct drbd_peer_device *const peer_device = first_peer_device(device);
1345 struct drbd_connection *const connection = peer_device->connection;
1348 if (unlikely(cancel)) {
1349 req_mod(req, SEND_CANCELED);
1352 req->pre_send_jif = jiffies;
1354 /* this time, no connection->send.current_epoch_writes++;
1355 * If it was sent, it was the closing barrier for the last
1356 * replicated epoch, before we went into AHEAD mode.
1357 * No more barriers will be sent, until we leave AHEAD mode again. */
1358 maybe_send_barrier(connection, req->epoch);
1360 err = drbd_send_out_of_sync(peer_device, req);
1361 req_mod(req, OOS_HANDED_TO_NETWORK);
1367 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1369 * @cancel: The connection will be closed anyways
1371 int w_send_dblock(struct drbd_work *w, int cancel)
1373 struct drbd_request *req = container_of(w, struct drbd_request, w);
1374 struct drbd_device *device = req->device;
1375 struct drbd_peer_device *const peer_device = first_peer_device(device);
1376 struct drbd_connection *connection = peer_device->connection;
1379 if (unlikely(cancel)) {
1380 req_mod(req, SEND_CANCELED);
1383 req->pre_send_jif = jiffies;
1385 re_init_if_first_write(connection, req->epoch);
1386 maybe_send_barrier(connection, req->epoch);
1387 connection->send.current_epoch_writes++;
1389 err = drbd_send_dblock(peer_device, req);
1390 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1396 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1398 * @cancel: The connection will be closed anyways
1400 int w_send_read_req(struct drbd_work *w, int cancel)
1402 struct drbd_request *req = container_of(w, struct drbd_request, w);
1403 struct drbd_device *device = req->device;
1404 struct drbd_peer_device *const peer_device = first_peer_device(device);
1405 struct drbd_connection *connection = peer_device->connection;
1408 if (unlikely(cancel)) {
1409 req_mod(req, SEND_CANCELED);
1412 req->pre_send_jif = jiffies;
1414 /* Even read requests may close a write epoch,
1415 * if there was any yet. */
1416 maybe_send_barrier(connection, req->epoch);
1418 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1419 (unsigned long)req);
1421 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1426 int w_restart_disk_io(struct drbd_work *w, int cancel)
1428 struct drbd_request *req = container_of(w, struct drbd_request, w);
1429 struct drbd_device *device = req->device;
1431 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1432 drbd_al_begin_io(device, &req->i);
1434 drbd_req_make_private_bio(req, req->master_bio);
1435 req->private_bio->bi_bdev = device->ldev->backing_bdev;
1436 generic_make_request(req->private_bio);
1441 static int _drbd_may_sync_now(struct drbd_device *device)
1443 struct drbd_device *odev = device;
1447 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1450 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1452 if (resync_after == -1)
1454 odev = minor_to_device(resync_after);
1457 if ((odev->state.conn >= C_SYNC_SOURCE &&
1458 odev->state.conn <= C_PAUSED_SYNC_T) ||
1459 odev->state.aftr_isp || odev->state.peer_isp ||
1460 odev->state.user_isp)
1466 * drbd_pause_after() - Pause resync on all devices that may not resync now
1467 * @device: DRBD device.
1469 * Called from process context only (admin command and after_state_ch).
1471 static bool drbd_pause_after(struct drbd_device *device)
1473 bool changed = false;
1474 struct drbd_device *odev;
1478 idr_for_each_entry(&drbd_devices, odev, i) {
1479 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1481 if (!_drbd_may_sync_now(odev) &&
1482 _drbd_set_state(_NS(odev, aftr_isp, 1),
1483 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1492 * drbd_resume_next() - Resume resync on all devices that may resync now
1493 * @device: DRBD device.
1495 * Called from process context only (admin command and worker).
1497 static bool drbd_resume_next(struct drbd_device *device)
1499 bool changed = false;
1500 struct drbd_device *odev;
1504 idr_for_each_entry(&drbd_devices, odev, i) {
1505 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1507 if (odev->state.aftr_isp) {
1508 if (_drbd_may_sync_now(odev) &&
1509 _drbd_set_state(_NS(odev, aftr_isp, 0),
1510 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1518 void resume_next_sg(struct drbd_device *device)
1520 lock_all_resources();
1521 drbd_resume_next(device);
1522 unlock_all_resources();
1525 void suspend_other_sg(struct drbd_device *device)
1527 lock_all_resources();
1528 drbd_pause_after(device);
1529 unlock_all_resources();
1532 /* caller must lock_all_resources() */
1533 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1535 struct drbd_device *odev;
1540 if (o_minor < -1 || o_minor > MINORMASK)
1541 return ERR_RESYNC_AFTER;
1543 /* check for loops */
1544 odev = minor_to_device(o_minor);
1547 return ERR_RESYNC_AFTER_CYCLE;
1549 /* You are free to depend on diskless, non-existing,
1550 * or not yet/no longer existing minors.
1551 * We only reject dependency loops.
1552 * We cannot follow the dependency chain beyond a detached or
1555 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1559 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1561 /* dependency chain ends here, no cycles. */
1562 if (resync_after == -1)
1565 /* follow the dependency chain */
1566 odev = minor_to_device(resync_after);
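/* Example of the dependency walk above (minor numbers are made up): if
 * minor 2 has resync-after = 1 and minor 1 has resync-after = 0, then
 * validating a new resync-after = 2 for minor 0 follows 2 -> 1 -> 0, finds
 * the starting device again and returns ERR_RESYNC_AFTER_CYCLE.  At run
 * time the same chain is what _drbd_may_sync_now() walks: a device whose
 * resync-after target is still in a C_SYNC_* state (or paused) gets
 * aftr_isp set by drbd_pause_after() and is resumed via drbd_resume_next()
 * once the dependency has finished. */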
1570 /* caller must lock_all_resources() */
1571 void drbd_resync_after_changed(struct drbd_device *device)
1576 changed = drbd_pause_after(device);
1577 changed |= drbd_resume_next(device);
1581 void drbd_rs_controller_reset(struct drbd_device *device)
1583 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1584 struct fifo_buffer *plan;
1586 atomic_set(&device->rs_sect_in, 0);
1587 atomic_set(&device->rs_sect_ev, 0);
1588 device->rs_in_flight = 0;
1589 device->rs_last_events =
1590 (int)part_stat_read(&disk->part0, sectors[0]) +
1591 (int)part_stat_read(&disk->part0, sectors[1]);
1593 /* Updating the RCU protected object in place is necessary since
1594 this function gets called from atomic context.
1595 It is valid since all other updates also lead to a completely
1598 plan = rcu_dereference(device->rs_plan_s);
1604 void start_resync_timer_fn(unsigned long data)
1606 struct drbd_device *device = (struct drbd_device *) data;
1607 drbd_device_post_work(device, RS_START);
1610 static void do_start_resync(struct drbd_device *device)
1612 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1613 drbd_warn(device, "postponing start_resync ...\n");
1614 device->start_resync_timer.expires = jiffies + HZ/10;
1615 add_timer(&device->start_resync_timer);
1619 drbd_start_resync(device, C_SYNC_SOURCE);
1620 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1623 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1625 bool csums_after_crash_only;
1627 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1629 return connection->agreed_pro_version >= 89 && /* supported? */
1630 connection->csums_tfm && /* configured? */
1631 (csums_after_crash_only == 0 /* use for each resync? */
1632 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
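/* In other words: checksum based resync needs protocol 89 or newer, a
 * configured csums-alg, and either csums-after-crash-only disabled or the
 * CRASHED_PRIMARY flag set, e.g. on a node that was primary when it lost
 * power.  Everything else falls back to plain resync, which always
 * transfers the full blocks. */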
1636 * drbd_start_resync() - Start the resync process
1637 * @device: DRBD device.
1638 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1640 * This function might bring you directly into one of the
1641 * C_PAUSED_SYNC_* states.
1643 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1645 struct drbd_peer_device *peer_device = first_peer_device(device);
1646 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1647 union drbd_state ns;
1650 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1651 drbd_err(device, "Resync already running!\n");
1655 if (!test_bit(B_RS_H_DONE, &device->flags)) {
1656 if (side == C_SYNC_TARGET) {
1657 /* Since application IO was locked out during C_WF_BITMAP_T and
1658 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1659 we check whether we may make the data inconsistent. */
1660 r = drbd_khelper(device, "before-resync-target");
1661 r = (r >> 8) & 0xff;
1663 drbd_info(device, "before-resync-target handler returned %d, "
1664 "dropping connection.\n", r);
1665 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1668 } else /* C_SYNC_SOURCE */ {
1669 r = drbd_khelper(device, "before-resync-source");
1670 r = (r >> 8) & 0xff;
1673 drbd_info(device, "before-resync-source handler returned %d, "
1674 "ignoring. Old userland tools?", r);
1676 drbd_info(device, "before-resync-source handler returned %d, "
1677 "dropping connection.\n", r);
1678 conn_request_state(connection,
1679 NS(conn, C_DISCONNECTING), CS_HARD);
1686 if (current == connection->worker.task) {
1687 /* The worker should not sleep waiting for state_mutex,
1688 that can take long */
1689 if (!mutex_trylock(device->state_mutex)) {
1690 set_bit(B_RS_H_DONE, &device->flags);
1691 device->start_resync_timer.expires = jiffies + HZ/5;
1692 add_timer(&device->start_resync_timer);
1696 mutex_lock(device->state_mutex);
1699 lock_all_resources();
1700 clear_bit(B_RS_H_DONE, &device->flags);
1701 /* Did some connection breakage or IO error race with us? */
1702 if (device->state.conn < C_CONNECTED
1703 || !get_ldev_if_state(device, D_NEGOTIATING)) {
1704 unlock_all_resources();
1708 ns = drbd_read_state(device);
1710 ns.aftr_isp = !_drbd_may_sync_now(device);
1714 if (side == C_SYNC_TARGET)
1715 ns.disk = D_INCONSISTENT;
1716 else /* side == C_SYNC_SOURCE */
1717 ns.pdsk = D_INCONSISTENT;
1719 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1720 ns = drbd_read_state(device);
1722 if (ns.conn < C_CONNECTED)
1723 r = SS_UNKNOWN_ERROR;
1725 if (r == SS_SUCCESS) {
1726 unsigned long tw = drbd_bm_total_weight(device);
1727 unsigned long now = jiffies;
1730 device->rs_failed = 0;
1731 device->rs_paused = 0;
1732 device->rs_same_csum = 0;
1733 device->rs_last_sect_ev = 0;
1734 device->rs_total = tw;
1735 device->rs_start = now;
1736 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1737 device->rs_mark_left[i] = tw;
1738 device->rs_mark_time[i] = now;
1740 drbd_pause_after(device);
1741 /* Forget potentially stale cached per resync extent bit-counts.
1742 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1743 * disabled, and know the disk state is ok. */
1744 spin_lock(&device->al_lock);
1745 lc_reset(device->resync);
1746 device->resync_locked = 0;
1747 device->resync_wenr = LC_FREE;
1748 spin_unlock(&device->al_lock);
1750 unlock_all_resources();
1752 if (r == SS_SUCCESS) {
1753 wake_up(&device->al_wait); /* for lc_reset() above */
1754 /* reset rs_last_bcast when a resync or verify is started,
1755 * to deal with potential jiffies wrap. */
1756 device->rs_last_bcast = jiffies - HZ;
1758 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1759 drbd_conn_str(ns.conn),
1760 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1761 (unsigned long) device->rs_total);
1762 if (side == C_SYNC_TARGET) {
1763 device->bm_resync_fo = 0;
1764 device->use_csums = use_checksum_based_resync(connection, device);
1766 device->use_csums = 0;
1769 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1770 * with w_send_oos, or the sync target will get confused as to
1771 * how many bits to resync. We cannot do that always, because for an
1772 * empty resync and protocol < 95, we need to do it here, as we call
1773 * drbd_resync_finished from here in that case.
1774 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1775 * and from after_state_ch otherwise. */
1776 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1777 drbd_gen_and_send_sync_uuid(peer_device);
1779 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1780 /* This still has a race (about when exactly the peers
1781 * detect connection loss) that can lead to a full sync
1782 * on next handshake. In 8.3.9 we fixed this with explicit
1783 * resync-finished notifications, but the fix
1784 * introduces a protocol change. Sleeping for some
1785 * time longer than the ping interval + timeout on the
1786 * SyncSource, to give the SyncTarget the chance to
1787 * detect connection loss, then waiting for a ping
1788 * response (implicit in drbd_resync_finished) reduces
1789 * the race considerably, but does not solve it. */
1790 if (side == C_SYNC_SOURCE) {
1791 struct net_conf *nc;
1795 nc = rcu_dereference(connection->net_conf);
1796 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1798 schedule_timeout_interruptible(timeo);
1800 drbd_resync_finished(device);
1803 drbd_rs_controller_reset(device);
1804 /* ns.conn may already be != device->state.conn,
1805 * we may have been paused in between, or become paused until
1806 * the timer triggers.
1807 * No matter, that is handled in resync_timer_fn() */
1808 if (ns.conn == C_SYNC_TARGET)
1809 mod_timer(&device->resync_timer, jiffies);
1811 drbd_md_sync(device);
1815 mutex_unlock(device->state_mutex);
1818 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1820 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1821 device->rs_last_bcast = jiffies;
1823 if (!get_ldev(device))
1826 drbd_bm_write_lazy(device, 0);
1827 if (resync_done && is_sync_state(device->state.conn))
1828 drbd_resync_finished(device);
1830 drbd_bcast_event(device, &sib);
1831 /* update timestamp, in case it took a while to write out stuff */
1832 device->rs_last_bcast = jiffies;
1836 static void drbd_ldev_destroy(struct drbd_device *device)
1838 lc_destroy(device->resync);
1839 device->resync = NULL;
1840 lc_destroy(device->act_log);
1841 device->act_log = NULL;
1844 drbd_backing_dev_free(device, device->ldev);
1845 device->ldev = NULL;
1848 clear_bit(GOING_DISKLESS, &device->flags);
1849 wake_up(&device->misc_wait);
1852 static void go_diskless(struct drbd_device *device)
1854 D_ASSERT(device, device->state.disk == D_FAILED);
1855 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1856 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1857 * the protected members anymore, though, so once put_ldev reaches zero
1858 * again, it will be safe to free them. */
1860 /* Try to write changed bitmap pages, read errors may have just
1861 * set some bits outside the area covered by the activity log.
1863 * If we have an IO error during the bitmap writeout,
1864 * we will want a full sync next time, just in case.
1865 * (Do we want a specific meta data flag for this?)
1867 * If that does not make it to stable storage either,
1868 * we cannot do anything about that anymore.
1870 * We still need to check if both bitmap and ldev are present, we may
1871 * end up here after a failed attach, before ldev was even assigned.
1873 if (device->bitmap && device->ldev) {
1874 /* An interrupted resync or similar is allowed to recount bits
1876 * Any modifications would not be expected anymore, though.
1878 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1879 "detach", BM_LOCKED_TEST_ALLOWED)) {
1880 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1881 drbd_md_set_flag(device, MDF_FULL_SYNC);
1882 drbd_md_sync(device);
1887 drbd_force_state(device, NS(disk, D_DISKLESS));
1890 static int do_md_sync(struct drbd_device *device)
1892 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1893 drbd_md_sync(device);
1897 /* only called from drbd_worker thread, no locking */
1898 void __update_timing_details(
1899 struct drbd_thread_timing_details *tdp,
1900 unsigned int *cb_nr,
1902 const char *fn, const unsigned int line)
1904 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1905 struct drbd_thread_timing_details *td = tdp + i;
1907 td->start_jif = jiffies;
1913 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1915 memset(td, 0, sizeof(*td));
1920 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1922 if (test_bit(MD_SYNC, &todo))
1924 if (test_bit(RS_DONE, &todo) ||
1925 test_bit(RS_PROGRESS, &todo))
1926 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1927 if (test_bit(GO_DISKLESS, &todo))
1928 go_diskless(device);
1929 if (test_bit(DESTROY_DISK, &todo))
1930 drbd_ldev_destroy(device);
1931 if (test_bit(RS_START, &todo))
1932 do_start_resync(device);
1935 #define DRBD_DEVICE_WORK_MASK \
1936 ((1UL << GO_DISKLESS) \
1937 |(1UL << DESTROY_DISK) \
1939 |(1UL << RS_START) \
1940 |(1UL << RS_PROGRESS) \
1944 static unsigned long get_work_bits(unsigned long *flags)
1946 unsigned long old, new;
1949 new = old & ~DRBD_DEVICE_WORK_MASK;
1950 } while (cmpxchg(flags, old, new) != old);
1951 return old & DRBD_DEVICE_WORK_MASK;
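/* get_work_bits() is a lock-free "test and clear" of all device work flags
 * at once: the cmpxchg loop retries until it swaps in a value with the
 * DRBD_DEVICE_WORK_MASK bits cleared, so a flag set concurrently by
 * another CPU is either included in the returned batch or left in place
 * for the next round, but never lost. */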
1954 static void do_unqueued_work(struct drbd_connection *connection)
1956 struct drbd_peer_device *peer_device;
1960 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1961 struct drbd_device *device = peer_device->device;
1962 unsigned long todo = get_work_bits(&device->flags);
1966 kref_get(&device->kref);
1968 do_device_work(device, todo);
1969 kref_put(&device->kref, drbd_destroy_device);
1975 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1977 spin_lock_irq(&queue->q_lock);
1978 list_splice_tail_init(&queue->q, work_list);
1979 spin_unlock_irq(&queue->q_lock);
1980 return !list_empty(work_list);
1983 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1986 struct net_conf *nc;
1989 dequeue_work_batch(&connection->sender_work, work_list);
1990 if (!list_empty(work_list))
1993 /* Still nothing to do?
1994 * Maybe we still need to close the current epoch,
1995 * even if no new requests are queued yet.
1997 * Also, poke TCP, just in case.
1998 * Then wait for new work (or signal). */
2000 nc = rcu_dereference(connection->net_conf);
2001 uncork = nc ? nc->tcp_cork : 0;
2004 mutex_lock(&connection->data.mutex);
2005 if (connection->data.socket)
2006 drbd_tcp_uncork(connection->data.socket);
2007 mutex_unlock(&connection->data.mutex);
2012 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2013 spin_lock_irq(&connection->resource->req_lock);
2014 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2015 if (!list_empty(&connection->sender_work.q))
2016 list_splice_tail_init(&connection->sender_work.q, work_list);
2017 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2018 if (!list_empty(work_list) || signal_pending(current)) {
2019 spin_unlock_irq(&connection->resource->req_lock);
2023 /* We found nothing new to do, no to-be-communicated request,
2024 * no other work item. We may still need to close the last
2025 * epoch. Next incoming request epoch will be connection ->
2026 * current transfer log epoch number. If that is different
2027 * from the epoch of the last request we communicated, it is
2028 * safe to send the epoch separating barrier now.
2031 atomic_read(&connection->current_tle_nr) !=
2032 connection->send.current_epoch_nr;
2033 spin_unlock_irq(&connection->resource->req_lock);
2036 maybe_send_barrier(connection,
2037 connection->send.current_epoch_nr + 1);
2039 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2042 /* drbd_send() may have called flush_signals() */
2043 if (get_t_state(&connection->worker) != RUNNING)
2047 /* may be woken up for other things than new work, too,
2048 * e.g. if the current epoch got closed.
2049 * In which case we send the barrier above. */
2051 finish_wait(&connection->sender_work.q_wait, &wait);
2053 /* someone may have changed the config while we have been waiting above. */
2055 nc = rcu_dereference(connection->net_conf);
2056 cork = nc ? nc->tcp_cork : 0;
2058 mutex_lock(&connection->data.mutex);
2059 if (connection->data.socket) {
2061 drbd_tcp_cork(connection->data.socket);
2063 drbd_tcp_uncork(connection->data.socket);
2065 mutex_unlock(&connection->data.mutex);
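/* The cork dance around the wait above batches small control packets:
 * before sleeping we uncork (when tcp_cork is in use) so whatever is
 * already queued (e.g. a closing barrier) actually leaves the host, and
 * once new work has arrived we cork again so the burst of small packets
 * produced by the following work items can be coalesced into fewer
 * segments. */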
2068 int drbd_worker(struct drbd_thread *thi)
2070 struct drbd_connection *connection = thi->connection;
2071 struct drbd_work *w = NULL;
2072 struct drbd_peer_device *peer_device;
2073 LIST_HEAD(work_list);
2076 while (get_t_state(thi) == RUNNING) {
2077 drbd_thread_current_set_cpu(thi);
2079 if (list_empty(&work_list)) {
2080 update_worker_timing_details(connection, wait_for_work);
2081 wait_for_work(connection, &work_list);
2084 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2085 update_worker_timing_details(connection, do_unqueued_work);
2086 do_unqueued_work(connection);
2089 if (signal_pending(current)) {
2090 flush_signals(current);
2091 if (get_t_state(thi) == RUNNING) {
2092 drbd_warn(connection, "Worker got an unexpected signal\n");
2098 if (get_t_state(thi) != RUNNING)
2101 if (!list_empty(&work_list)) {
2102 w = list_first_entry(&work_list, struct drbd_work, list);
2103 list_del_init(&w->list);
2104 update_worker_timing_details(connection, w->cb);
2105 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2107 if (connection->cstate >= C_WF_REPORT_PARAMS)
2108 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2113 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2114 update_worker_timing_details(connection, do_unqueued_work);
2115 do_unqueued_work(connection);
2117 if (!list_empty(&work_list)) {
2118 w = list_first_entry(&work_list, struct drbd_work, list);
2119 list_del_init(&w->list);
2120 update_worker_timing_details(connection, w->cb);
2123 dequeue_work_batch(&connection->sender_work, &work_list);
2124 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2127 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2128 struct drbd_device *device = peer_device->device;
2129 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2130 kref_get(&device->kref);
2132 drbd_device_cleanup(device);
2133 kref_put(&device->kref, drbd_destroy_device);