4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option) any later version.
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
42 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
51 * more endio handlers:
52 atodb_endio in drbd_actlog.c
53 drbd_bm_async_io_complete in drbd_bitmap.c
55 * For all these callbacks, note the following:
56 * The callbacks will be called in irq context by the IDE drivers,
57 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58 * Try to get the locking right :)
63 /* About the global_state_lock
64 Each state transition on a device holds a read lock. In case we have
65 to evaluate the sync after dependencies, we grab a write lock, because
66 we need stable states on all devices for that. */
67 rwlock_t global_state_lock;
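/* Note: as the comment above suggests, plain state transitions take this
 * lock for reading, while code that evaluates the whole sync-after
 * dependency chain (drbd_alter_sa, resume_next_sg, suspend_other_sg,
 * drbd_start_resync below) takes it via write_lock_irq(). */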
69 /* used for synchronous meta data and bitmap IO
70 * submitted by drbd_md_sync_page_io()
72 void drbd_md_io_complete(struct bio *bio, int error)
74 struct drbd_md_io *md_io;
76 md_io = (struct drbd_md_io *)bio->bi_private;
79 complete(&md_io->event);
82 /* reads on behalf of the partner,
83 * "submitted" by the receiver
85 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
87 unsigned long flags = 0;
88 struct drbd_conf *mdev = e->mdev;
90 D_ASSERT(e->block_id != ID_VACANT);
92 spin_lock_irqsave(&mdev->req_lock, flags);
93 mdev->read_cnt += e->size >> 9;
95 if (list_empty(&mdev->read_ee))
96 wake_up(&mdev->ee_wait);
97 if (test_bit(__EE_WAS_ERROR, &e->flags))
98 __drbd_chk_io_error(mdev, FALSE);
99 spin_unlock_irqrestore(&mdev->req_lock, flags);
101 drbd_queue_work(&mdev->data.work, &e->w);
105 static int is_failed_barrier(int ee_flags)
107 return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108 == (EE_IS_BARRIER|EE_WAS_ERROR);
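/* Note: the mask comparison above is true only when both EE_IS_BARRIER and
 * EE_WAS_ERROR are set while EE_RESUBMITTED is still clear, i.e. a barrier
 * write that failed and has not been resubmitted yet. */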
111 /* writes on behalf of the partner, or resync writes,
112 * "submitted" by the receiver, final stage. */
113 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
115 unsigned long flags = 0;
116 struct drbd_conf *mdev = e->mdev;
120 int do_al_complete_io;
122 /* if this is a failed barrier request, disable use of barriers,
123 * and schedule for resubmission */
124 if (is_failed_barrier(e->flags)) {
125 drbd_bump_write_ordering(mdev, WO_bdev_flush);
126 spin_lock_irqsave(&mdev->req_lock, flags);
127 list_del(&e->w.list);
128 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
129 e->w.cb = w_e_reissue;
130 /* put_ldev actually happens below, once we come here again. */
132 spin_unlock_irqrestore(&mdev->req_lock, flags);
133 drbd_queue_work(&mdev->data.work, &e->w);
137 D_ASSERT(e->block_id != ID_VACANT);
139 /* after we moved e to done_ee,
140 * we may no longer access it,
141 * it may be freed/reused already!
142 * (as soon as we release the req_lock) */
143 e_sector = e->sector;
144 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
145 is_syncer_req = is_syncer_block_id(e->block_id);
147 spin_lock_irqsave(&mdev->req_lock, flags);
148 mdev->writ_cnt += e->size >> 9;
149 list_del(&e->w.list); /* has been on active_ee or sync_ee */
150 list_add_tail(&e->w.list, &mdev->done_ee);
152 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
153 * neither did we wake possibly waiting conflicting requests.
154 * done from "drbd_process_done_ee" within the appropriate w.cb
155 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
157 do_wake = is_syncer_req
158 ? list_empty(&mdev->sync_ee)
159 : list_empty(&mdev->active_ee);
161 if (test_bit(__EE_WAS_ERROR, &e->flags))
162 __drbd_chk_io_error(mdev, FALSE);
163 spin_unlock_irqrestore(&mdev->req_lock, flags);
166 drbd_rs_complete_io(mdev, e_sector);
169 wake_up(&mdev->ee_wait);
171 if (do_al_complete_io)
172 drbd_al_complete_io(mdev, e_sector);
178 /* writes on behalf of the partner, or resync writes,
179 * "submitted" by the receiver.
181 void drbd_endio_sec(struct bio *bio, int error)
183 struct drbd_epoch_entry *e = bio->bi_private;
184 struct drbd_conf *mdev = e->mdev;
185 int uptodate = bio_flagged(bio, BIO_UPTODATE);
186 int is_write = bio_data_dir(bio) == WRITE;
189 dev_warn(DEV, "%s: error=%d s=%llus\n",
190 is_write ? "write" : "read", error,
191 (unsigned long long)e->sector);
192 if (!error && !uptodate) {
193 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194 is_write ? "write" : "read",
195 (unsigned long long)e->sector);
196 /* strange behavior of some lower level drivers...
197 * fail the request by clearing the uptodate flag,
198 * but do not return any error?! */
203 set_bit(__EE_WAS_ERROR, &e->flags);
205 bio_put(bio); /* no need for the bio anymore */
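/* Note: an epoch entry may be split over several bios; only the last
 * completing bio (pending_bios dropping to zero) runs the final read or
 * write completion below. */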
206 if (atomic_dec_and_test(&e->pending_bios)) {
208 drbd_endio_write_sec_final(e);
210 drbd_endio_read_sec_final(e);
214 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
216 void drbd_endio_pri(struct bio *bio, int error)
218 struct drbd_request *req = bio->bi_private;
219 struct drbd_conf *mdev = req->mdev;
220 enum drbd_req_event what;
221 int uptodate = bio_flagged(bio, BIO_UPTODATE);
223 if (!error && !uptodate) {
224 dev_warn(DEV, "p %s: setting error to -EIO\n",
225 bio_data_dir(bio) == WRITE ? "write" : "read");
226 /* strange behavior of some lower level drivers...
227 * fail the request by clearing the uptodate flag,
228 * but do not return any error?! */
232 /* to avoid recursion in __req_mod */
233 if (unlikely(error)) {
234 what = (bio_data_dir(bio) == WRITE)
235 ? write_completed_with_error
236 : (bio_rw(bio) == READ)
237 ? read_completed_with_error
238 : read_ahead_completed_with_error;
242 bio_put(req->private_bio);
243 req->private_bio = ERR_PTR(error);
248 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
250 struct drbd_request *req = container_of(w, struct drbd_request, w);
252 /* We should not detach for read io-error,
253 * but try to WRITE the P_DATA_REPLY to the failed location,
254 * to give the disk the chance to relocate that block */
256 spin_lock_irq(&mdev->req_lock);
257 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
258 _req_mod(req, read_retry_remote_canceled);
259 spin_unlock_irq(&mdev->req_lock);
262 spin_unlock_irq(&mdev->req_lock);
264 return w_send_read_req(mdev, w, 0);
267 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
269 ERR_IF(cancel) return 1;
270 dev_err(DEV, "resync inactive, but callback triggered??\n");
271 return 1; /* Simply ignore this! */
274 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
276 struct hash_desc desc;
277 struct scatterlist sg;
278 struct page *page = e->pages;
285 sg_init_table(&sg, 1);
286 crypto_hash_init(&desc);
288 while ((tmp = page_chain_next(page))) {
289 /* all but the last page will be fully used */
290 sg_set_page(&sg, page, PAGE_SIZE, 0);
291 crypto_hash_update(&desc, &sg, sg.length);
294 /* and now the last, possibly only partially used page */
295 len = e->size & (PAGE_SIZE - 1);
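/* Note: len == 0 means e->size is a multiple of PAGE_SIZE, so the last
 * page is fully used; the ?: below then hashes a full PAGE_SIZE. */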
296 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
297 crypto_hash_update(&desc, &sg, sg.length);
298 crypto_hash_final(&desc, digest);
301 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
303 struct hash_desc desc;
304 struct scatterlist sg;
305 struct bio_vec *bvec;
311 sg_init_table(&sg, 1);
312 crypto_hash_init(&desc);
314 __bio_for_each_segment(bvec, bio, i, 0) {
315 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
316 crypto_hash_update(&desc, &sg, sg.length);
318 crypto_hash_final(&desc, digest);
321 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
323 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
328 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
330 if (unlikely(cancel)) {
331 drbd_free_ee(mdev, e);
335 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
336 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
337 digest = kmalloc(digest_size, GFP_NOIO);
339 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
341 inc_rs_pending(mdev);
342 ok = drbd_send_drequest_csum(mdev,
350 dev_err(DEV, "kmalloc() of digest failed.\n");
356 drbd_free_ee(mdev, e);
359 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
363 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
365 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
367 struct drbd_epoch_entry *e;
372 if (drbd_rs_should_slow_down(mdev))
375 /* GFP_TRY, because if there is no memory available right now, this may
376 * be rescheduled for later. It is "only" background resync, after all. */
377 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
381 e->w.cb = w_e_send_csum;
382 spin_lock_irq(&mdev->req_lock);
383 list_add(&e->w.list, &mdev->read_ee);
384 spin_unlock_irq(&mdev->req_lock);
386 atomic_add(size >> 9, &mdev->rs_sect_ev);
387 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
390 /* drbd_submit_ee currently fails for one reason only:
391 * not being able to allocate enough bios.
392 * Is dropping the connection going to help? */
393 spin_lock_irq(&mdev->req_lock);
394 list_del(&e->w.list);
395 spin_unlock_irq(&mdev->req_lock);
397 drbd_free_ee(mdev, e);
403 void resync_timer_fn(unsigned long data)
405 struct drbd_conf *mdev = (struct drbd_conf *) data;
409 switch (mdev->state.conn) {
411 mdev->resync_work.cb = w_make_ov_request;
414 mdev->resync_work.cb = w_make_resync_request;
418 mdev->resync_work.cb = w_resync_inactive;
421 /* harmless race: list_empty outside data.work.q_lock */
422 if (list_empty(&mdev->resync_work.list) && queue)
423 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
426 static void fifo_set(struct fifo_buffer *fb, int value)
430 for (i = 0; i < fb->size; i++)
431 fb->values[i] = value;
434 static int fifo_push(struct fifo_buffer *fb, int value)
438 ov = fb->values[fb->head_index];
439 fb->values[fb->head_index++] = value;
441 if (fb->head_index >= fb->size)
447 static void fifo_add_val(struct fifo_buffer *fb, int value)
451 for (i = 0; i < fb->size; i++)
452 fb->values[i] += value;
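/* Illustrative sketch, not part of the driver (names and sizes invented),
 * showing how this plan FIFO behaves: fifo_push() stores a new value at
 * the head and returns the value that drops out, fifo_add_val() spreads a
 * correction over all slots.  Kept out of the build with #if 0. */
#if 0
#include <stdio.h>

#define PLAN_STEPS 4

static int plan[PLAN_STEPS];
static int plan_head;

static int plan_push(int value)
{
	int old = plan[plan_head];

	plan[plan_head++] = value;
	if (plan_head >= PLAN_STEPS)
		plan_head = 0;
	return old;
}

int main(void)
{
	int i;

	/* like fifo_add_val(fb, 10): plan 10 extra sectors for every step */
	for (i = 0; i < PLAN_STEPS; i++)
		plan[i] += 10;

	/* each tick consumes the oldest slot and pushes a fresh zero */
	for (i = 0; i < 6; i++)
		printf("tick %d: issue %d extra sectors\n", i, plan_push(0));
	return 0;
}
#endif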
455 int drbd_rs_controller(struct drbd_conf *mdev)
457 unsigned int sect_in; /* Number of sectors that came in since the last turn */
458 unsigned int want; /* The number of sectors we want in the proxy */
459 int req_sect; /* Number of sectors to request in this turn */
460 int correction; /* Number of sectors more we need in the proxy*/
461 int cps; /* correction per invocation of drbd_rs_controller() */
462 int steps; /* Number of time steps to plan ahead */
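/* Note (hedged summary; details of the elided code are assumptions):
 * "want" is the amount of resync data we would like to have in flight,
 * either the fixed c_fill_target or derived from the recent drain rate
 * (sect_in) and c_delay_target; the difference to what is already in
 * flight or planned becomes "correction", which is spread over "steps"
 * future invocations through the rs_plan_s FIFO. */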
466 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
467 mdev->rs_in_flight -= sect_in;
469 spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
471 steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
473 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
474 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
475 } else { /* normal path */
476 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
477 sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
480 correction = want - mdev->rs_in_flight - mdev->rs_planed;
483 cps = correction / steps;
484 fifo_add_val(&mdev->rs_plan_s, cps);
485 mdev->rs_planed += cps * steps;
487 /* What we do in this step */
488 curr_corr = fifo_push(&mdev->rs_plan_s, 0);
489 spin_unlock(&mdev->peer_seq_lock);
490 mdev->rs_planed -= curr_corr;
492 req_sect = sect_in + curr_corr;
496 max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
497 if (req_sect > max_sect)
501 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
502 sect_in, mdev->rs_in_flight, want, correction,
503 steps, cps, mdev->rs_planed, curr_corr, req_sect);
509 int w_make_resync_request(struct drbd_conf *mdev,
510 struct drbd_work *w, int cancel)
514 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
515 int max_segment_size;
516 int number, rollback_i, size, pe, mx;
517 int align, queued, sndbuf;
520 if (unlikely(cancel))
523 if (unlikely(mdev->state.conn < C_CONNECTED)) {
524 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
528 if (mdev->state.conn != C_SYNC_TARGET)
529 dev_err(DEV, "%s in w_make_resync_request\n",
530 drbd_conn_str(mdev->state.conn));
532 if (mdev->rs_total == 0) {
534 drbd_resync_finished(mdev);
538 if (!get_ldev(mdev)) {
539 /* Since we only need to access mdev->resync, a
540 get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
541 to continue resync with a broken disk makes no sense at all. */
543 dev_err(DEV, "Disk broke down during resync!\n");
544 mdev->resync_work.cb = w_resync_inactive;
548 /* starting with drbd 8.3.8, we can handle multi-bio EEs,
549 * if it should be necessary */
551 mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
552 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
554 if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
555 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
556 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
558 mdev->c_sync_rate = mdev->sync_conf.rate;
559 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
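/* Note: assuming the usual BM_BLOCK_SIZE of 4 KiB and SLEEP_TIME of HZ/10,
 * this works out to number = rate[KiB/s] / 40, i.e. the number of 4 KiB
 * resync requests to generate per 100 ms timer tick. */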
562 /* Throttle resync on lower level disk activity, which may also be
563 * caused by application IO on Primary/SyncTarget.
564 * Keep this after the call to drbd_rs_controller, as that assumes
565 * it is called as precisely as possible every SLEEP_TIME,
566 * and would get confused otherwise. */
567 if (drbd_rs_should_slow_down(mdev))
570 mutex_lock(&mdev->data.mutex);
571 if (mdev->data.socket)
572 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
575 mutex_unlock(&mdev->data.mutex);
577 /* For resync rates >160MB/sec, allow more pending RS requests */
581 /* Limit the number of pending RS requests to no more than the peer's receive buffer */
582 pe = atomic_read(&mdev->rs_pending_cnt);
583 if ((pe + number) > mx) {
587 for (i = 0; i < number; i++) {
588 /* Stop generating RS requests, when half of the send buffer is filled */
589 mutex_lock(&mdev->data.mutex);
590 if (mdev->data.socket) {
591 queued = mdev->data.socket->sk->sk_wmem_queued;
592 sndbuf = mdev->data.socket->sk->sk_sndbuf;
597 mutex_unlock(&mdev->data.mutex);
598 if (queued > sndbuf / 2)
602 size = BM_BLOCK_SIZE;
603 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
606 mdev->bm_resync_fo = drbd_bm_bits(mdev);
607 mdev->resync_work.cb = w_resync_inactive;
612 sector = BM_BIT_TO_SECT(bit);
614 if (drbd_try_rs_begin_io(mdev, sector)) {
615 mdev->bm_resync_fo = bit;
618 mdev->bm_resync_fo = bit + 1;
620 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
621 drbd_rs_complete_io(mdev, sector);
625 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
626 /* try to find some adjacent bits.
627 * we stop once we have reached the maximum request size.
629 * Additionally, always align bigger requests, in order to
630 * be prepared for all stripe sizes of software RAIDs.
635 if (size + BM_BLOCK_SIZE > max_segment_size)
638 /* Be always aligned */
639 if (sector & ((1<<(align+3))-1))
642 /* do not cross extent boundaries */
643 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
645 /* now, is it actually dirty, after all?
646 * caution, drbd_bm_test_bit is tri-state for some
647 * obscure reason; ( b == 0 ) would get the out-of-band
648 * only accidentally right because of the "oddly sized"
649 * adjustment below */
650 if (drbd_bm_test_bit(mdev, bit+1) != 1)
653 size += BM_BLOCK_SIZE;
654 if ((BM_BLOCK_SIZE << align) <= size)
658 /* if we merged some,
659 * reset the offset to start the next drbd_bm_find_next from */
660 if (size > BM_BLOCK_SIZE)
661 mdev->bm_resync_fo = bit + 1;
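/* Note: the merge loop above grows the request one bitmap block at a time
 * while the following bits are dirty, stopping at max_segment_size, at
 * resync extent boundaries, or when the start sector is no longer aligned
 * for the grown size; "align" grows with the request so that big requests
 * stay aligned for software RAID stripes. */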
664 /* adjust very last sectors, in case we are oddly sized */
665 if (sector + (size>>9) > capacity)
666 size = (capacity-sector)<<9;
667 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
668 switch (read_for_csum(mdev, sector, size)) {
669 case -EIO: /* Disk failure */
672 case -EAGAIN: /* allocation failed, or ldev busy */
673 drbd_rs_complete_io(mdev, sector);
674 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
684 inc_rs_pending(mdev);
685 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
686 sector, size, ID_SYNCER)) {
687 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
688 dec_rs_pending(mdev);
695 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
696 /* last syncer _request_ was sent,
697 * but the P_RS_DATA_REPLY has not been received yet. sync will end (and
698 * next sync group will resume), as soon as we receive the last
699 * resync data block, and the last bit is cleared.
700 * until then resync "work" is "inactive" ... */
702 mdev->resync_work.cb = w_resync_inactive;
708 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
709 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
714 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
718 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
720 if (unlikely(cancel))
723 if (unlikely(mdev->state.conn < C_CONNECTED)) {
724 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
728 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
729 if (atomic_read(&mdev->rs_pending_cnt) > number)
732 number -= atomic_read(&mdev->rs_pending_cnt);
734 sector = mdev->ov_position;
735 for (i = 0; i < number; i++) {
736 if (sector >= capacity) {
737 mdev->resync_work.cb = w_resync_inactive;
741 size = BM_BLOCK_SIZE;
743 if (drbd_try_rs_begin_io(mdev, sector)) {
744 mdev->ov_position = sector;
748 if (sector + (size>>9) > capacity)
749 size = (capacity-sector)<<9;
751 inc_rs_pending(mdev);
752 if (!drbd_send_ov_request(mdev, sector, size)) {
753 dec_rs_pending(mdev);
756 sector += BM_SECT_PER_BIT;
758 mdev->ov_position = sector;
761 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
766 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
770 drbd_resync_finished(mdev);
775 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
779 drbd_resync_finished(mdev);
784 static void ping_peer(struct drbd_conf *mdev)
786 clear_bit(GOT_PING_ACK, &mdev->flags);
788 wait_event(mdev->misc_wait,
789 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
792 int drbd_resync_finished(struct drbd_conf *mdev)
794 unsigned long db, dt, dbdt;
796 union drbd_state os, ns;
798 char *khelper_cmd = NULL;
800 /* Remove all elements from the resync LRU. Since future actions
801 * might set bits in the (main) bitmap, then the entries in the
802 * resync LRU would be wrong. */
803 if (drbd_rs_del_all(mdev)) {
804 /* In case this is not possible now, most probably because
805 there are P_RS_DATA_REPLY packets lingering on the worker's
806 queue (or even the read operations for those packets
807 are not finished by now). Retry in 100ms. */
810 __set_current_state(TASK_INTERRUPTIBLE);
811 schedule_timeout(HZ / 10);
812 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
814 w->cb = w_resync_finished;
815 drbd_queue_work(&mdev->data.work, w);
818 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
821 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
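/* Note: dt is the elapsed resync time in seconds with paused time
 * subtracted; dbdt below is the average throughput in KiB/s, Bit2KB()
 * presumably converting a count of 4 KiB bitmap blocks into KiB. */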
825 dbdt = Bit2KB(db/dt);
826 mdev->rs_paused /= HZ;
833 spin_lock_irq(&mdev->req_lock);
836 /* This protects us against multiple calls (that can happen in the presence
837 of application IO), and against connectivity loss just before we arrive here. */
838 if (os.conn <= C_CONNECTED)
842 ns.conn = C_CONNECTED;
844 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
845 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
846 "Online verify " : "Resync",
847 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
849 n_oos = drbd_bm_total_weight(mdev);
851 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
853 dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
855 khelper_cmd = "out-of-sync";
858 D_ASSERT((n_oos - mdev->rs_failed) == 0);
860 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
861 khelper_cmd = "after-resync-target";
863 if (mdev->csums_tfm && mdev->rs_total) {
864 const unsigned long s = mdev->rs_same_csum;
865 const unsigned long t = mdev->rs_total;
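/* Note: the ratio below is the percentage of blocks whose checksums
 * matched; for large totals it is computed as s/(t/100) instead of
 * s*100/t, presumably to avoid 32 bit overflow. */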
868 (t < 100000) ? ((s*100)/t) : (s/(t/100));
869 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
870 "transferred %luK total %luK\n",
872 Bit2KB(mdev->rs_same_csum),
873 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
874 Bit2KB(mdev->rs_total));
878 if (mdev->rs_failed) {
879 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
881 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
882 ns.disk = D_INCONSISTENT;
883 ns.pdsk = D_UP_TO_DATE;
885 ns.disk = D_UP_TO_DATE;
886 ns.pdsk = D_INCONSISTENT;
889 ns.disk = D_UP_TO_DATE;
890 ns.pdsk = D_UP_TO_DATE;
892 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
895 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
896 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
897 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
898 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
900 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
904 drbd_uuid_set_bm(mdev, 0UL);
907 /* Now the two UUID sets are equal, update what we
908 * know of the peer. */
910 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
911 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
915 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
917 spin_unlock_irq(&mdev->req_lock);
923 mdev->ov_start_sector = 0;
925 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
926 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
927 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
931 drbd_khelper(mdev, khelper_cmd);
937 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
939 if (drbd_ee_has_active_page(e)) {
940 /* This might happen if sendpage() has not finished */
941 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
942 atomic_add(i, &mdev->pp_in_use_by_net);
943 atomic_sub(i, &mdev->pp_in_use);
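/* Note: pages possibly still referenced by the network layer (sendpage in
 * flight) are moved from the pp_in_use count to pp_in_use_by_net and
 * parked on net_ee instead of being freed right away. */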
944 spin_lock_irq(&mdev->req_lock);
945 list_add_tail(&e->w.list, &mdev->net_ee);
946 spin_unlock_irq(&mdev->req_lock);
947 wake_up(&drbd_pp_wait);
949 drbd_free_ee(mdev, e);
953 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
954 * @mdev: DRBD device.
956 * @cancel: The connection will be closed anyway.
958 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
960 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
963 if (unlikely(cancel)) {
964 drbd_free_ee(mdev, e);
969 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
970 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
972 if (__ratelimit(&drbd_ratelimit_state))
973 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
974 (unsigned long long)e->sector);
976 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
981 move_to_net_ee_or_free(mdev, e);
984 dev_err(DEV, "drbd_send_block() failed\n");
989 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
990 * @mdev: DRBD device.
992 * @cancel: The connection will be closed anyway.
994 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
996 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
999 if (unlikely(cancel)) {
1000 drbd_free_ee(mdev, e);
1005 if (get_ldev_if_state(mdev, D_FAILED)) {
1006 drbd_rs_complete_io(mdev, e->sector);
1010 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1011 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1012 inc_rs_pending(mdev);
1013 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1015 if (__ratelimit(&drbd_ratelimit_state))
1016 dev_err(DEV, "Not sending RSDataReply, "
1017 "partner DISKLESS!\n");
1021 if (__ratelimit(&drbd_ratelimit_state))
1022 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1023 (unsigned long long)e->sector);
1025 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1027 /* update resync data with failure */
1028 drbd_rs_failed_io(mdev, e->sector, e->size);
1033 move_to_net_ee_or_free(mdev, e);
1036 dev_err(DEV, "drbd_send_block() failed\n");
1040 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1042 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1043 struct digest_info *di;
1045 void *digest = NULL;
1048 if (unlikely(cancel)) {
1049 drbd_free_ee(mdev, e);
1054 if (get_ldev(mdev)) {
1055 drbd_rs_complete_io(mdev, e->sector);
1061 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1062 /* quick hack to try to avoid a race against reconfiguration.
1063 * a real fix would be much more involved,
1064 * introducing more locking mechanisms */
1065 if (mdev->csums_tfm) {
1066 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1067 D_ASSERT(digest_size == di->digest_size);
1068 digest = kmalloc(digest_size, GFP_NOIO);
1071 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1072 eq = !memcmp(digest, di->digest, digest_size);
1077 drbd_set_in_sync(mdev, e->sector, e->size);
1078 /* rs_same_csums unit is BM_BLOCK_SIZE */
1079 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1080 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1082 inc_rs_pending(mdev);
1083 e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1084 e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1086 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1089 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1090 if (__ratelimit(&drbd_ratelimit_state))
1091 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1095 move_to_net_ee_or_free(mdev, e);
1098 dev_err(DEV, "drbd_send_block/ack() failed\n");
1102 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1104 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1109 if (unlikely(cancel))
1112 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1115 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1116 /* FIXME if this allocation fails, online verify will not terminate! */
1117 digest = kmalloc(digest_size, GFP_NOIO);
1119 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1120 inc_rs_pending(mdev);
1121 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1122 digest, digest_size, P_OV_REPLY);
1124 dec_rs_pending(mdev);
1129 drbd_free_ee(mdev, e);
1136 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1138 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1139 mdev->ov_last_oos_size += size>>9;
1141 mdev->ov_last_oos_start = sector;
1142 mdev->ov_last_oos_size = size>>9;
1144 drbd_set_out_of_sync(mdev, sector, size);
1145 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1148 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1150 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1151 struct digest_info *di;
1156 if (unlikely(cancel)) {
1157 drbd_free_ee(mdev, e);
1162 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1163 * the resync lru has been cleaned up already */
1164 if (get_ldev(mdev)) {
1165 drbd_rs_complete_io(mdev, e->sector);
1171 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1172 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1173 digest = kmalloc(digest_size, GFP_NOIO);
1175 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1177 D_ASSERT(digest_size == di->digest_size);
1178 eq = !memcmp(digest, di->digest, digest_size);
1182 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1183 if (__ratelimit(&drbd_ratelimit_state))
1184 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1189 drbd_ov_oos_found(mdev, e->sector, e->size);
1193 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1194 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1196 drbd_free_ee(mdev, e);
1198 if (--mdev->ov_left == 0) {
1200 drbd_resync_finished(mdev);
1206 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1208 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1213 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1215 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1216 struct p_barrier *p = &mdev->data.sbuf.barrier;
1219 /* really avoid racing with tl_clear. w.cb may have been referenced
1220 * just before it was reassigned and re-queued, so double check that.
1221 * actually, this race was harmless, since we only try to send the
1222 * barrier packet here, and otherwise do nothing with the object.
1223 * but compare with the head of w_clear_epoch */
1224 spin_lock_irq(&mdev->req_lock);
1225 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1227 spin_unlock_irq(&mdev->req_lock);
1231 if (!drbd_get_data_sock(mdev))
1233 p->barrier = b->br_number;
1234 /* inc_ap_pending was done where this was queued.
1235 * dec_ap_pending will be done in got_BarrierAck
1236 * or (on connection loss) in w_clear_epoch. */
1237 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1238 (struct p_header80 *)p, sizeof(*p), 0);
1239 drbd_put_data_sock(mdev);
1244 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1248 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1252 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1253 * @mdev: DRBD device.
1255 * @cancel: The connection will be closed anyway.
1257 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1259 struct drbd_request *req = container_of(w, struct drbd_request, w);
1262 if (unlikely(cancel)) {
1263 req_mod(req, send_canceled);
1267 ok = drbd_send_dblock(mdev, req);
1268 req_mod(req, ok ? handed_over_to_network : send_failed);
1274 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1275 * @mdev: DRBD device.
1277 * @cancel: The connection will be closed anyway.
1279 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1281 struct drbd_request *req = container_of(w, struct drbd_request, w);
1284 if (unlikely(cancel)) {
1285 req_mod(req, send_canceled);
1289 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1290 (unsigned long)req);
1293 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1294 * so this is probably redundant */
1295 if (mdev->state.conn >= C_CONNECTED)
1296 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1298 req_mod(req, ok ? handed_over_to_network : send_failed);
1303 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1305 struct drbd_request *req = container_of(w, struct drbd_request, w);
1307 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1308 drbd_al_begin_io(mdev, req->sector);
1309 /* Calling drbd_al_begin_io() out of the worker could deadlock in
1310 theory. In practice it cannot deadlock, since this path is only
1311 used when unfreezing IO. All the extents of the requests that
1312 made it into the TL are already active. */
1314 drbd_req_make_private_bio(req, req->master_bio);
1315 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1316 generic_make_request(req->private_bio);
1321 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1323 struct drbd_conf *odev = mdev;
1326 if (odev->sync_conf.after == -1)
1328 odev = minor_to_mdev(odev->sync_conf.after);
1329 ERR_IF(!odev) return 1;
1330 if ((odev->state.conn >= C_SYNC_SOURCE &&
1331 odev->state.conn <= C_PAUSED_SYNC_T) ||
1332 odev->state.aftr_isp || odev->state.peer_isp ||
1333 odev->state.user_isp)
1339 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1340 * @mdev: DRBD device.
1342 * Called from process context only (admin command and after_state_ch).
1344 static int _drbd_pause_after(struct drbd_conf *mdev)
1346 struct drbd_conf *odev;
1349 for (i = 0; i < minor_count; i++) {
1350 odev = minor_to_mdev(i);
1353 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1355 if (!_drbd_may_sync_now(odev))
1356 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1357 != SS_NOTHING_TO_DO);
1364 * _drbd_resume_next() - Resume resync on all devices that may resync now
1365 * @mdev: DRBD device.
1367 * Called from process context only (admin command and worker).
1369 static int _drbd_resume_next(struct drbd_conf *mdev)
1371 struct drbd_conf *odev;
1374 for (i = 0; i < minor_count; i++) {
1375 odev = minor_to_mdev(i);
1378 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1380 if (odev->state.aftr_isp) {
1381 if (_drbd_may_sync_now(odev))
1382 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1384 != SS_NOTHING_TO_DO) ;
1390 void resume_next_sg(struct drbd_conf *mdev)
1392 write_lock_irq(&global_state_lock);
1393 _drbd_resume_next(mdev);
1394 write_unlock_irq(&global_state_lock);
1397 void suspend_other_sg(struct drbd_conf *mdev)
1399 write_lock_irq(&global_state_lock);
1400 _drbd_pause_after(mdev);
1401 write_unlock_irq(&global_state_lock);
1404 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1406 struct drbd_conf *odev;
1410 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1411 return ERR_SYNC_AFTER;
1413 /* check for loops */
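/* Note (assumption about the elided loop): we walk the sync-after chain
 * starting at o_minor; reaching mdev again means the new setting would
 * create a cycle (ERR_SYNC_AFTER_CYCLE), reaching a device with
 * sync_conf.after == -1 means the chain is acyclic (NO_ERROR). */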
1414 odev = minor_to_mdev(o_minor);
1417 return ERR_SYNC_AFTER_CYCLE;
1419 /* dependency chain ends here, no cycles. */
1420 if (odev->sync_conf.after == -1)
1423 /* follow the dependency chain */
1424 odev = minor_to_mdev(odev->sync_conf.after);
1428 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1433 write_lock_irq(&global_state_lock);
1434 retcode = sync_after_error(mdev, na);
1435 if (retcode == NO_ERROR) {
1436 mdev->sync_conf.after = na;
1438 changes = _drbd_pause_after(mdev);
1439 changes |= _drbd_resume_next(mdev);
1442 write_unlock_irq(&global_state_lock);
1447 * drbd_start_resync() - Start the resync process
1448 * @mdev: DRBD device.
1449 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1451 * This function might bring you directly into one of the
1452 * C_PAUSED_SYNC_* states.
1454 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1456 union drbd_state ns;
1459 if (mdev->state.conn >= C_SYNC_SOURCE) {
1460 dev_err(DEV, "Resync already running!\n");
1464 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1465 drbd_rs_cancel_all(mdev);
1467 if (side == C_SYNC_TARGET) {
1468 /* Since application IO was locked out during C_WF_BITMAP_T and
1469 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1470 we check whether we are allowed to make the data inconsistent. */
1471 r = drbd_khelper(mdev, "before-resync-target");
1472 r = (r >> 8) & 0xff;
1474 dev_info(DEV, "before-resync-target handler returned %d, "
1475 "dropping connection.\n", r);
1476 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1481 drbd_state_lock(mdev);
1483 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1484 drbd_state_unlock(mdev);
1488 if (side == C_SYNC_TARGET) {
1489 mdev->bm_resync_fo = 0;
1490 } else /* side == C_SYNC_SOURCE */ {
1493 get_random_bytes(&uuid, sizeof(u64));
1494 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1495 drbd_send_sync_uuid(mdev, uuid);
1497 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1500 write_lock_irq(&global_state_lock);
1503 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1507 if (side == C_SYNC_TARGET)
1508 ns.disk = D_INCONSISTENT;
1509 else /* side == C_SYNC_SOURCE */
1510 ns.pdsk = D_INCONSISTENT;
1512 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1515 if (ns.conn < C_CONNECTED)
1516 r = SS_UNKNOWN_ERROR;
1518 if (r == SS_SUCCESS) {
1519 unsigned long tw = drbd_bm_total_weight(mdev);
1520 unsigned long now = jiffies;
1523 mdev->rs_failed = 0;
1524 mdev->rs_paused = 0;
1525 mdev->rs_same_csum = 0;
1526 mdev->rs_last_events = 0;
1527 mdev->rs_last_sect_ev = 0;
1528 mdev->rs_total = tw;
1529 mdev->rs_start = now;
1530 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1531 mdev->rs_mark_left[i] = tw;
1532 mdev->rs_mark_time[i] = now;
1534 _drbd_pause_after(mdev);
1536 write_unlock_irq(&global_state_lock);
1539 if (r == SS_SUCCESS) {
1540 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1541 drbd_conn_str(ns.conn),
1542 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1543 (unsigned long) mdev->rs_total);
1545 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1546 /* This still has a race (about when exactly the peers
1547 * detect connection loss) that can lead to a full sync
1548 * on next handshake. In 8.3.9 we fixed this with explicit
1549 * resync-finished notifications, but the fix
1550 * introduces a protocol change. Sleeping for some
1551 * time longer than the ping interval + timeout on the
1552 * SyncSource, to give the SyncTarget the chance to
1553 * detect connection loss, then waiting for a ping
1554 * response (implicit in drbd_resync_finished) reduces
1555 * the race considerably, but does not solve it. */
1556 if (side == C_SYNC_SOURCE)
1557 schedule_timeout_interruptible(
1558 mdev->net_conf->ping_int * HZ +
1559 mdev->net_conf->ping_timeo*HZ/9);
1560 drbd_resync_finished(mdev);
1563 atomic_set(&mdev->rs_sect_in, 0);
1564 atomic_set(&mdev->rs_sect_ev, 0);
1565 mdev->rs_in_flight = 0;
1566 mdev->rs_planed = 0;
1567 spin_lock(&mdev->peer_seq_lock);
1568 fifo_set(&mdev->rs_plan_s, 0);
1569 spin_unlock(&mdev->peer_seq_lock);
1570 /* ns.conn may already be != mdev->state.conn,
1571 * we may have been paused in between, or become paused until
1572 * the timer triggers.
1573 * No matter, that is handled in resync_timer_fn() */
1574 if (ns.conn == C_SYNC_TARGET)
1575 mod_timer(&mdev->resync_timer, jiffies);
1579 drbd_state_unlock(mdev);
1582 int drbd_worker(struct drbd_thread *thi)
1584 struct drbd_conf *mdev = thi->mdev;
1585 struct drbd_work *w = NULL;
1586 LIST_HEAD(work_list);
1589 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1591 while (get_t_state(thi) == Running) {
1592 drbd_thread_current_set_cpu(mdev);
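/* Note: a failing down_trylock() means the work queue is currently empty;
 * the data socket is then uncorked to flush anything still batched, the
 * worker sleeps on the semaphore, and the socket is corked again before
 * handling new work, so consecutive packets can be batched (unless
 * no_cork is configured). */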
1594 if (down_trylock(&mdev->data.work.s)) {
1595 mutex_lock(&mdev->data.mutex);
1596 if (mdev->data.socket && !mdev->net_conf->no_cork)
1597 drbd_tcp_uncork(mdev->data.socket);
1598 mutex_unlock(&mdev->data.mutex);
1600 intr = down_interruptible(&mdev->data.work.s);
1602 mutex_lock(&mdev->data.mutex);
1603 if (mdev->data.socket && !mdev->net_conf->no_cork)
1604 drbd_tcp_cork(mdev->data.socket);
1605 mutex_unlock(&mdev->data.mutex);
1609 D_ASSERT(intr == -EINTR);
1610 flush_signals(current);
1611 ERR_IF (get_t_state(thi) == Running)
1616 if (get_t_state(thi) != Running)
1618 /* With this break, we have done a down() but not consumed
1619 the entry from the list. The cleanup code takes care of this. */
1623 spin_lock_irq(&mdev->data.work.q_lock);
1624 ERR_IF(list_empty(&mdev->data.work.q)) {
1625 /* something terribly wrong in our logic.
1626 * we were able to down() the semaphore,
1627 * but the list is empty... doh.
1629 * what is the best thing to do now?
1630 * try again from scratch, restarting the receiver,
1631 * asender, whatnot? could break even more ugly,
1632 * e.g. when we are primary, but no good local data.
1634 * I'll try to get away just starting over this loop.
1636 spin_unlock_irq(&mdev->data.work.q_lock);
1639 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1640 list_del_init(&w->list);
1641 spin_unlock_irq(&mdev->data.work.q_lock);
1643 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1644 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1645 if (mdev->state.conn >= C_CONNECTED)
1646 drbd_force_state(mdev,
1647 NS(conn, C_NETWORK_FAILURE));
1650 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1651 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1653 spin_lock_irq(&mdev->data.work.q_lock);
1655 while (!list_empty(&mdev->data.work.q)) {
1656 list_splice_init(&mdev->data.work.q, &work_list);
1657 spin_unlock_irq(&mdev->data.work.q_lock);
1659 while (!list_empty(&work_list)) {
1660 w = list_entry(work_list.next, struct drbd_work, list);
1661 list_del_init(&w->list);
1663 i++; /* dead debugging code */
1666 spin_lock_irq(&mdev->data.work.q_lock);
1668 sema_init(&mdev->data.work.s, 0);
1669 /* DANGEROUS race: if someone queued their work while holding the
1670 * spinlock, but called up() outside of it, we could get an up() on
1671 * the semaphore without a corresponding list entry.
1674 spin_unlock_irq(&mdev->data.work.q_lock);
1676 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1677 /* _drbd_set_state only uses stop_nowait.
1678 * wait here for the Exiting receiver. */
1679 drbd_thread_stop(&mdev->receiver);
1680 drbd_mdev_cleanup(mdev);
1682 dev_info(DEV, "worker terminated\n");
1684 clear_bit(DEVICE_DYING, &mdev->flags);
1685 clear_bit(CONFIG_PENDING, &mdev->flags);
1686 wake_up(&mdev->state_wait);