/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
/* more endio handlers:
 *   atodb_endio in drbd_actlog.c
 *   drbd_bm_async_io_complete in drbd_bitmap.c
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 */
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;
/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}
/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	list_del(&e->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_queue_work(&mdev->data.work, &e->w);
	put_ldev(mdev);
}
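/* A "failed barrier" is a request that was submitted with EE_IS_BARRIER,
 * came back with an error, and has not been resubmitted yet. */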
static int is_failed_barrier(int ee_flags)
{
	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
			== (EE_IS_BARRIER|EE_WAS_ERROR);
}
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;
	sector_t e_sector;
	int do_wake;
	int is_syncer_req;
	int do_al_complete_io;

	/* if this is a failed barrier request, disable use of barriers,
	 * and schedule for resubmission */
	if (is_failed_barrier(e->flags)) {
		drbd_bump_write_ordering(mdev, WO_bdev_flush);
		spin_lock_irqsave(&mdev->req_lock, flags);
		list_del(&e->w.list);
		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
		e->w.cb = w_e_reissue;
		/* put_ldev actually happens below, once we come here again. */
		__release(local);
		spin_unlock_irqrestore(&mdev->req_lock, flags);
		drbd_queue_work(&mdev->data.work, &e->w);
		return;
	}

	D_ASSERT(e->block_id != ID_VACANT);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
	is_syncer_req = is_syncer_block_id(e->block_id);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (is_syncer_req)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev);
	put_ldev(mdev);
}
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_epoch_entry *e = bio->bi_private;
	struct drbd_conf *mdev = e->mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

	if (error)
		dev_warn(DEV, "%s: error=%d s=%llus\n",
				is_write ? "write" : "read", error,
				(unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
				is_write ? "write" : "read",
				(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &e->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&e->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(e);
		else
			drbd_endio_read_sec_final(e);
	}
}
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READ)
			  ? read_completed_with_error
			  : read_ahead_completed_with_error;
	} else
		what = completed_ok;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	req_mod(req, what);
}
int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, read_retry_remote_canceled);
		spin_unlock_irq(&mdev->req_lock);
		return 1;
	}
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);
}
int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	ERR_IF(cancel) return 1;
	dev_err(DEV, "resync inactive, but callback triggered??\n");
	return 1; /* Simply ignore this! */
}
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = e->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = e->size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}
void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}
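/* Worker callback: checksum a completed local read and send the digest to
 * the peer as a checksum based resync request (P_CSUM_RS_REQUEST). */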
static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);

			inc_rs_pending(mdev);
			ok = drbd_send_drequest_csum(mdev,
						     e->sector,
						     e->size,
						     digest,
						     digest_size,
						     P_CSUM_RS_REQUEST);
			kfree(digest);
		} else {
			dev_err(DEV, "kmalloc() of digest failed.\n");
			ok = 0;
		}
	} else
		ok = 0;

	drbd_free_ee(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
	return ok;
}
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
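/* Submit a local read for a checksum based resync request; the completed
 * read is handed to w_e_send_csum. Returns 0 on success, -EIO on disk
 * failure, and -EAGAIN if the request should be retried later. */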
static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_epoch_entry *e;

	if (!get_ldev(mdev))
		return -EIO;

	if (drbd_rs_should_slow_down(mdev))
		goto defer;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
	if (!e)
		goto defer;

	e->w.cb = w_e_send_csum;
	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	atomic_add(size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	/* drbd_submit_ee currently fails for one reason only:
	 * not being able to allocate enough bios.
	 * Is dropping the connection going to help? */
	spin_lock_irq(&mdev->req_lock);
	list_del(&e->w.list);
	spin_unlock_irq(&mdev->req_lock);

	drbd_free_ee(mdev, e);
defer:
	put_ldev(mdev);
	return -EAGAIN;
}
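/* Resync timer: pick the worker callback matching the current connection
 * state and queue the resync work item. */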
void resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;
	int queue;

	queue = 1;
	switch (mdev->state.conn) {
	case C_VERIFY_S:
		mdev->resync_work.cb = w_make_ov_request;
		break;
	case C_SYNC_TARGET:
		mdev->resync_work.cb = w_make_resync_request;
		break;
	default:
		queue = 0;
		mdev->resync_work.cb = w_resync_inactive;
	}

	/* harmless race: list_empty outside data.work.q_lock */
	if (list_empty(&mdev->resync_work.list) && queue)
		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
}
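/* Ring buffer helpers for the resync controller's plan FIFO: fifo_set()
 * initializes every slot, fifo_push() enqueues a value and returns the one
 * falling out, fifo_add_val() adds a value to every slot. */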
static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}
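/* Feedback controller for the resync rate: based on the sectors that came
 * in since the last turn and the configured c_fill_target/c_delay_target,
 * compute how many sectors to request next, spreading corrections over the
 * c_plan_ahead FIFO. Returns the number of sectors to request. */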
int drbd_rs_controller(struct drbd_conf *mdev)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect; /* Number of sectors to request in this turn */
	int correction; /* Number of sectors more we need in the proxy */
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;

	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
	mdev->rs_in_flight -= sect_in;

	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */

	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - mdev->rs_in_flight - mdev->rs_planed;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(&mdev->rs_plan_s, cps);
	mdev->rs_planed += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
	mdev->rs_planed -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, mdev->rs_in_flight, want, correction,
		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}
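/**
 * w_make_resync_request() - Worker callback that issues resync requests
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 *
 * Runs on the SyncTarget: walks the dirty bitmap and sends up to "number"
 * P_RS_DATA_REQUEST (or checksum based) requests per SLEEP_TIME interval,
 * then re-arms the resync timer.
 */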
int w_make_resync_request(struct drbd_conf *mdev,
		struct drbd_work *w, int cancel)
{
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_segment_size;
	int number, rollback_i, size, pe, mx;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
		return 0;
	}

	if (mdev->state.conn != C_SYNC_TARGET)
		dev_err(DEV, "%s in w_make_resync_request\n",
			drbd_conn_str(mdev->state.conn));

	if (mdev->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(mdev);
		return 1;
	}

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->resync a
		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		mdev->resync_work.cb = w_resync_inactive;
		return 1;
	}

	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
	 * if it should be necessary */
	max_segment_size =
		mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
		mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		mdev->c_sync_rate = mdev->sync_conf.rate;
		number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
	}

	/* Throttle resync on lower level disk activity, which may also be
	 * caused by application IO on Primary/SyncTarget.
	 * Keep this after the call to drbd_rs_controller, as that assumes
	 * to be called as precisely as possible every SLEEP_TIME,
	 * and would be confused otherwise. */
	if (drbd_rs_should_slow_down(mdev))
		goto requeue;
	mutex_lock(&mdev->data.mutex);
	if (mdev->data.socket)
		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
	else
		mx = 1;
	mutex_unlock(&mdev->data.mutex);

	/* For resync rates >160MB/sec, allow more pending RS requests */
	if (number > mx)
		mx = number;

	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
	pe = atomic_read(&mdev->rs_pending_cnt);
	if ((pe + number) > mx) {
		number = mx - pe;
	}
	for (i = 0; i < number; i++) {
		/* Stop generating RS requests, when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == -1UL) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			mdev->resync_work.cb = w_resync_inactive;
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}
#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_segment_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif
		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
					       sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}
	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		mdev->resync_work.cb = w_resync_inactive;
		put_ldev(mdev);
		return 1;
	}

 requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(mdev);
	return 1;
}
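/* Online verify counterpart of w_make_resync_request: sends P_OV_REQUEST
 * packets linearly from mdev->ov_position instead of walking the bitmap. */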
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
		return 0;
	}

	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
	if (atomic_read(&mdev->rs_pending_cnt) > number)
		goto requeue;

	number -= atomic_read(&mdev->rs_pending_cnt);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			mdev->resync_work.cb = w_resync_inactive;
			return 1;
		}

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	mdev->ov_position = sector;

 requeue:
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}
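/* Ask the peer for a ping and wait for the answer, or for connection loss. */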
static void ping_peer(struct drbd_conf *mdev)
{
	clear_bit(GOT_PING_ACK, &mdev->flags);
	request_ping(mdev);
	wait_event(mdev->misc_wait,
		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
}
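/**
 * drbd_resync_finished() - Wrap up a finished resync or online verify run
 * @mdev:	DRBD device.
 *
 * Cleans up the resync LRU, logs the result, updates disk states and UUIDs,
 * and invokes the configured resync handlers.
 */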
int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now). Retry in 100ms. */

		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);
			return 1;
		}
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	ping_peer(mdev);

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
	     "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
				(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		drbd_uuid_set_bm(mdev, 0UL);

		if (mdev->p_uuid) {
			/* Now the two UUID sets are equal, update what we
			 * know of the peer. */
			int i;
			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->req_lock);
	put_ldev(mdev);
out:
	mdev->rs_total  = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;
	mdev->ov_start_sector = 0;

	drbd_md_sync(mdev);

	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
	}

	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
	if (drbd_ee_has_active_page(e)) {
		/* This might happen if sendpage() has not finished */
		int i = (e->size + PAGE_SIZE - 1) >> PAGE_SHIFT;
		atomic_add(i, &mdev->pp_in_use_by_net);
		atomic_sub(i, &mdev->pp_in_use);
		spin_lock_irq(&mdev->req_lock);
		list_add_tail(&e->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_ee(mdev, e);
}
/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}
/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			ok = 1;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, e->sector, e->size);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}
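/**
 * w_e_end_csum_rs_req() - Worker callback for checksum based resync requests
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 *
 * Compares the digest sent by the peer with a locally computed one; answers
 * with P_RS_IS_IN_SYNC on a match, or with the full P_RS_DATA_REPLY block.
 */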
int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, e->sector, e->size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
		} else {
			inc_rs_pending(mdev);
			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
			kfree(di);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}
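/* Online verify on the reply-sending side: compute a digest over the local
 * block and return it to the requester in a P_OV_REPLY packet. */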
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	/* FIXME if this allocation fails, online verify will not terminate! */
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
					     digest, digest_size, P_OV_REPLY);
		if (!ok)
			dec_rs_pending(mdev);
		kfree(digest);
	}

out:
	drbd_free_ee(mdev, e);
	dec_unacked(mdev);

	return ok;
}
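/* Record a block found out of sync by online verify; adjacent blocks are
 * merged into ov_last_oos_start/size for compact reporting. */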
void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
}
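/* Online verify, requesting side: compare the peer's digest with a locally
 * computed one and report the result with P_OV_RESULT. */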
int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	if (!eq)
		drbd_ov_oos_found(mdev, e->sector, e->size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	drbd_free_ee(mdev, e);

	if (--mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}
int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}
int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch.  */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
				(struct p_header80 *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

	return ok;
}
int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}
/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}
/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}
int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(mdev, req->sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock
	   theoretically. Practically it can not deadlock, since this is
	   only used when unfreezing IOs. All the extents of the requests
	   that made it into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 1;
}
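/* Return 1 if this device may resync now, 0 if it has to wait because some
 * device earlier in its sync-after dependency chain is busy. */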
static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}
/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}
/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO);
		}
	}
	return rv;
}
void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}
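/* Validate a proposed sync-after dependency: the referenced minor must
 * exist and following the chain must not lead back to this device. */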
static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}
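/* Change the sync-after dependency of a device to minor "na", then pause
 * and resume resyncs as the new dependency chain requires. */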
int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}
/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
	drbd_rs_cancel_all(mdev);

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to
		   C_SYNC_TARGET, where we will make the data inconsistent,
		   run the before-resync-target handler. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			dev_info(DEV, "before-resync-target handler returned %d, "
			     "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			return;
		}
	}

	drbd_state_lock(mdev);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);
		return;
	}

	if (side == C_SYNC_TARGET) {
		mdev->bm_resync_fo = 0;
	} else /* side == C_SYNC_SOURCE */ {
		u64 uuid;

		get_random_bytes(&uuid, sizeof(u64));
		drbd_uuid_set(mdev, UI_BITMAP, uuid);
		drbd_send_sync_uuid(mdev, uuid);

		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
	}

	write_lock_irq(&global_state_lock);
	ns = mdev->state;

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	ns = mdev->state;

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(mdev);
		unsigned long now = jiffies;
		int i;

		mdev->rs_failed    = 0;
		mdev->rs_paused    = 0;
		mdev->rs_same_csum = 0;
		mdev->rs_last_events = 0;
		mdev->rs_last_sect_ev = 0;
		mdev->rs_total     = tw;
		mdev->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			mdev->rs_mark_left[i] = tw;
			mdev->rs_mark_time[i] = now;
		}
		_drbd_pause_after(mdev);
	}
	write_unlock_irq(&global_state_lock);
	put_ldev(mdev);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) mdev->rs_total);

		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE)
				schedule_timeout_interruptible(
					mdev->net_conf->ping_int * HZ +
					mdev->net_conf->ping_timeo*HZ/9);
			drbd_resync_finished(mdev);
		}

		atomic_set(&mdev->rs_sect_in, 0);
		atomic_set(&mdev->rs_sect_ev, 0);
		mdev->rs_in_flight = 0;
		mdev->rs_planed = 0;
		spin_lock(&mdev->peer_seq_lock);
		fifo_set(&mdev->rs_plan_s, 0);
		spin_unlock(&mdev->peer_seq_lock);
		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
	drbd_state_unlock(mdev);
}
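/* Main loop of the per-device worker thread: dequeue work items from
 * mdev->data.work and run their callbacks until told to stop, then drain
 * the queue and clean up the device. */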
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0, i = 0;

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);
		}

		if (intr) {
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)
				continue;
			break;
		}

		if (get_t_state(thi) != Running)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this...   */

		w = NULL;
		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);
			continue;
		}
		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));
		}
	}
	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);
	i = 0;
	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(mdev, w, 1);
			i++; /* dead debugging code */
		}

		spin_lock_irq(&mdev->data.work.q_lock);
	}
	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up() ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 * So run it again ...
	 */
	spin_unlock_irq(&mdev->data.work.q_lock);

	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the Exiting receiver. */
	drbd_thread_stop(&mdev->receiver);
	drbd_mdev_cleanup(mdev);

	dev_info(DEV, "worker terminated\n");

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);

	return 0;
}