]> git.karo-electronics.de Git - mv-sheeva.git/blob - drivers/block/drbd/drbd_worker.c
ed8796f1112d933004a6feea740765d3251cce65
[mv-sheeva.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/version.h>
28 #include <linux/drbd.h>
29 #include <linux/sched.h>
30 #include <linux/smp_lock.h>
31 #include <linux/wait.h>
32 #include <linux/mm.h>
33 #include <linux/memcontrol.h>
34 #include <linux/mm_inline.h>
35 #include <linux/slab.h>
36 #include <linux/random.h>
37 #include <linux/mm.h>
38 #include <linux/string.h>
39 #include <linux/scatterlist.h>
40
41 #include "drbd_int.h"
42 #include "drbd_req.h"
43
44 #define SLEEP_TIME (HZ/10)
45
46 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
47
48
49
50 /* defined here:
51    drbd_md_io_complete
52    drbd_endio_write_sec
53    drbd_endio_read_sec
54    drbd_endio_pri
55
56  * more endio handlers:
57    atodb_endio in drbd_actlog.c
58    drbd_bm_async_io_complete in drbd_bitmap.c
59
60  * For all these callbacks, note the following:
61  * The callbacks will be called in irq context by the IDE drivers,
62  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
63  * Try to get the locking right :)
64  *
65  */
66
67
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
72 rwlock_t global_state_lock;
73
74 /* used for synchronous meta data and bitmap IO
75  * submitted by drbd_md_sync_page_io()
76  */
77 void drbd_md_io_complete(struct bio *bio, int error)
78 {
79         struct drbd_md_io *md_io;
80
81         md_io = (struct drbd_md_io *)bio->bi_private;
82         md_io->error = error;
83
84         complete(&md_io->event);
85 }
86
87 /* reads on behalf of the partner,
88  * "submitted" by the receiver
89  */
90 void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
91 {
92         unsigned long flags = 0;
93         struct drbd_epoch_entry *e = NULL;
94         struct drbd_conf *mdev;
95         int uptodate = bio_flagged(bio, BIO_UPTODATE);
96
97         e = bio->bi_private;
98         mdev = e->mdev;
99
100         if (error)
101                 dev_warn(DEV, "read: error=%d s=%llus\n", error,
102                                 (unsigned long long)e->sector);
103         if (!error && !uptodate) {
104                 dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
105                                 (unsigned long long)e->sector);
106                 /* strange behavior of some lower level drivers...
107                  * fail the request by clearing the uptodate flag,
108                  * but do not return any error?! */
109                 error = -EIO;
110         }
111
112         D_ASSERT(e->block_id != ID_VACANT);
113
114         spin_lock_irqsave(&mdev->req_lock, flags);
115         mdev->read_cnt += e->size >> 9;
116         list_del(&e->w.list);
117         if (list_empty(&mdev->read_ee))
118                 wake_up(&mdev->ee_wait);
119         spin_unlock_irqrestore(&mdev->req_lock, flags);
120
121         drbd_chk_io_error(mdev, error, FALSE);
122         drbd_queue_work(&mdev->data.work, &e->w);
123         put_ldev(mdev);
124 }
125
126 /* writes on behalf of the partner, or resync writes,
127  * "submitted" by the receiver.
128  */
129 void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
130 {
131         unsigned long flags = 0;
132         struct drbd_epoch_entry *e = NULL;
133         struct drbd_conf *mdev;
134         sector_t e_sector;
135         int do_wake;
136         int is_syncer_req;
137         int do_al_complete_io;
138         int uptodate = bio_flagged(bio, BIO_UPTODATE);
139         int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
140
141         e = bio->bi_private;
142         mdev = e->mdev;
143
144         if (error)
145                 dev_warn(DEV, "write: error=%d s=%llus\n", error,
146                                 (unsigned long long)e->sector);
147         if (!error && !uptodate) {
148                 dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
149                                 (unsigned long long)e->sector);
150                 /* strange behavior of some lower level drivers...
151                  * fail the request by clearing the uptodate flag,
152                  * but do not return any error?! */
153                 error = -EIO;
154         }
155
156         /* error == -ENOTSUPP would be a better test,
157          * alas it is not reliable */
158         if (error && is_barrier && e->flags & EE_IS_BARRIER) {
159                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
160                 spin_lock_irqsave(&mdev->req_lock, flags);
161                 list_del(&e->w.list);
162                 e->w.cb = w_e_reissue;
163                 /* put_ldev actually happens below, once we come here again. */
164                 __release(local);
165                 spin_unlock_irqrestore(&mdev->req_lock, flags);
166                 drbd_queue_work(&mdev->data.work, &e->w);
167                 return;
168         }
169
170         D_ASSERT(e->block_id != ID_VACANT);
171
172         spin_lock_irqsave(&mdev->req_lock, flags);
173         mdev->writ_cnt += e->size >> 9;
174         is_syncer_req = is_syncer_block_id(e->block_id);
175
176         /* after we moved e to done_ee,
177          * we may no longer access it,
178          * it may be freed/reused already!
179          * (as soon as we release the req_lock) */
180         e_sector = e->sector;
181         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
182
183         list_del(&e->w.list); /* has been on active_ee or sync_ee */
184         list_add_tail(&e->w.list, &mdev->done_ee);
185
186         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
187          * neither did we wake possibly waiting conflicting requests.
188          * done from "drbd_process_done_ee" within the appropriate w.cb
189          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
190
191         do_wake = is_syncer_req
192                 ? list_empty(&mdev->sync_ee)
193                 : list_empty(&mdev->active_ee);
194
195         if (error)
196                 __drbd_chk_io_error(mdev, FALSE);
197         spin_unlock_irqrestore(&mdev->req_lock, flags);
198
199         if (is_syncer_req)
200                 drbd_rs_complete_io(mdev, e_sector);
201
202         if (do_wake)
203                 wake_up(&mdev->ee_wait);
204
205         if (do_al_complete_io)
206                 drbd_al_complete_io(mdev, e_sector);
207
208         wake_asender(mdev);
209         put_ldev(mdev);
210
211 }
212
213 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
214  */
215 void drbd_endio_pri(struct bio *bio, int error)
216 {
217         unsigned long flags;
218         struct drbd_request *req = bio->bi_private;
219         struct drbd_conf *mdev = req->mdev;
220         struct bio_and_error m;
221         enum drbd_req_event what;
222         int uptodate = bio_flagged(bio, BIO_UPTODATE);
223
224         if (error)
225                 dev_warn(DEV, "p %s: error=%d\n",
226                          bio_data_dir(bio) == WRITE ? "write" : "read", error);
227         if (!error && !uptodate) {
228                 dev_warn(DEV, "p %s: setting error to -EIO\n",
229                          bio_data_dir(bio) == WRITE ? "write" : "read");
230                 /* strange behavior of some lower level drivers...
231                  * fail the request by clearing the uptodate flag,
232                  * but do not return any error?! */
233                 error = -EIO;
234         }
235
236         /* to avoid recursion in __req_mod */
237         if (unlikely(error)) {
238                 what = (bio_data_dir(bio) == WRITE)
239                         ? write_completed_with_error
240                         : (bio_rw(bio) == READA)
241                           ? read_completed_with_error
242                           : read_ahead_completed_with_error;
243         } else
244                 what = completed_ok;
245
246         bio_put(req->private_bio);
247         req->private_bio = ERR_PTR(error);
248
249         spin_lock_irqsave(&mdev->req_lock, flags);
250         __req_mod(req, what, &m);
251         spin_unlock_irqrestore(&mdev->req_lock, flags);
252
253         if (m.bio)
254                 complete_master_bio(mdev, &m);
255 }
256
257 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
258 {
259         struct drbd_request *req = container_of(w, struct drbd_request, w);
260
261         /* NOTE: mdev->ldev can be NULL by the time we get here! */
262         /* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
263
264         /* the only way this callback is scheduled is from _req_may_be_done,
265          * when it is done and had a local write error, see comments there */
266         drbd_req_free(req);
267
268         return TRUE;
269 }
270
271 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
272 {
273         struct drbd_request *req = container_of(w, struct drbd_request, w);
274
275         /* We should not detach for read io-error,
276          * but try to WRITE the P_DATA_REPLY to the failed location,
277          * to give the disk the chance to relocate that block */
278
279         spin_lock_irq(&mdev->req_lock);
280         if (cancel ||
281             mdev->state.conn < C_CONNECTED ||
282             mdev->state.pdsk <= D_INCONSISTENT) {
283                 _req_mod(req, send_canceled);
284                 spin_unlock_irq(&mdev->req_lock);
285                 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
286                 return 1;
287         }
288         spin_unlock_irq(&mdev->req_lock);
289
290         return w_send_read_req(mdev, w, 0);
291 }
292
/**
 * w_resync_inactive() - Placeholder worker callback while no resync is active
 * @mdev:	DRBD device.
 * @w:		work object (not used).
 * @cancel:	passed to ERR_IF(); a non-zero value takes the early return.
 *
 * Installed as mdev->resync_work.cb whenever resync work is not expected to
 * run.  Getting here is logged as an error; the function returns 1 on every
 * path.
 */
int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* NOTE(review): ERR_IF is defined elsewhere; presumably it also logs
	 * when the condition holds -- confirm against drbd_int.h */
	ERR_IF(cancel) return 1;
	dev_err(DEV, "resync inactive, but callback triggered??\n");
	return 1; /* Simply ignore this! */
}
299
300 void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
301 {
302         struct hash_desc desc;
303         struct scatterlist sg;
304         struct bio_vec *bvec;
305         int i;
306
307         desc.tfm = tfm;
308         desc.flags = 0;
309
310         sg_init_table(&sg, 1);
311         crypto_hash_init(&desc);
312
313         __bio_for_each_segment(bvec, bio, i, 0) {
314                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
315                 crypto_hash_update(&desc, &sg, sg.length);
316         }
317         crypto_hash_final(&desc, digest);
318 }
319
320 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
321 {
322         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
323         int digest_size;
324         void *digest;
325         int ok;
326
327         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
328
329         if (unlikely(cancel)) {
330                 drbd_free_ee(mdev, e);
331                 return 1;
332         }
333
334         if (likely(drbd_bio_uptodate(e->private_bio))) {
335                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
336                 digest = kmalloc(digest_size, GFP_NOIO);
337                 if (digest) {
338                         drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
339
340                         inc_rs_pending(mdev);
341                         ok = drbd_send_drequest_csum(mdev,
342                                                      e->sector,
343                                                      e->size,
344                                                      digest,
345                                                      digest_size,
346                                                      P_CSUM_RS_REQUEST);
347                         kfree(digest);
348                 } else {
349                         dev_err(DEV, "kmalloc() of digest failed.\n");
350                         ok = 0;
351                 }
352         } else
353                 ok = 1;
354
355         drbd_free_ee(mdev, e);
356
357         if (unlikely(!ok))
358                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
359         return ok;
360 }
361
362 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
363
364 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
365 {
366         struct drbd_epoch_entry *e;
367
368         if (!get_ldev(mdev))
369                 return 0;
370
371         /* GFP_TRY, because if there is no memory available right now, this may
372          * be rescheduled for later. It is "only" background resync, after all. */
373         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
374         if (!e) {
375                 put_ldev(mdev);
376                 return 2;
377         }
378
379         spin_lock_irq(&mdev->req_lock);
380         list_add(&e->w.list, &mdev->read_ee);
381         spin_unlock_irq(&mdev->req_lock);
382
383         e->private_bio->bi_end_io = drbd_endio_read_sec;
384         e->private_bio->bi_rw = READ;
385         e->w.cb = w_e_send_csum;
386
387         mdev->read_cnt += size >> 9;
388         drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
389
390         return 1;
391 }
392
393 void resync_timer_fn(unsigned long data)
394 {
395         unsigned long flags;
396         struct drbd_conf *mdev = (struct drbd_conf *) data;
397         int queue;
398
399         spin_lock_irqsave(&mdev->req_lock, flags);
400
401         if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
402                 queue = 1;
403                 if (mdev->state.conn == C_VERIFY_S)
404                         mdev->resync_work.cb = w_make_ov_request;
405                 else
406                         mdev->resync_work.cb = w_make_resync_request;
407         } else {
408                 queue = 0;
409                 mdev->resync_work.cb = w_resync_inactive;
410         }
411
412         spin_unlock_irqrestore(&mdev->req_lock, flags);
413
414         /* harmless race: list_empty outside data.work.q_lock */
415         if (list_empty(&mdev->resync_work.list) && queue)
416                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
417 }
418
419 int w_make_resync_request(struct drbd_conf *mdev,
420                 struct drbd_work *w, int cancel)
421 {
422         unsigned long bit;
423         sector_t sector;
424         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
425         int max_segment_size = queue_max_segment_size(mdev->rq_queue);
426         int number, i, size, pe, mx;
427         int align, queued, sndbuf;
428
429         if (unlikely(cancel))
430                 return 1;
431
432         if (unlikely(mdev->state.conn < C_CONNECTED)) {
433                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
434                 return 0;
435         }
436
437         if (mdev->state.conn != C_SYNC_TARGET)
438                 dev_err(DEV, "%s in w_make_resync_request\n",
439                         drbd_conn_str(mdev->state.conn));
440
441         if (!get_ldev(mdev)) {
442                 /* Since we only need to access mdev->rsync a
443                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
444                    to continue resync with a broken disk makes no sense at
445                    all */
446                 dev_err(DEV, "Disk broke down during resync!\n");
447                 mdev->resync_work.cb = w_resync_inactive;
448                 return 1;
449         }
450
451         number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
452         pe = atomic_read(&mdev->rs_pending_cnt);
453
454         mutex_lock(&mdev->data.mutex);
455         if (mdev->data.socket)
456                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
457         else
458                 mx = 1;
459         mutex_unlock(&mdev->data.mutex);
460
461         /* For resync rates >160MB/sec, allow more pending RS requests */
462         if (number > mx)
463                 mx = number;
464
465         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
466         if ((pe + number) > mx) {
467                 number = mx - pe;
468         }
469
470         for (i = 0; i < number; i++) {
471                 /* Stop generating RS requests, when half of the send buffer is filled */
472                 mutex_lock(&mdev->data.mutex);
473                 if (mdev->data.socket) {
474                         queued = mdev->data.socket->sk->sk_wmem_queued;
475                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
476                 } else {
477                         queued = 1;
478                         sndbuf = 0;
479                 }
480                 mutex_unlock(&mdev->data.mutex);
481                 if (queued > sndbuf / 2)
482                         goto requeue;
483
484 next_sector:
485                 size = BM_BLOCK_SIZE;
486                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
487
488                 if (bit == -1UL) {
489                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
490                         mdev->resync_work.cb = w_resync_inactive;
491                         put_ldev(mdev);
492                         return 1;
493                 }
494
495                 sector = BM_BIT_TO_SECT(bit);
496
497                 if (drbd_try_rs_begin_io(mdev, sector)) {
498                         mdev->bm_resync_fo = bit;
499                         goto requeue;
500                 }
501                 mdev->bm_resync_fo = bit + 1;
502
503                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
504                         drbd_rs_complete_io(mdev, sector);
505                         goto next_sector;
506                 }
507
508 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
509                 /* try to find some adjacent bits.
510                  * we stop if we have already the maximum req size.
511                  *
512                  * Additionally always align bigger requests, in order to
513                  * be prepared for all stripe sizes of software RAIDs.
514                  *
515                  * we _do_ care about the agreed-upon q->max_segment_size
516                  * here, as splitting up the requests on the other side is more
517                  * difficult.  the consequence is, that on lvm and md and other
518                  * "indirect" devices, this is dead code, since
519                  * q->max_segment_size will be PAGE_SIZE.
520                  */
521                 align = 1;
522                 for (;;) {
523                         if (size + BM_BLOCK_SIZE > max_segment_size)
524                                 break;
525
526                         /* Be always aligned */
527                         if (sector & ((1<<(align+3))-1))
528                                 break;
529
530                         /* do not cross extent boundaries */
531                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
532                                 break;
533                         /* now, is it actually dirty, after all?
534                          * caution, drbd_bm_test_bit is tri-state for some
535                          * obscure reason; ( b == 0 ) would get the out-of-band
536                          * only accidentally right because of the "oddly sized"
537                          * adjustment below */
538                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
539                                 break;
540                         bit++;
541                         size += BM_BLOCK_SIZE;
542                         if ((BM_BLOCK_SIZE << align) <= size)
543                                 align++;
544                         i++;
545                 }
546                 /* if we merged some,
547                  * reset the offset to start the next drbd_bm_find_next from */
548                 if (size > BM_BLOCK_SIZE)
549                         mdev->bm_resync_fo = bit + 1;
550 #endif
551
552                 /* adjust very last sectors, in case we are oddly sized */
553                 if (sector + (size>>9) > capacity)
554                         size = (capacity-sector)<<9;
555                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
556                         switch (read_for_csum(mdev, sector, size)) {
557                         case 0: /* Disk failure*/
558                                 put_ldev(mdev);
559                                 return 0;
560                         case 2: /* Allocation failed */
561                                 drbd_rs_complete_io(mdev, sector);
562                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
563                                 goto requeue;
564                         /* case 1: everything ok */
565                         }
566                 } else {
567                         inc_rs_pending(mdev);
568                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
569                                                sector, size, ID_SYNCER)) {
570                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
571                                 dec_rs_pending(mdev);
572                                 put_ldev(mdev);
573                                 return 0;
574                         }
575                 }
576         }
577
578         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
579                 /* last syncer _request_ was sent,
580                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
581                  * next sync group will resume), as soon as we receive the last
582                  * resync data block, and the last bit is cleared.
583                  * until then resync "work" is "inactive" ...
584                  */
585                 mdev->resync_work.cb = w_resync_inactive;
586                 put_ldev(mdev);
587                 return 1;
588         }
589
590  requeue:
591         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
592         put_ldev(mdev);
593         return 1;
594 }
595
596 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
597 {
598         int number, i, size;
599         sector_t sector;
600         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
601
602         if (unlikely(cancel))
603                 return 1;
604
605         if (unlikely(mdev->state.conn < C_CONNECTED)) {
606                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
607                 return 0;
608         }
609
610         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
611         if (atomic_read(&mdev->rs_pending_cnt) > number)
612                 goto requeue;
613
614         number -= atomic_read(&mdev->rs_pending_cnt);
615
616         sector = mdev->ov_position;
617         for (i = 0; i < number; i++) {
618                 if (sector >= capacity) {
619                         mdev->resync_work.cb = w_resync_inactive;
620                         return 1;
621                 }
622
623                 size = BM_BLOCK_SIZE;
624
625                 if (drbd_try_rs_begin_io(mdev, sector)) {
626                         mdev->ov_position = sector;
627                         goto requeue;
628                 }
629
630                 if (sector + (size>>9) > capacity)
631                         size = (capacity-sector)<<9;
632
633                 inc_rs_pending(mdev);
634                 if (!drbd_send_ov_request(mdev, sector, size)) {
635                         dec_rs_pending(mdev);
636                         return 0;
637                 }
638                 sector += BM_SECT_PER_BIT;
639         }
640         mdev->ov_position = sector;
641
642  requeue:
643         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
644         return 1;
645 }
646
647
/**
 * w_ov_finished() - Worker callback run when online verify has finished
 * @mdev:	DRBD device.
 * @w:		work object; it is freed with kfree() here.
 * @cancel:	not evaluated.
 *
 * Calls ov_oos_print() and then drbd_resync_finished(), whose return value
 * is ignored.  Always returns 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}
656
/**
 * w_resync_finished() - Worker callback that retries drbd_resync_finished()
 * @mdev:	DRBD device.
 * @w:		work object; it is freed with kfree() here.
 * @cancel:	not evaluated.
 *
 * Queued by drbd_resync_finished() itself when drbd_rs_del_all() could not
 * complete yet.  Always returns 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}
665
666 int drbd_resync_finished(struct drbd_conf *mdev)
667 {
668         unsigned long db, dt, dbdt;
669         unsigned long n_oos;
670         union drbd_state os, ns;
671         struct drbd_work *w;
672         char *khelper_cmd = NULL;
673
674         /* Remove all elements from the resync LRU. Since future actions
675          * might set bits in the (main) bitmap, then the entries in the
676          * resync LRU would be wrong. */
677         if (drbd_rs_del_all(mdev)) {
678                 /* In case this is not possible now, most probably because
679                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
680                  * queue (or even the read operations for those packets
681                  * is not finished by now).   Retry in 100ms. */
682
683                 drbd_kick_lo(mdev);
684                 __set_current_state(TASK_INTERRUPTIBLE);
685                 schedule_timeout(HZ / 10);
686                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
687                 if (w) {
688                         w->cb = w_resync_finished;
689                         drbd_queue_work(&mdev->data.work, w);
690                         return 1;
691                 }
692                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
693         }
694
695         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
696         if (dt <= 0)
697                 dt = 1;
698         db = mdev->rs_total;
699         dbdt = Bit2KB(db/dt);
700         mdev->rs_paused /= HZ;
701
702         if (!get_ldev(mdev))
703                 goto out;
704
705         spin_lock_irq(&mdev->req_lock);
706         os = mdev->state;
707
708         /* This protects us against multiple calls (that can happen in the presence
709            of application IO), and against connectivity loss just before we arrive here. */
710         if (os.conn <= C_CONNECTED)
711                 goto out_unlock;
712
713         ns = os;
714         ns.conn = C_CONNECTED;
715
716         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
717              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
718              "Online verify " : "Resync",
719              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
720
721         n_oos = drbd_bm_total_weight(mdev);
722
723         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
724                 if (n_oos) {
725                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
726                               n_oos, Bit2KB(1));
727                         khelper_cmd = "out-of-sync";
728                 }
729         } else {
730                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
731
732                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
733                         khelper_cmd = "after-resync-target";
734
735                 if (mdev->csums_tfm && mdev->rs_total) {
736                         const unsigned long s = mdev->rs_same_csum;
737                         const unsigned long t = mdev->rs_total;
738                         const int ratio =
739                                 (t == 0)     ? 0 :
740                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
741                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
742                              "transferred %luK total %luK\n",
743                              ratio,
744                              Bit2KB(mdev->rs_same_csum),
745                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
746                              Bit2KB(mdev->rs_total));
747                 }
748         }
749
750         if (mdev->rs_failed) {
751                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
752
753                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
754                         ns.disk = D_INCONSISTENT;
755                         ns.pdsk = D_UP_TO_DATE;
756                 } else {
757                         ns.disk = D_UP_TO_DATE;
758                         ns.pdsk = D_INCONSISTENT;
759                 }
760         } else {
761                 ns.disk = D_UP_TO_DATE;
762                 ns.pdsk = D_UP_TO_DATE;
763
764                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
765                         if (mdev->p_uuid) {
766                                 int i;
767                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
768                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
769                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
770                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
771                         } else {
772                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
773                         }
774                 }
775
776                 drbd_uuid_set_bm(mdev, 0UL);
777
778                 if (mdev->p_uuid) {
779                         /* Now the two UUID sets are equal, update what we
780                          * know of the peer. */
781                         int i;
782                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
783                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
784                 }
785         }
786
787         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
788 out_unlock:
789         spin_unlock_irq(&mdev->req_lock);
790         put_ldev(mdev);
791 out:
792         mdev->rs_total  = 0;
793         mdev->rs_failed = 0;
794         mdev->rs_paused = 0;
795         mdev->ov_start_sector = 0;
796
797         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
798                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
799                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
800         }
801
802         if (khelper_cmd)
803                 drbd_khelper(mdev, khelper_cmd);
804
805         return 1;
806 }
807
808 /* helper */
809 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
810 {
811         if (drbd_bio_has_active_page(e->private_bio)) {
812                 /* This might happen if sendpage() has not finished */
813                 spin_lock_irq(&mdev->req_lock);
814                 list_add_tail(&e->w.list, &mdev->net_ee);
815                 spin_unlock_irq(&mdev->req_lock);
816         } else
817                 drbd_free_ee(mdev, e);
818 }
819
820 /**
821  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
822  * @mdev:       DRBD device.
823  * @w:          work object.
824  * @cancel:     The connection will be closed anyways
825  */
826 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
827 {
828         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
829         int ok;
830
831         if (unlikely(cancel)) {
832                 drbd_free_ee(mdev, e);
833                 dec_unacked(mdev);
834                 return 1;
835         }
836
837         if (likely(drbd_bio_uptodate(e->private_bio))) {
838                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
839         } else {
840                 if (__ratelimit(&drbd_ratelimit_state))
841                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
842                             (unsigned long long)e->sector);
843
844                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
845         }
846
847         dec_unacked(mdev);
848
849         move_to_net_ee_or_free(mdev, e);
850
851         if (unlikely(!ok))
852                 dev_err(DEV, "drbd_send_block() failed\n");
853         return ok;
854 }
855
856 /**
857  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
858  * @mdev:       DRBD device.
859  * @w:          work object.
860  * @cancel:     The connection will be closed anyways
861  */
862 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
863 {
864         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
865         int ok;
866
867         if (unlikely(cancel)) {
868                 drbd_free_ee(mdev, e);
869                 dec_unacked(mdev);
870                 return 1;
871         }
872
873         if (get_ldev_if_state(mdev, D_FAILED)) {
874                 drbd_rs_complete_io(mdev, e->sector);
875                 put_ldev(mdev);
876         }
877
878         if (likely(drbd_bio_uptodate(e->private_bio))) {
879                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
880                         inc_rs_pending(mdev);
881                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
882                 } else {
883                         if (__ratelimit(&drbd_ratelimit_state))
884                                 dev_err(DEV, "Not sending RSDataReply, "
885                                     "partner DISKLESS!\n");
886                         ok = 1;
887                 }
888         } else {
889                 if (__ratelimit(&drbd_ratelimit_state))
890                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
891                             (unsigned long long)e->sector);
892
893                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
894
895                 /* update resync data with failure */
896                 drbd_rs_failed_io(mdev, e->sector, e->size);
897         }
898
899         dec_unacked(mdev);
900
901         move_to_net_ee_or_free(mdev, e);
902
903         if (unlikely(!ok))
904                 dev_err(DEV, "drbd_send_block() failed\n");
905         return ok;
906 }
907
908 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
909 {
910         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
911         struct digest_info *di;
912         int digest_size;
913         void *digest = NULL;
914         int ok, eq = 0;
915
916         if (unlikely(cancel)) {
917                 drbd_free_ee(mdev, e);
918                 dec_unacked(mdev);
919                 return 1;
920         }
921
922         drbd_rs_complete_io(mdev, e->sector);
923
924         di = (struct digest_info *)(unsigned long)e->block_id;
925
926         if (likely(drbd_bio_uptodate(e->private_bio))) {
927                 /* quick hack to try to avoid a race against reconfiguration.
928                  * a real fix would be much more involved,
929                  * introducing more locking mechanisms */
930                 if (mdev->csums_tfm) {
931                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
932                         D_ASSERT(digest_size == di->digest_size);
933                         digest = kmalloc(digest_size, GFP_NOIO);
934                 }
935                 if (digest) {
936                         drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
937                         eq = !memcmp(digest, di->digest, digest_size);
938                         kfree(digest);
939                 }
940
941                 if (eq) {
942                         drbd_set_in_sync(mdev, e->sector, e->size);
943                         mdev->rs_same_csum++;
944                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
945                 } else {
946                         inc_rs_pending(mdev);
947                         e->block_id = ID_SYNCER;
948                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
949                 }
950         } else {
951                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
952                 if (__ratelimit(&drbd_ratelimit_state))
953                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
954         }
955
956         dec_unacked(mdev);
957
958         kfree(di);
959
960         move_to_net_ee_or_free(mdev, e);
961
962         if (unlikely(!ok))
963                 dev_err(DEV, "drbd_send_block/ack() failed\n");
964         return ok;
965 }
966
967 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
968 {
969         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
970         int digest_size;
971         void *digest;
972         int ok = 1;
973
974         if (unlikely(cancel))
975                 goto out;
976
977         if (unlikely(!drbd_bio_uptodate(e->private_bio)))
978                 goto out;
979
980         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
981         /* FIXME if this allocation fails, online verify will not terminate! */
982         digest = kmalloc(digest_size, GFP_NOIO);
983         if (digest) {
984                 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
985                 inc_rs_pending(mdev);
986                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
987                                              digest, digest_size, P_OV_REPLY);
988                 if (!ok)
989                         dec_rs_pending(mdev);
990                 kfree(digest);
991         }
992
993 out:
994         drbd_free_ee(mdev, e);
995
996         dec_unacked(mdev);
997
998         return ok;
999 }
1000
1001 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1002 {
1003         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1004                 mdev->ov_last_oos_size += size>>9;
1005         } else {
1006                 mdev->ov_last_oos_start = sector;
1007                 mdev->ov_last_oos_size = size>>9;
1008         }
1009         drbd_set_out_of_sync(mdev, sector, size);
1010         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1011 }
1012
1013 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1014 {
1015         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1016         struct digest_info *di;
1017         int digest_size;
1018         void *digest;
1019         int ok, eq = 0;
1020
1021         if (unlikely(cancel)) {
1022                 drbd_free_ee(mdev, e);
1023                 dec_unacked(mdev);
1024                 return 1;
1025         }
1026
1027         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1028          * the resync lru has been cleaned up already */
1029         drbd_rs_complete_io(mdev, e->sector);
1030
1031         di = (struct digest_info *)(unsigned long)e->block_id;
1032
1033         if (likely(drbd_bio_uptodate(e->private_bio))) {
1034                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1035                 digest = kmalloc(digest_size, GFP_NOIO);
1036                 if (digest) {
1037                         drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1038
1039                         D_ASSERT(digest_size == di->digest_size);
1040                         eq = !memcmp(digest, di->digest, digest_size);
1041                         kfree(digest);
1042                 }
1043         } else {
1044                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1045                 if (__ratelimit(&drbd_ratelimit_state))
1046                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1047         }
1048
1049         dec_unacked(mdev);
1050
1051         kfree(di);
1052
1053         if (!eq)
1054                 drbd_ov_oos_found(mdev, e->sector, e->size);
1055         else
1056                 ov_oos_print(mdev);
1057
1058         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1059                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1060
1061         drbd_free_ee(mdev, e);
1062
1063         if (--mdev->ov_left == 0) {
1064                 ov_oos_print(mdev);
1065                 drbd_resync_finished(mdev);
1066         }
1067
1068         return ok;
1069 }
1070
1071 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1072 {
1073         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1074         complete(&b->done);
1075         return 1;
1076 }
1077
1078 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1079 {
1080         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1081         struct p_barrier *p = &mdev->data.sbuf.barrier;
1082         int ok = 1;
1083
1084         /* really avoid racing with tl_clear.  w.cb may have been referenced
1085          * just before it was reassigned and re-queued, so double check that.
1086          * actually, this race was harmless, since we only try to send the
1087          * barrier packet here, and otherwise do nothing with the object.
1088          * but compare with the head of w_clear_epoch */
1089         spin_lock_irq(&mdev->req_lock);
1090         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1091                 cancel = 1;
1092         spin_unlock_irq(&mdev->req_lock);
1093         if (cancel)
1094                 return 1;
1095
1096         if (!drbd_get_data_sock(mdev))
1097                 return 0;
1098         p->barrier = b->br_number;
1099         /* inc_ap_pending was done where this was queued.
1100          * dec_ap_pending will be done in got_BarrierAck
1101          * or (on connection loss) in w_clear_epoch.  */
1102         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1103                                 (struct p_header *)p, sizeof(*p), 0);
1104         drbd_put_data_sock(mdev);
1105
1106         return ok;
1107 }
1108
1109 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1110 {
1111         if (cancel)
1112                 return 1;
1113         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1114 }
1115
1116 /**
1117  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1118  * @mdev:       DRBD device.
1119  * @w:          work object.
1120  * @cancel:     The connection will be closed anyways
1121  */
1122 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1123 {
1124         struct drbd_request *req = container_of(w, struct drbd_request, w);
1125         int ok;
1126
1127         if (unlikely(cancel)) {
1128                 req_mod(req, send_canceled);
1129                 return 1;
1130         }
1131
1132         ok = drbd_send_dblock(mdev, req);
1133         req_mod(req, ok ? handed_over_to_network : send_failed);
1134
1135         return ok;
1136 }
1137
1138 /**
1139  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1140  * @mdev:       DRBD device.
1141  * @w:          work object.
1142  * @cancel:     The connection will be closed anyways
1143  */
1144 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1145 {
1146         struct drbd_request *req = container_of(w, struct drbd_request, w);
1147         int ok;
1148
1149         if (unlikely(cancel)) {
1150                 req_mod(req, send_canceled);
1151                 return 1;
1152         }
1153
1154         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1155                                 (unsigned long)req);
1156
1157         if (!ok) {
1158                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1159                  * so this is probably redundant */
1160                 if (mdev->state.conn >= C_CONNECTED)
1161                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1162         }
1163         req_mod(req, ok ? handed_over_to_network : send_failed);
1164
1165         return ok;
1166 }
1167
1168 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1169 {
1170         struct drbd_conf *odev = mdev;
1171
1172         while (1) {
1173                 if (odev->sync_conf.after == -1)
1174                         return 1;
1175                 odev = minor_to_mdev(odev->sync_conf.after);
1176                 ERR_IF(!odev) return 1;
1177                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1178                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1179                     odev->state.aftr_isp || odev->state.peer_isp ||
1180                     odev->state.user_isp)
1181                         return 0;
1182         }
1183 }
1184
1185 /**
1186  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1187  * @mdev:       DRBD device.
1188  *
1189  * Called from process context only (admin command and after_state_ch).
1190  */
1191 static int _drbd_pause_after(struct drbd_conf *mdev)
1192 {
1193         struct drbd_conf *odev;
1194         int i, rv = 0;
1195
1196         for (i = 0; i < minor_count; i++) {
1197                 odev = minor_to_mdev(i);
1198                 if (!odev)
1199                         continue;
1200                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1201                         continue;
1202                 if (!_drbd_may_sync_now(odev))
1203                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1204                                != SS_NOTHING_TO_DO);
1205         }
1206
1207         return rv;
1208 }
1209
1210 /**
1211  * _drbd_resume_next() - Resume resync on all devices that may resync now
1212  * @mdev:       DRBD device.
1213  *
1214  * Called from process context only (admin command and worker).
1215  */
1216 static int _drbd_resume_next(struct drbd_conf *mdev)
1217 {
1218         struct drbd_conf *odev;
1219         int i, rv = 0;
1220
1221         for (i = 0; i < minor_count; i++) {
1222                 odev = minor_to_mdev(i);
1223                 if (!odev)
1224                         continue;
1225                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1226                         continue;
1227                 if (odev->state.aftr_isp) {
1228                         if (_drbd_may_sync_now(odev))
1229                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1230                                                         CS_HARD, NULL)
1231                                        != SS_NOTHING_TO_DO) ;
1232                 }
1233         }
1234         return rv;
1235 }
1236
1237 void resume_next_sg(struct drbd_conf *mdev)
1238 {
1239         write_lock_irq(&global_state_lock);
1240         _drbd_resume_next(mdev);
1241         write_unlock_irq(&global_state_lock);
1242 }
1243
1244 void suspend_other_sg(struct drbd_conf *mdev)
1245 {
1246         write_lock_irq(&global_state_lock);
1247         _drbd_pause_after(mdev);
1248         write_unlock_irq(&global_state_lock);
1249 }
1250
1251 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1252 {
1253         struct drbd_conf *odev;
1254
1255         if (o_minor == -1)
1256                 return NO_ERROR;
1257         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1258                 return ERR_SYNC_AFTER;
1259
1260         /* check for loops */
1261         odev = minor_to_mdev(o_minor);
1262         while (1) {
1263                 if (odev == mdev)
1264                         return ERR_SYNC_AFTER_CYCLE;
1265
1266                 /* dependency chain ends here, no cycles. */
1267                 if (odev->sync_conf.after == -1)
1268                         return NO_ERROR;
1269
1270                 /* follow the dependency chain */
1271                 odev = minor_to_mdev(odev->sync_conf.after);
1272         }
1273 }
1274
1275 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1276 {
1277         int changes;
1278         int retcode;
1279
1280         write_lock_irq(&global_state_lock);
1281         retcode = sync_after_error(mdev, na);
1282         if (retcode == NO_ERROR) {
1283                 mdev->sync_conf.after = na;
1284                 do {
1285                         changes  = _drbd_pause_after(mdev);
1286                         changes |= _drbd_resume_next(mdev);
1287                 } while (changes);
1288         }
1289         write_unlock_irq(&global_state_lock);
1290         return retcode;
1291 }
1292
1293 /**
1294  * drbd_start_resync() - Start the resync process
1295  * @mdev:       DRBD device.
1296  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1297  *
1298  * This function might bring you directly into one of the
1299  * C_PAUSED_SYNC_* states.
1300  */
1301 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1302 {
1303         union drbd_state ns;
1304         int r;
1305
1306         if (mdev->state.conn >= C_SYNC_SOURCE) {
1307                 dev_err(DEV, "Resync already running!\n");
1308                 return;
1309         }
1310
1311         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1312         drbd_rs_cancel_all(mdev);
1313
1314         if (side == C_SYNC_TARGET) {
1315                 /* Since application IO was locked out during C_WF_BITMAP_T and
1316                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1317                    we check that we might make the data inconsistent. */
1318                 r = drbd_khelper(mdev, "before-resync-target");
1319                 r = (r >> 8) & 0xff;
1320                 if (r > 0) {
1321                         dev_info(DEV, "before-resync-target handler returned %d, "
1322                              "dropping connection.\n", r);
1323                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1324                         return;
1325                 }
1326         }
1327
1328         drbd_state_lock(mdev);
1329
1330         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1331                 drbd_state_unlock(mdev);
1332                 return;
1333         }
1334
1335         if (side == C_SYNC_TARGET) {
1336                 mdev->bm_resync_fo = 0;
1337         } else /* side == C_SYNC_SOURCE */ {
1338                 u64 uuid;
1339
1340                 get_random_bytes(&uuid, sizeof(u64));
1341                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1342                 drbd_send_sync_uuid(mdev, uuid);
1343
1344                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1345         }
1346
1347         write_lock_irq(&global_state_lock);
1348         ns = mdev->state;
1349
1350         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1351
1352         ns.conn = side;
1353
1354         if (side == C_SYNC_TARGET)
1355                 ns.disk = D_INCONSISTENT;
1356         else /* side == C_SYNC_SOURCE */
1357                 ns.pdsk = D_INCONSISTENT;
1358
1359         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1360         ns = mdev->state;
1361
1362         if (ns.conn < C_CONNECTED)
1363                 r = SS_UNKNOWN_ERROR;
1364
1365         if (r == SS_SUCCESS) {
1366                 mdev->rs_total     =
1367                 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1368                 mdev->rs_failed    = 0;
1369                 mdev->rs_paused    = 0;
1370                 mdev->rs_start     =
1371                 mdev->rs_mark_time = jiffies;
1372                 mdev->rs_same_csum = 0;
1373                 _drbd_pause_after(mdev);
1374         }
1375         write_unlock_irq(&global_state_lock);
1376         drbd_state_unlock(mdev);
1377         put_ldev(mdev);
1378
1379         if (r == SS_SUCCESS) {
1380                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1381                      drbd_conn_str(ns.conn),
1382                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1383                      (unsigned long) mdev->rs_total);
1384
1385                 if (mdev->rs_total == 0) {
1386                         /* Peer still reachable? Beware of failing before-resync-target handlers! */
1387                         request_ping(mdev);
1388                         __set_current_state(TASK_INTERRUPTIBLE);
1389                         schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead 10 */
1390                         drbd_resync_finished(mdev);
1391                         return;
1392                 }
1393
1394                 /* ns.conn may already be != mdev->state.conn,
1395                  * we may have been paused in between, or become paused until
1396                  * the timer triggers.
1397                  * No matter, that is handled in resync_timer_fn() */
1398                 if (ns.conn == C_SYNC_TARGET)
1399                         mod_timer(&mdev->resync_timer, jiffies);
1400
1401                 drbd_md_sync(mdev);
1402         }
1403 }
1404
1405 int drbd_worker(struct drbd_thread *thi)
1406 {
1407         struct drbd_conf *mdev = thi->mdev;
1408         struct drbd_work *w = NULL;
1409         LIST_HEAD(work_list);
1410         int intr = 0, i;
1411
1412         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1413
1414         while (get_t_state(thi) == Running) {
1415                 drbd_thread_current_set_cpu(mdev);
1416
1417                 if (down_trylock(&mdev->data.work.s)) {
1418                         mutex_lock(&mdev->data.mutex);
1419                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1420                                 drbd_tcp_uncork(mdev->data.socket);
1421                         mutex_unlock(&mdev->data.mutex);
1422
1423                         intr = down_interruptible(&mdev->data.work.s);
1424
1425                         mutex_lock(&mdev->data.mutex);
1426                         if (mdev->data.socket  && !mdev->net_conf->no_cork)
1427                                 drbd_tcp_cork(mdev->data.socket);
1428                         mutex_unlock(&mdev->data.mutex);
1429                 }
1430
1431                 if (intr) {
1432                         D_ASSERT(intr == -EINTR);
1433                         flush_signals(current);
1434                         ERR_IF (get_t_state(thi) == Running)
1435                                 continue;
1436                         break;
1437                 }
1438
1439                 if (get_t_state(thi) != Running)
1440                         break;
1441                 /* With this break, we have done a down() but not consumed
1442                    the entry from the list. The cleanup code takes care of
1443                    this...   */
1444
1445                 w = NULL;
1446                 spin_lock_irq(&mdev->data.work.q_lock);
1447                 ERR_IF(list_empty(&mdev->data.work.q)) {
1448                         /* something terribly wrong in our logic.
1449                          * we were able to down() the semaphore,
1450                          * but the list is empty... doh.
1451                          *
1452                          * what is the best thing to do now?
1453                          * try again from scratch, restarting the receiver,
1454                          * asender, whatnot? could break even more ugly,
1455                          * e.g. when we are primary, but no good local data.
1456                          *
1457                          * I'll try to get away just starting over this loop.
1458                          */
1459                         spin_unlock_irq(&mdev->data.work.q_lock);
1460                         continue;
1461                 }
1462                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1463                 list_del_init(&w->list);
1464                 spin_unlock_irq(&mdev->data.work.q_lock);
1465
1466                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1467                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1468                         if (mdev->state.conn >= C_CONNECTED)
1469                                 drbd_force_state(mdev,
1470                                                 NS(conn, C_NETWORK_FAILURE));
1471                 }
1472         }
1473         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1474         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1475
1476         spin_lock_irq(&mdev->data.work.q_lock);
1477         i = 0;
1478         while (!list_empty(&mdev->data.work.q)) {
1479                 list_splice_init(&mdev->data.work.q, &work_list);
1480                 spin_unlock_irq(&mdev->data.work.q_lock);
1481
1482                 while (!list_empty(&work_list)) {
1483                         w = list_entry(work_list.next, struct drbd_work, list);
1484                         list_del_init(&w->list);
1485                         w->cb(mdev, w, 1);
1486                         i++; /* dead debugging code */
1487                 }
1488
1489                 spin_lock_irq(&mdev->data.work.q_lock);
1490         }
1491         sema_init(&mdev->data.work.s, 0);
1492         /* DANGEROUS race: if someone did queue his work within the spinlock,
1493          * but up() ed outside the spinlock, we could get an up() on the
1494          * semaphore without corresponding list entry.
1495          * So don't do that.
1496          */
1497         spin_unlock_irq(&mdev->data.work.q_lock);
1498
1499         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1500         /* _drbd_set_state only uses stop_nowait.
1501          * wait here for the Exiting receiver. */
1502         drbd_thread_stop(&mdev->receiver);
1503         drbd_mdev_cleanup(mdev);
1504
1505         dev_info(DEV, "worker terminated\n");
1506
1507         clear_bit(DEVICE_DYING, &mdev->flags);
1508         clear_bit(CONFIG_PENDING, &mdev->flags);
1509         wake_up(&mdev->state_wait);
1510
1511         return 0;
1512 }