/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

*/
#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"
static int w_make_ov_request(struct drbd_work *w, int cancel);
static int w_make_resync_request(struct drbd_work *w, int cancel);
/* endio handlers:
 *   drbd_md_io_complete (defined here)
 *   drbd_endio_pri (defined here)
 *   drbd_endio_sec (defined here)
 *   bm_async_io_complete (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 */
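/* Editorial note (not part of the original file): because these completion
 * callbacks may already run with interrupts disabled (hard-irq context),
 * the final-stage helpers below use spin_lock_irqsave()/irqrestore() on
 * tconn->req_lock instead of spin_lock_irq(), which would unconditionally
 * re-enable interrupts on unlock. */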
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that. */
rwlock_t global_state_lock;
/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}
/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = peer_req->w.mdev;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	mdev->read_cnt += peer_req->i.size >> 9;
	list_del(&peer_req->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
		__drbd_chk_io_error(mdev, false);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
	put_ldev(mdev);
}
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage. */
static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = peer_req->w.mdev;
	sector_t e_sector;
	int do_wake;
	u64 block_id;
	int do_al_complete_io;

	/* after we moved peer_req to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = peer_req->i.sector;
	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
	block_id = peer_req->block_id;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	mdev->writ_cnt += peer_req->i.size >> 9;
	list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&peer_req->w.list, &mdev->done_ee);

	/*
	 * Do not remove from the write_requests tree here: we did not send the
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removed from the tree from "drbd_process_done_ee" within the
	 * appropriate w.cb (e_end_block/e_end_resync_block) or from
	 * _drbd_clear_done_ee.
	 */

	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
		__drbd_chk_io_error(mdev, false);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	if (block_id == ID_SYNCER)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev->tconn);
	put_ldev(mdev);
}
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_peer_request *peer_req = bio->bi_private;
	struct drbd_conf *mdev = peer_req->w.mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

	if (error && __ratelimit(&drbd_ratelimit_state))
		dev_warn(DEV, "%s: error=%d s=%llus\n",
				is_write ? "write" : "read", error,
				(unsigned long long)peer_req->i.sector);
	if (!error && !uptodate) {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
					is_write ? "write" : "read",
					(unsigned long long)peer_req->i.sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &peer_req->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&peer_req->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(peer_req);
		else
			drbd_endio_read_sec_final(peer_req);
	}
}
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->w.mdev;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? WRITE_COMPLETED_WITH_ERROR
			: (bio_rw(bio) == READ)
			  ? READ_COMPLETED_WITH_ERROR
			  : READ_AHEAD_COMPLETED_WITH_ERROR;
	} else
		what = COMPLETED_OK;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	if (m.bio)
		complete_master_bio(mdev, &m);
}
int w_read_retry_remote(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->tconn->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, READ_RETRY_REMOTE_CANCELED);
		spin_unlock_irq(&mdev->tconn->req_lock);
		return 1;
	}
	spin_unlock_irq(&mdev->tconn->req_lock);

	return w_send_read_req(w, 0);
}
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
		  struct drbd_peer_request *peer_req, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = peer_req->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = peer_req->i.size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}
void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}
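/* Illustrative note (not in the original file): both helpers feed data into
 * one crypto_hash transform and write crypto_hash_digestsize(tfm) bytes into
 * "digest".  A typical caller therefore looks roughly like this sketch,
 * assuming a previously allocated transform such as mdev->csums_tfm:
 *
 *	int len = crypto_hash_digestsize(tfm);
 *	void *digest = kmalloc(len, GFP_NOIO);
 *	if (digest) {
 *		drbd_csum_ee(mdev, tfm, peer_req, digest);
 *		// compare or transmit len bytes ...
 *		kfree(digest);
 *	}
 */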
static int w_e_send_csum(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;
	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		sector_t sector = peer_req->i.sector;
		unsigned int size = peer_req->i.size;
		drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
		/* Free e and pages before send.
		 * In case we block on congestion, we could otherwise run into
		 * some distributed deadlock, if the other side blocks on
		 * congestion as well, because our receiver blocks in
		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
		drbd_free_ee(mdev, peer_req);
		peer_req = NULL;
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, sector, size,
					     digest, digest_size,
					     P_CSUM_RS_REQUEST);
		kfree(digest);
	} else {
		dev_err(DEV, "kmalloc() of digest failed.\n");
		ok = 0;
	}

out:
	if (peer_req)
		drbd_free_ee(mdev, peer_req);
	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
	return ok;
}
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_peer_request *peer_req;

	if (!get_ldev(mdev))
		return -EIO;

	if (drbd_rs_should_slow_down(mdev, sector))
		goto defer;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
	if (!peer_req)
		goto defer;

	peer_req->w.cb = w_e_send_csum;
	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	/* If it failed because of ENOMEM, retry should help.  If it failed
	 * because bio_add_page failed (probably broken lower level driver),
	 * retry may or may not help.
	 * If it does not, you may need to force disconnect. */
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);
defer:
	put_ldev(mdev);
	return -EAGAIN;
}
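/* Illustrative note (not part of the original): the caller in
 * w_make_resync_request() interprets the return value roughly as
 *	 0       request submitted, nothing more to do for this bit
 *	-EAGAIN  allocation failed or ldev busy, retry this sector later
 *	-EIO     local disk gone, give up
 * which is why the failure path above undoes the read_ee list insertion
 * before reporting -EAGAIN. */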
int w_resync_timer(struct drbd_work *w, int cancel)
{
	struct drbd_conf *mdev = w->mdev;
	switch (mdev->state.conn) {
	case C_VERIFY_S:
		w_make_ov_request(w, cancel);
		break;
	case C_SYNC_TARGET:
		w_make_resync_request(w, cancel);
		break;
	}

	return 1;
}
void resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	if (list_empty(&mdev->resync_work.list))
		drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
}
static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;
	if (fb->head_index >= fb->size)
		fb->head_index = 0;
	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}
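/* Worked example (illustrative, not from the original source): with
 * fb->size == 3, values == {0, 0, 0} and head_index == 0, the sequence
 *	fifo_push(fb, 5);    // returns 0, values == {5, 0, 0}, head == 1
 *	fifo_add_val(fb, 2); // values == {7, 2, 2}
 *	fifo_push(fb, 0);    // returns 2, values == {7, 0, 2}, head == 2
 * shows how drbd_rs_controller() drains one planned correction per step
 * while spreading new corrections over all remaining plan-ahead steps. */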
static int drbd_rs_controller(struct drbd_conf *mdev)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect; /* Number of sectors to request in this turn */
	int correction; /* Number of sectors more we need in the proxy */
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;

	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
	mdev->rs_in_flight -= sect_in;

	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */

	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - mdev->rs_in_flight - mdev->rs_planed;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(&mdev->rs_plan_s, cps);
	mdev->rs_planed += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
	mdev->rs_planed -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, mdev->rs_in_flight, want, correction,
		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}
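/* Worked example (illustrative numbers, not from the original source): with
 * SLEEP_TIME = HZ/10 (one controller step per 100ms), c_fill_target == 0,
 * c_delay_target == 10 (i.e. one second) and sect_in == 2048 sectors
 * acknowledged during the last step, the "normal path" above yields
 *	want = 2048 * 10 * HZ / (SLEEP_TIME * 10) = 20480 sectors
 * in flight; the difference to rs_in_flight + rs_planed is spread over the
 * plan-ahead fifo, and this step requests sect_in + curr_corr sectors,
 * clamped to c_max_rate. */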
static int drbd_rs_number_requests(struct drbd_conf *mdev)
{
	int number;
	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		mdev->c_sync_rate = mdev->sync_conf.rate;
		number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
	}

	/* ignore the amount of pending requests, the resync controller should
	 * throttle down to incoming reply rate soon enough anyways. */
	return number;
}
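/* Example calculation (illustrative, not in the original): on the fixed-rate
 * path, with sync_conf.rate in KiB/s, BM_BLOCK_SIZE = 4096 (4 KiB per bitmap
 * bit) and SLEEP_TIME = HZ/10, a rate of 10240 KiB/s gives
 *	number = (HZ/10) * 10240 / (4 * HZ) = 256
 * resync requests of 4 KiB each per 100ms slot, i.e. roughly 10 MiB/s. */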
static int w_make_resync_request(struct drbd_work *w, int cancel)
{
	struct drbd_conf *mdev = w->mdev;
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_bio_size;
	int number, rollback_i, size;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 1;

	if (mdev->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(mdev);
		return 1;
	}

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->rsync a
		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		return 1;
	}

	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
	number = drbd_rs_number_requests(mdev);
	if (number == 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests, when half of the send buffer is filled */
		mutex_lock(&mdev->tconn->data.mutex);
		if (mdev->tconn->data.socket) {
			queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->tconn->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_rs_should_slow_down(mdev, sector) ||
		    drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->tconn->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
						sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		put_ldev(mdev);
		return 1;
	}

requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(mdev);
	return 1;
}
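/* Illustrative example of the bit-merging above (not from the original):
 * starting at a dirty 4 KiB bit whose first sector is 32 KiB aligned, the
 * #if block may merge adjacent dirty bits into one request of 8, 16 or
 * 32 KiB; each time the size reaches BM_BLOCK_SIZE << align the required
 * start alignment doubles (1 << (align+3) sectors), and merging stops at
 * max_bio_size, at a resync-extent boundary, or at the first clean bit. */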
static int w_make_ov_request(struct drbd_work *w, int cancel)
{
	struct drbd_conf *mdev = w->mdev;
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))
		return 1;

	number = drbd_rs_number_requests(mdev);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			return 1;
		}

		size = BM_BLOCK_SIZE;

		if (drbd_rs_should_slow_down(mdev, sector) ||
		    drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	mdev->ov_position = sector;

requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}
int w_ov_finished(struct drbd_work *w, int cancel)
{
	struct drbd_conf *mdev = w->mdev;
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}

static int w_resync_finished(struct drbd_work *w, int cancel)
{
	struct drbd_conf *mdev = w->mdev;
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}
static void ping_peer(struct drbd_conf *mdev)
{
	struct drbd_tconn *tconn = mdev->tconn;

	clear_bit(GOT_PING_ACK, &tconn->flags);
	request_ping(tconn);
	wait_event(tconn->ping_wait,
		   test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
}
int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;
	int verify_done = 0;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now). Retry in 100ms. */

		schedule_timeout_interruptible(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->tconn->data.work, w);
			return 1;
		}
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	ping_peer(mdev);

	spin_lock_irq(&mdev->tconn->req_lock);
	os = mdev->state;

	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     verify_done ? "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
				(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
			/* for verify runs, we don't update uuids here,
			 * so there would be nothing to report. */
			drbd_uuid_set_bm(mdev, 0UL);
			drbd_print_uuids(mdev, "updated UUIDs");
			if (mdev->p_uuid) {
				/* Now the two UUID sets are equal, update what we
				 * know of the peer. */
				int i;
				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
			}
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->tconn->req_lock);
	put_ldev(mdev);
out:
	mdev->rs_total  = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;
	if (verify_done)
		mdev->ov_start_sector = 0;

	drbd_md_sync(mdev);

	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}
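/* Example of the throughput accounting above (illustrative numbers): a
 * resync of rs_total = 262144 bits (1 GiB at 4 KiB per bit) finishing after
 * 100 seconds of un-paused time gives db/dt = 2621 bits/s, reported as
 * dbdt = Bit2KB(2621) = 10484 K/sec in the "Resync done" message. */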
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
{
	if (drbd_ee_has_active_page(peer_req)) {
		/* This might happen if sendpage() has not finished */
		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
		atomic_add(i, &mdev->pp_in_use_by_net);
		atomic_sub(i, &mdev->pp_in_use);
		spin_lock_irq(&mdev->tconn->req_lock);
		list_add_tail(&peer_req->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->tconn->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_ee(mdev, peer_req);
}
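/* Illustrative note (not in the original): pp_in_use and pp_in_use_by_net
 * count pages of the shared peer-request page pool.  Moving a 12 KiB
 * peer_req here shifts ceil(12K / PAGE_SIZE) = 3 pages (with 4 KiB pages)
 * from pp_in_use to pp_in_use_by_net, so the allocator keeps an accurate
 * view of how many pages are still pinned by the network layer. */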
/**
 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, peer_req);
		dec_unacked(mdev);
		return 1;
	}

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)peer_req->i.sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, peer_req);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}
/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, peer_req);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, peer_req->i.sector);
		put_ldev(mdev);
	}

	if (mdev->state.conn == C_AHEAD) {
		ok = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			ok = 1;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)peer_req->i.sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, peer_req);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}
int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, peer_req);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, peer_req->i.sector);
		put_ldev(mdev);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
		} else {
			inc_rs_pending(mdev);
			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
			kfree(di);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	move_to_net_ee_or_free(mdev, peer_req);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}
int w_e_end_ov_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (!digest) {
		ok = 0; /* terminate the connection in case the allocation failed */
		goto out;
	}

	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
		drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);
	else
		memset(digest, 0, digest_size);

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
	drbd_free_ee(mdev, peer_req);
	peer_req = NULL;
	inc_rs_pending(mdev);
	ok = drbd_send_drequest_csum(mdev, sector, size,
				     digest, digest_size,
				     P_OV_REPLY);
	if (!ok)
		dec_rs_pending(mdev);
	kfree(digest);

out:
	if (peer_req)
		drbd_free_ee(mdev, peer_req);
	dec_unacked(mdev);
	return ok;
}
void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
}
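/* Example (illustrative): three consecutive 4 KiB out-of-sync blocks at
 * sectors 1000, 1008 and 1016 are merged into a single run with
 * ov_last_oos_start = 1000 and ov_last_oos_size = 24 sectors, so the
 * verify code can report one contiguous range instead of three. */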
int w_e_end_ov_reply(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	struct digest_info *di;
	void *digest;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, peer_req);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, peer_req->i.sector);
		put_ldev(mdev);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	}

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
	drbd_free_ee(mdev, peer_req);
	if (!eq)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	dec_unacked(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}
int w_prev_work_done(struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}
int w_send_barrier(struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct drbd_conf *mdev = w->mdev;
	struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->tconn->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev->tconn))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch. */
	ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
			    &p->head, sizeof(*p), 0);
	drbd_put_data_sock(mdev->tconn);

	return ok;
}
int w_send_write_hint(struct drbd_work *w, int cancel)
{
	struct drbd_conf *mdev = w->mdev;

	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}
int w_send_oos(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 1;
	}

	ok = drbd_send_oos(mdev, req);
	req_mod(req, OOS_HANDED_TO_NETWORK);

	return ok;
}
/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);

	return ok;
}
/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);

	return ok;
}
int w_restart_disk_io(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(mdev, req->i.sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock
	   theoretically. Practically it can not deadlock, since this is
	   only used when unfreezing IOs. All the extents of the requests
	   that made it into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 1;
}
static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		if (!odev)
			return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}
/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}
/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO);
		}
	}
	return rv;
}
void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}
static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}
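/* Illustrative example (not in the original source): with three devices
 * where minor 2 has sync_conf.after == 1 and minor 1 has
 * sync_conf.after == 0, a request to make minor 0 sync after minor 2
 * walks 2 -> 1 -> 0, hits odev == mdev, and sync_after_error() returns
 * ERR_SYNC_AFTER_CYCLE, so the cyclic configuration is rejected. */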
int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}
void drbd_rs_controller_reset(struct drbd_conf *mdev)
{
	atomic_set(&mdev->rs_sect_in, 0);
	atomic_set(&mdev->rs_sect_ev, 0);
	mdev->rs_in_flight = 0;
	mdev->rs_planed = 0;
	spin_lock(&mdev->peer_seq_lock);
	fifo_set(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
}
void start_resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
}
int w_start_resync(struct drbd_work *w, int cancel)
{
	struct drbd_conf *mdev = w->mdev;

	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
		dev_warn(DEV, "w_start_resync later...\n");
		mdev->start_resync_timer.expires = jiffies + HZ/10;
		add_timer(&mdev->start_resync_timer);
		return 1;
	}

	drbd_start_resync(mdev, C_SYNC_SOURCE);
	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);

	return 1;
}
/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	if (mdev->state.conn < C_AHEAD) {
		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
		drbd_rs_cancel_all(mdev);
		/* This should be done when we abort the resync. We definitely do not
		   want to have this for connections going back and forth between
		   Ahead/Behind and SyncSource/SyncTarget */
	}

	if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
		if (side == C_SYNC_TARGET) {
			/* Since application IO was locked out during C_WF_BITMAP_T and
			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
			   we check that we might make the data inconsistent. */
			r = drbd_khelper(mdev, "before-resync-target");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				dev_info(DEV, "before-resync-target handler returned %d, "
					 "dropping connection.\n", r);
				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
				return;
			}
		} else /* C_SYNC_SOURCE */ {
			r = drbd_khelper(mdev, "before-resync-source");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				if (r == 3) {
					dev_info(DEV, "before-resync-source handler returned %d, "
						 "ignoring. Old userland tools?", r);
				} else {
					dev_info(DEV, "before-resync-source handler returned %d, "
						 "dropping connection.\n", r);
					drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
					return;
				}
			}
		}
	}

	if (current == mdev->tconn->worker.task) {
		/* The worker should not sleep waiting for drbd_state_lock(),
		   that can take long */
		if (test_and_set_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
			set_bit(B_RS_H_DONE, &mdev->flags);
			mdev->start_resync_timer.expires = jiffies + HZ/5;
			add_timer(&mdev->start_resync_timer);
			return;
		}
	} else
		drbd_state_lock(mdev);

	clear_bit(B_RS_H_DONE, &mdev->flags);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);
		return;
	}

	write_lock_irq(&global_state_lock);
	ns = mdev->state;

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	ns = mdev->state;

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(mdev);
		unsigned long now = jiffies;
		int i;

		mdev->rs_failed    = 0;
		mdev->rs_paused    = 0;
		mdev->rs_same_csum = 0;
		mdev->rs_last_events = 0;
		mdev->rs_last_sect_ev = 0;
		mdev->rs_total     = tw;
		mdev->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			mdev->rs_mark_left[i] = tw;
			mdev->rs_mark_time[i] = now;
		}
		_drbd_pause_after(mdev);
	}
	write_unlock_irq(&global_state_lock);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) mdev->rs_total);
		if (side == C_SYNC_TARGET)
			mdev->bm_resync_fo = 0;

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how much bits to resync.  We cannot do that always, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(mdev);

		if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE)
				schedule_timeout_interruptible(
					mdev->tconn->net_conf->ping_int * HZ +
					mdev->tconn->net_conf->ping_timeo*HZ/9);
			drbd_resync_finished(mdev);
		}

		drbd_rs_controller_reset(mdev);
		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
	put_ldev(mdev);
	drbd_state_unlock(mdev);
}
static int _worker_dying(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;

	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the exiting receiver. */
	drbd_thread_stop(&mdev->tconn->receiver);
	drbd_mdev_cleanup(mdev);

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);

	return 0;
}
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (down_trylock(&tconn->data.work.s)) {
			mutex_lock(&tconn->data.mutex);
			if (tconn->data.socket && !tconn->net_conf->no_cork)
				drbd_tcp_uncork(tconn->data.socket);
			mutex_unlock(&tconn->data.mutex);

			intr = down_interruptible(&tconn->data.work.s);

			mutex_lock(&tconn->data.mutex);
			if (tconn->data.socket && !tconn->net_conf->no_cork)
				drbd_tcp_cork(tconn->data.socket);
			mutex_unlock(&tconn->data.mutex);
		}

		if (intr) {
			flush_signals(current);
			if (get_t_state(thi) == RUNNING) {
				conn_warn(tconn, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this...   */

		w = NULL;
		spin_lock_irq(&tconn->data.work.q_lock);
		if (list_empty(&tconn->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			conn_warn(tconn, "Work list unexpectedly empty\n");
			spin_unlock_irq(&tconn->data.work.q_lock);
			continue;
		}
		w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&tconn->data.work.q_lock);

		if (!w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (tconn->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

	spin_lock_irq(&tconn->data.work.q_lock);
	while (!list_empty(&tconn->data.work.q)) {
		list_splice_init(&tconn->data.work.q, &work_list);
		spin_unlock_irq(&tconn->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(w, 1);
		}

		spin_lock_irq(&tconn->data.work.q_lock);
	}
	sema_init(&tconn->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up() ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 * So run it again ...
	 */
	spin_unlock_irq(&tconn->data.work.q_lock);

	idr_for_each(&tconn->volumes, _worker_dying, NULL);

	return 0;
}