1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
42 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43
44
45
46 /* defined here:
47    drbd_md_io_complete
48    drbd_endio_sec
49    drbd_endio_pri
50
51  * more endio handlers:
52    atodb_endio in drbd_actlog.c
53    drbd_bm_async_io_complete in drbd_bitmap.c
54
55  * For all these callbacks, note the following:
56  * The callbacks will be called in irq context by the IDE drivers,
57  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58  * Try to get the locking right :)
59  *
60  */
61
62
63 /* About the global_state_lock
64    Each state transition on a device holds a read lock. In case we have
65    to evaluate the sync after dependencies, we grab a write lock, because
66    we need stable states on all devices for that.  */
67 rwlock_t global_state_lock;
68
69 /* used for synchronous meta data and bitmap IO
70  * submitted by drbd_md_sync_page_io()
71  */
72 void drbd_md_io_complete(struct bio *bio, int error)
73 {
74         struct drbd_md_io *md_io;
75
76         md_io = (struct drbd_md_io *)bio->bi_private;
77         md_io->error = error;
78
79         complete(&md_io->event);
80 }
81
82 /* reads on behalf of the partner,
83  * "submitted" by the receiver
84  */
85 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86 {
87         unsigned long flags = 0;
88         struct drbd_conf *mdev = e->mdev;
89
90         D_ASSERT(e->block_id != ID_VACANT);
91
92         spin_lock_irqsave(&mdev->req_lock, flags);
93         mdev->read_cnt += e->size >> 9;
94         list_del(&e->w.list);
95         if (list_empty(&mdev->read_ee))
96                 wake_up(&mdev->ee_wait);
97         if (test_bit(__EE_WAS_ERROR, &e->flags))
98                 __drbd_chk_io_error(mdev, FALSE);
99         spin_unlock_irqrestore(&mdev->req_lock, flags);
100
101         drbd_queue_work(&mdev->data.work, &e->w);
102         put_ldev(mdev);
103 }
104
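/* Note on the helper below: it returns true only for an entry that was
 * submitted as a barrier (EE_IS_BARRIER), failed (EE_WAS_ERROR), and has not
 * been rescheduled yet (EE_RESUBMITTED still clear) -- i.e. a failed barrier
 * write that still needs to be reissued as a plain write. */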
105 static int is_failed_barrier(int ee_flags)
106 {
107         return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108                         == (EE_IS_BARRIER|EE_WAS_ERROR);
109 }
110
111 /* writes on behalf of the partner, or resync writes,
112  * "submitted" by the receiver, final stage.  */
113 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
114 {
115         unsigned long flags = 0;
116         struct drbd_conf *mdev = e->mdev;
117         sector_t e_sector;
118         int do_wake;
119         int is_syncer_req;
120         int do_al_complete_io;
121
122         /* if this is a failed barrier request, disable use of barriers,
123          * and schedule for resubmission */
124         if (is_failed_barrier(e->flags)) {
125                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
126                 spin_lock_irqsave(&mdev->req_lock, flags);
127                 list_del(&e->w.list);
128                 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
129                 e->w.cb = w_e_reissue;
130                 /* put_ldev actually happens below, once we come here again. */
131                 __release(local);
132                 spin_unlock_irqrestore(&mdev->req_lock, flags);
133                 drbd_queue_work(&mdev->data.work, &e->w);
134                 return;
135         }
136
137         D_ASSERT(e->block_id != ID_VACANT);
138
139         /* after we moved e to done_ee,
140          * we may no longer access it,
141          * it may be freed/reused already!
142          * (as soon as we release the req_lock) */
143         e_sector = e->sector;
144         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
145         is_syncer_req = is_syncer_block_id(e->block_id);
146
147         spin_lock_irqsave(&mdev->req_lock, flags);
148         mdev->writ_cnt += e->size >> 9;
149         list_del(&e->w.list); /* has been on active_ee or sync_ee */
150         list_add_tail(&e->w.list, &mdev->done_ee);
151
152         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
153          * neither did we wake possibly waiting conflicting requests.
154          * done from "drbd_process_done_ee" within the appropriate w.cb
155          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
156
157         do_wake = is_syncer_req
158                 ? list_empty(&mdev->sync_ee)
159                 : list_empty(&mdev->active_ee);
160
161         if (test_bit(__EE_WAS_ERROR, &e->flags))
162                 __drbd_chk_io_error(mdev, FALSE);
163         spin_unlock_irqrestore(&mdev->req_lock, flags);
164
165         if (is_syncer_req)
166                 drbd_rs_complete_io(mdev, e_sector);
167
168         if (do_wake)
169                 wake_up(&mdev->ee_wait);
170
171         if (do_al_complete_io)
172                 drbd_al_complete_io(mdev, e_sector);
173
174         wake_asender(mdev);
175         put_ldev(mdev);
176 }
177
178 /* writes on behalf of the partner, or resync writes,
179  * "submitted" by the receiver.
180  */
181 void drbd_endio_sec(struct bio *bio, int error)
182 {
183         struct drbd_epoch_entry *e = bio->bi_private;
184         struct drbd_conf *mdev = e->mdev;
185         int uptodate = bio_flagged(bio, BIO_UPTODATE);
186         int is_write = bio_data_dir(bio) == WRITE;
187
188         if (error)
189                 dev_warn(DEV, "%s: error=%d s=%llus\n",
190                                 is_write ? "write" : "read", error,
191                                 (unsigned long long)e->sector);
192         if (!error && !uptodate) {
193                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194                                 is_write ? "write" : "read",
195                                 (unsigned long long)e->sector);
196                 /* strange behavior of some lower level drivers...
197                  * fail the request by clearing the uptodate flag,
198                  * but do not return any error?! */
199                 error = -EIO;
200         }
201
202         if (error)
203                 set_bit(__EE_WAS_ERROR, &e->flags);
204
205         bio_put(bio); /* no need for the bio anymore */
206         if (atomic_dec_and_test(&e->pending_bios)) {
207                 if (is_write)
208                         drbd_endio_write_sec_final(e);
209                 else
210                         drbd_endio_read_sec_final(e);
211         }
212 }
213
214 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215  */
216 void drbd_endio_pri(struct bio *bio, int error)
217 {
218         struct drbd_request *req = bio->bi_private;
219         struct drbd_conf *mdev = req->mdev;
220         enum drbd_req_event what;
221         int uptodate = bio_flagged(bio, BIO_UPTODATE);
222
223         if (!error && !uptodate) {
224                 dev_warn(DEV, "p %s: setting error to -EIO\n",
225                          bio_data_dir(bio) == WRITE ? "write" : "read");
226                 /* strange behavior of some lower level drivers...
227                  * fail the request by clearing the uptodate flag,
228                  * but do not return any error?! */
229                 error = -EIO;
230         }
231
232         /* to avoid recursion in __req_mod */
233         if (unlikely(error)) {
234                 what = (bio_data_dir(bio) == WRITE)
235                         ? write_completed_with_error
236                         : (bio_rw(bio) == READ)
237                           ? read_completed_with_error
238                           : read_ahead_completed_with_error;
239         } else
240                 what = completed_ok;
241
242         bio_put(req->private_bio);
243         req->private_bio = ERR_PTR(error);
244
245         req_mod(req, what);
246 }
247
248 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
249 {
250         struct drbd_request *req = container_of(w, struct drbd_request, w);
251
252         /* We should not detach for read io-error,
253          * but try to WRITE the P_DATA_REPLY to the failed location,
254          * to give the disk the chance to relocate that block */
255
256         spin_lock_irq(&mdev->req_lock);
257         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
258                 _req_mod(req, read_retry_remote_canceled);
259                 spin_unlock_irq(&mdev->req_lock);
260                 return 1;
261         }
262         spin_unlock_irq(&mdev->req_lock);
263
264         return w_send_read_req(mdev, w, 0);
265 }
266
267 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
268 {
269         ERR_IF(cancel) return 1;
270         dev_err(DEV, "resync inactive, but callback triggered??\n");
271         return 1; /* Simply ignore this! */
272 }
273
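/* The two checksum helpers below feed data to the crypto_hash through a
 * single scatterlist entry that is re-pointed at each page (or bio segment)
 * in turn.  drbd_csum_ee() walks the epoch entry's page chain and hashes
 * every page in full except the last one, which may be only partially used:
 * "len ?: PAGE_SIZE" falls back to a full page when e->size happens to be
 * page aligned (len == 0). */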
274 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
275 {
276         struct hash_desc desc;
277         struct scatterlist sg;
278         struct page *page = e->pages;
279         struct page *tmp;
280         unsigned len;
281
282         desc.tfm = tfm;
283         desc.flags = 0;
284
285         sg_init_table(&sg, 1);
286         crypto_hash_init(&desc);
287
288         while ((tmp = page_chain_next(page))) {
289                 /* all but the last page will be fully used */
290                 sg_set_page(&sg, page, PAGE_SIZE, 0);
291                 crypto_hash_update(&desc, &sg, sg.length);
292                 page = tmp;
293         }
294         /* and now the last, possibly only partially used page */
295         len = e->size & (PAGE_SIZE - 1);
296         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
297         crypto_hash_update(&desc, &sg, sg.length);
298         crypto_hash_final(&desc, digest);
299 }
300
301 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
302 {
303         struct hash_desc desc;
304         struct scatterlist sg;
305         struct bio_vec *bvec;
306         int i;
307
308         desc.tfm = tfm;
309         desc.flags = 0;
310
311         sg_init_table(&sg, 1);
312         crypto_hash_init(&desc);
313
314         __bio_for_each_segment(bvec, bio, i, 0) {
315                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
316                 crypto_hash_update(&desc, &sg, sg.length);
317         }
318         crypto_hash_final(&desc, digest);
319 }
320
321 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
322 {
323         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
324         int digest_size;
325         void *digest;
326         int ok;
327
328         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
329
330         if (unlikely(cancel)) {
331                 drbd_free_ee(mdev, e);
332                 return 1;
333         }
334
335         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
336                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
337                 digest = kmalloc(digest_size, GFP_NOIO);
338                 if (digest) {
339                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
340
341                         inc_rs_pending(mdev);
342                         ok = drbd_send_drequest_csum(mdev,
343                                                      e->sector,
344                                                      e->size,
345                                                      digest,
346                                                      digest_size,
347                                                      P_CSUM_RS_REQUEST);
348                         kfree(digest);
349                 } else {
350                         dev_err(DEV, "kmalloc() of digest failed.\n");
351                         ok = 0;
352                 }
353         } else
354                 ok = 1;
355
356         drbd_free_ee(mdev, e);
357
358         if (unlikely(!ok))
359                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
360         return ok;
361 }
362
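/* GFP_TRY deliberately omits __GFP_WAIT: allocations with this mask may not
 * sleep or trigger reclaim, so they can fail quickly under memory pressure,
 * and __GFP_NOWARN suppresses the allocation failure warning.  A failed
 * attempt is harmless here, the resync request is simply deferred (see
 * read_for_csum() below). */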
363 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
364
365 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
366 {
367         struct drbd_epoch_entry *e;
368
369         if (!get_ldev(mdev))
370                 return -EIO;
371
372         if (drbd_rs_should_slow_down(mdev))
373                 goto defer;
374
375         /* GFP_TRY, because if there is no memory available right now, this may
376          * be rescheduled for later. It is "only" background resync, after all. */
377         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
378         if (!e)
379                 goto defer;
380
381         e->w.cb = w_e_send_csum;
382         spin_lock_irq(&mdev->req_lock);
383         list_add(&e->w.list, &mdev->read_ee);
384         spin_unlock_irq(&mdev->req_lock);
385
386         atomic_add(size >> 9, &mdev->rs_sect_ev);
387         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
388                 return 0;
389
390         /* drbd_submit_ee currently fails for one reason only:
391          * not being able to allocate enough bios.
392          * Is dropping the connection going to help? */
393         spin_lock_irq(&mdev->req_lock);
394         list_del(&e->w.list);
395         spin_unlock_irq(&mdev->req_lock);
396
397         drbd_free_ee(mdev, e);
398 defer:
399         put_ldev(mdev);
400         return -EAGAIN;
401 }
402
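/* resync_timer_fn() runs in timer (softirq) context.  It only picks the
 * worker callback that matches the current connection state (online verify
 * vs. sync target) and queues mdev->resync_work; the callback itself re-arms
 * the timer via mod_timer(..., jiffies + SLEEP_TIME) in its requeue path, so
 * request generation proceeds in SLEEP_TIME sized steps. */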
403 void resync_timer_fn(unsigned long data)
404 {
405         struct drbd_conf *mdev = (struct drbd_conf *) data;
406         int queue;
407
408         queue = 1;
409         switch (mdev->state.conn) {
410         case C_VERIFY_S:
411                 mdev->resync_work.cb = w_make_ov_request;
412                 break;
413         case C_SYNC_TARGET:
414                 mdev->resync_work.cb = w_make_resync_request;
415                 break;
416         default:
417                 queue = 0;
418                 mdev->resync_work.cb = w_resync_inactive;
419         }
420
421         /* harmless race: list_empty outside data.work.q_lock */
422         if (list_empty(&mdev->resync_work.list) && queue)
423                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
424 }
425
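/* The fifo_buffer helpers below implement the fixed size "plan ahead" ring
 * used by drbd_rs_controller(): fifo_push() stores a new value at head_index
 * and returns the value previously held in that slot (the oldest entry),
 * fifo_add_val() spreads a correction evenly over all slots, and fifo_set()
 * (re)initializes every slot to one value. */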
426 static void fifo_set(struct fifo_buffer *fb, int value)
427 {
428         int i;
429
430         for (i = 0; i < fb->size; i++)
431                 fb->values[i] = value;
432 }
433
434 static int fifo_push(struct fifo_buffer *fb, int value)
435 {
436         int ov;
437
438         ov = fb->values[fb->head_index];
439         fb->values[fb->head_index++] = value;
440
441         if (fb->head_index >= fb->size)
442                 fb->head_index = 0;
443
444         return ov;
445 }
446
447 static void fifo_add_val(struct fifo_buffer *fb, int value)
448 {
449         int i;
450
451         for (i = 0; i < fb->size; i++)
452                 fb->values[i] += value;
453 }
454
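/* A restatement of the arithmetic in drbd_rs_controller() below (all values
 * in 512 byte sectors; the "* 2" factors convert KiB based configuration
 * values into sectors):
 *
 *   want       = c_fill_target                              if configured,
 *                sect_in * c_delay_target * HZ
 *                          / (SLEEP_TIME * 10)              otherwise
 *   correction = want - rs_in_flight - rs_planed
 *   cps        = correction / steps
 *
 * (At resync start, want is seeded from sync_conf.rate instead.)  cps is
 * spread over the rs_plan_s FIFO with fifo_add_val(); fifo_push(.., 0) then
 * yields the share planned for this very step (curr_corr).  The amount
 * requested in this turn is sect_in + curr_corr, clamped to the range
 * [0, c_max_rate * 2 * SLEEP_TIME / HZ]. */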
455 int drbd_rs_controller(struct drbd_conf *mdev)
456 {
457         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
458         unsigned int want;     /* The number of sectors we want in the proxy */
459         int req_sect; /* Number of sectors to request in this turn */
460         int correction; /* Number of sectors more we need in the proxy*/
461         int cps; /* correction per invocation of drbd_rs_controller() */
462         int steps; /* Number of time steps to plan ahead */
463         int curr_corr;
464         int max_sect;
465
466         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
467         mdev->rs_in_flight -= sect_in;
468
469         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
470
471         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
472
473         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
474                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
475         } else { /* normal path */
476                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
477                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
478         }
479
480         correction = want - mdev->rs_in_flight - mdev->rs_planed;
481
482         /* Plan ahead */
483         cps = correction / steps;
484         fifo_add_val(&mdev->rs_plan_s, cps);
485         mdev->rs_planed += cps * steps;
486
487         /* What we do in this step */
488         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
489         spin_unlock(&mdev->peer_seq_lock);
490         mdev->rs_planed -= curr_corr;
491
492         req_sect = sect_in + curr_corr;
493         if (req_sect < 0)
494                 req_sect = 0;
495
496         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
497         if (req_sect > max_sect)
498                 req_sect = max_sect;
499
500         /*
501         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
502                  sect_in, mdev->rs_in_flight, want, correction,
503                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
504         */
505
506         return req_sect;
507 }
508
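/* Unit bookkeeping for the function below: the controller returns sectors,
 * and ">> (BM_BLOCK_SHIFT - 9)" converts 512 byte sectors into bitmap blocks
 * of BM_BLOCK_SIZE bytes, the granularity in which requests are generated
 * ("number" counts such blocks per SLEEP_TIME).  c_sync_rate is in KiB/s,
 * hence the BM_BLOCK_SIZE / 1024 factor in both branches. */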
509 int w_make_resync_request(struct drbd_conf *mdev,
510                 struct drbd_work *w, int cancel)
511 {
512         unsigned long bit;
513         sector_t sector;
514         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
515         int max_segment_size;
516         int number, rollback_i, size, pe, mx;
517         int align, queued, sndbuf;
518         int i = 0;
519
520         if (unlikely(cancel))
521                 return 1;
522
523         if (unlikely(mdev->state.conn < C_CONNECTED)) {
524                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
525                 return 0;
526         }
527
528         if (mdev->state.conn != C_SYNC_TARGET)
529                 dev_err(DEV, "%s in w_make_resync_request\n",
530                         drbd_conn_str(mdev->state.conn));
531
532         if (mdev->rs_total == 0) {
533                 /* empty resync? */
534                 drbd_resync_finished(mdev);
535                 return 1;
536         }
537
538         if (!get_ldev(mdev)) {
539         /* Since we only need to access mdev->rsync, a
540            get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
541            continuing resync with a broken disk makes no sense at
542            all */
543                 dev_err(DEV, "Disk broke down during resync!\n");
544                 mdev->resync_work.cb = w_resync_inactive;
545                 return 1;
546         }
547
548         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
549          * if it should be necessary */
550         max_segment_size =
551                 mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
552                 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
553
554         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
555                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
556                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
557         } else {
558                 mdev->c_sync_rate = mdev->sync_conf.rate;
559                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
560         }
561
562         /* Throttle resync on lower level disk activity, which may also be
563          * caused by application IO on Primary/SyncTarget.
564          * Keep this after the call to drbd_rs_controller, as that assumes
565          * to be called as precisely as possible every SLEEP_TIME,
566          * and would be confused otherwise. */
567         if (drbd_rs_should_slow_down(mdev))
568                 goto requeue;
569
570         mutex_lock(&mdev->data.mutex);
571         if (mdev->data.socket)
572                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
573         else
574                 mx = 1;
575         mutex_unlock(&mdev->data.mutex);
576
577         /* For resync rates >160MB/sec, allow more pending RS requests */
578         if (number > mx)
579                 mx = number;
580
581         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
582         pe = atomic_read(&mdev->rs_pending_cnt);
583         if ((pe + number) > mx) {
584                 number = mx - pe;
585         }
586
587         for (i = 0; i < number; i++) {
588                 /* Stop generating RS requests, when half of the send buffer is filled */
589                 mutex_lock(&mdev->data.mutex);
590                 if (mdev->data.socket) {
591                         queued = mdev->data.socket->sk->sk_wmem_queued;
592                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
593                 } else {
594                         queued = 1;
595                         sndbuf = 0;
596                 }
597                 mutex_unlock(&mdev->data.mutex);
598                 if (queued > sndbuf / 2)
599                         goto requeue;
600
601 next_sector:
602                 size = BM_BLOCK_SIZE;
603                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
604
605                 if (bit == -1UL) {
606                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
607                         mdev->resync_work.cb = w_resync_inactive;
608                         put_ldev(mdev);
609                         return 1;
610                 }
611
612                 sector = BM_BIT_TO_SECT(bit);
613
614                 if (drbd_try_rs_begin_io(mdev, sector)) {
615                         mdev->bm_resync_fo = bit;
616                         goto requeue;
617                 }
618                 mdev->bm_resync_fo = bit + 1;
619
620                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
621                         drbd_rs_complete_io(mdev, sector);
622                         goto next_sector;
623                 }
624
625 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
626                 /* try to find some adjacent bits.
627                  * we stop if we have already the maximum req size.
628                  *
629                  * Additionally always align bigger requests, in order to
630                  * be prepared for all stripe sizes of software RAIDs.
631                  */
632                 align = 1;
633                 rollback_i = i;
634                 for (;;) {
635                         if (size + BM_BLOCK_SIZE > max_segment_size)
636                                 break;
637
638                         /* Be always aligned */
639                         if (sector & ((1<<(align+3))-1))
640                                 break;
641
642                         /* do not cross extent boundaries */
643                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
644                                 break;
645                         /* now, is it actually dirty, after all?
646                          * caution, drbd_bm_test_bit is tri-state for some
647                          * obscure reason; ( b == 0 ) would get the out-of-band
648                          * only accidentally right because of the "oddly sized"
649                          * adjustment below */
650                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
651                                 break;
652                         bit++;
653                         size += BM_BLOCK_SIZE;
654                         if ((BM_BLOCK_SIZE << align) <= size)
655                                 align++;
656                         i++;
657                 }
658                 /* if we merged some,
659                  * reset the offset to start the next drbd_bm_find_next from */
660                 if (size > BM_BLOCK_SIZE)
661                         mdev->bm_resync_fo = bit + 1;
662 #endif
663
664                 /* adjust very last sectors, in case we are oddly sized */
665                 if (sector + (size>>9) > capacity)
666                         size = (capacity-sector)<<9;
667                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
668                         switch (read_for_csum(mdev, sector, size)) {
669                         case -EIO: /* Disk failure */
670                                 put_ldev(mdev);
671                                 return 0;
672                         case -EAGAIN: /* allocation failed, or ldev busy */
673                                 drbd_rs_complete_io(mdev, sector);
674                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
675                                 i = rollback_i;
676                                 goto requeue;
677                         case 0:
678                                 /* everything ok */
679                                 break;
680                         default:
681                                 BUG();
682                         }
683                 } else {
684                         inc_rs_pending(mdev);
685                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
686                                                sector, size, ID_SYNCER)) {
687                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
688                                 dec_rs_pending(mdev);
689                                 put_ldev(mdev);
690                                 return 0;
691                         }
692                 }
693         }
694
695         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
696                 /* last syncer _request_ was sent,
697                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
698                  * next sync group will resume), as soon as we receive the last
699                  * resync data block, and the last bit is cleared.
700                  * until then resync "work" is "inactive" ...
701                  */
702                 mdev->resync_work.cb = w_resync_inactive;
703                 put_ldev(mdev);
704                 return 1;
705         }
706
707  requeue:
708         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
709         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
710         put_ldev(mdev);
711         return 1;
712 }
713
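/* Online verify counterpart of w_make_resync_request(): requests are paced
 * by the plain sync_conf.rate (no dynamic controller here), walk linearly
 * from mdev->ov_position instead of following the bitmap, and are throttled
 * via rs_pending_cnt so that at most "number" replies are outstanding. */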
714 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
715 {
716         int number, i, size;
717         sector_t sector;
718         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
719
720         if (unlikely(cancel))
721                 return 1;
722
723         if (unlikely(mdev->state.conn < C_CONNECTED)) {
724                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
725                 return 0;
726         }
727
728         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
729         if (atomic_read(&mdev->rs_pending_cnt) > number)
730                 goto requeue;
731
732         number -= atomic_read(&mdev->rs_pending_cnt);
733
734         sector = mdev->ov_position;
735         for (i = 0; i < number; i++) {
736                 if (sector >= capacity) {
737                         mdev->resync_work.cb = w_resync_inactive;
738                         return 1;
739                 }
740
741                 size = BM_BLOCK_SIZE;
742
743                 if (drbd_try_rs_begin_io(mdev, sector)) {
744                         mdev->ov_position = sector;
745                         goto requeue;
746                 }
747
748                 if (sector + (size>>9) > capacity)
749                         size = (capacity-sector)<<9;
750
751                 inc_rs_pending(mdev);
752                 if (!drbd_send_ov_request(mdev, sector, size)) {
753                         dec_rs_pending(mdev);
754                         return 0;
755                 }
756                 sector += BM_SECT_PER_BIT;
757         }
758         mdev->ov_position = sector;
759
760  requeue:
761         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
762         return 1;
763 }
764
765
766 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
767 {
768         kfree(w);
769         ov_oos_print(mdev);
770         drbd_resync_finished(mdev);
771
772         return 1;
773 }
774
775 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
776 {
777         kfree(w);
778
779         drbd_resync_finished(mdev);
780
781         return 1;
782 }
783
784 static void ping_peer(struct drbd_conf *mdev)
785 {
786         clear_bit(GOT_PING_ACK, &mdev->flags);
787         request_ping(mdev);
788         wait_event(mdev->misc_wait,
789                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
790 }
791
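/* Throughput numbers computed below: dt is the resync duration in seconds
 * with paused time subtracted, db is rs_total counted in bitmap bits
 * (BM_BLOCK_SIZE sized blocks), and dbdt = Bit2KB(db/dt) is the average rate
 * in KiB/s that shows up in the "done" log message. */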
792 int drbd_resync_finished(struct drbd_conf *mdev)
793 {
794         unsigned long db, dt, dbdt;
795         unsigned long n_oos;
796         union drbd_state os, ns;
797         struct drbd_work *w;
798         char *khelper_cmd = NULL;
799
800         /* Remove all elements from the resync LRU. Since future actions
801          * might set bits in the (main) bitmap, then the entries in the
802          * resync LRU would be wrong. */
803         if (drbd_rs_del_all(mdev)) {
804                 /* In case this is not possible now, most probably because
805                  * there are P_RS_DATA_REPLY packets lingering on the worker's
806                  * queue (or even the read operations for those packets
807                  * are not finished by now).  Retry in 100ms. */
808
809                 drbd_kick_lo(mdev);
810                 __set_current_state(TASK_INTERRUPTIBLE);
811                 schedule_timeout(HZ / 10);
812                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
813                 if (w) {
814                         w->cb = w_resync_finished;
815                         drbd_queue_work(&mdev->data.work, w);
816                         return 1;
817                 }
818                 dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
819         }
820
821         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
822         if (dt <= 0)
823                 dt = 1;
824         db = mdev->rs_total;
825         dbdt = Bit2KB(db/dt);
826         mdev->rs_paused /= HZ;
827
828         if (!get_ldev(mdev))
829                 goto out;
830
831         ping_peer(mdev);
832
833         spin_lock_irq(&mdev->req_lock);
834         os = mdev->state;
835
836         /* This protects us against multiple calls (that can happen in the presence
837            of application IO), and against connectivity loss just before we arrive here. */
838         if (os.conn <= C_CONNECTED)
839                 goto out_unlock;
840
841         ns = os;
842         ns.conn = C_CONNECTED;
843
844         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
845              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
846              "Online verify " : "Resync",
847              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
848
849         n_oos = drbd_bm_total_weight(mdev);
850
851         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
852                 if (n_oos) {
853                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
854                               n_oos, Bit2KB(1));
855                         khelper_cmd = "out-of-sync";
856                 }
857         } else {
858                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
859
860                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
861                         khelper_cmd = "after-resync-target";
862
863                 if (mdev->csums_tfm && mdev->rs_total) {
864                         const unsigned long s = mdev->rs_same_csum;
865                         const unsigned long t = mdev->rs_total;
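                        /* integer percentage s/t without overflowing
                         * unsigned long: multiply first while t is small,
                         * divide t first once t/100 is large enough */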
866                         const int ratio =
867                                 (t == 0)     ? 0 :
868                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
869                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
870                              "transferred %luK total %luK\n",
871                              ratio,
872                              Bit2KB(mdev->rs_same_csum),
873                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
874                              Bit2KB(mdev->rs_total));
875                 }
876         }
877
878         if (mdev->rs_failed) {
879                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
880
881                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
882                         ns.disk = D_INCONSISTENT;
883                         ns.pdsk = D_UP_TO_DATE;
884                 } else {
885                         ns.disk = D_UP_TO_DATE;
886                         ns.pdsk = D_INCONSISTENT;
887                 }
888         } else {
889                 ns.disk = D_UP_TO_DATE;
890                 ns.pdsk = D_UP_TO_DATE;
891
892                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
893                         if (mdev->p_uuid) {
894                                 int i;
895                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
896                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
897                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
898                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
899                         } else {
900                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
901                         }
902                 }
903
904                 drbd_uuid_set_bm(mdev, 0UL);
905
906                 if (mdev->p_uuid) {
907                         /* Now the two UUID sets are equal, update what we
908                          * know of the peer. */
909                         int i;
910                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
911                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
912                 }
913         }
914
915         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
916 out_unlock:
917         spin_unlock_irq(&mdev->req_lock);
918         put_ldev(mdev);
919 out:
920         mdev->rs_total  = 0;
921         mdev->rs_failed = 0;
922         mdev->rs_paused = 0;
923         mdev->ov_start_sector = 0;
924
925         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
926                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
927                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
928         }
929
930         if (khelper_cmd)
931                 drbd_khelper(mdev, khelper_cmd);
932
933         return 1;
934 }
935
936 /* helper */
937 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
938 {
939         if (drbd_ee_has_active_page(e)) {
940                 /* This might happen if sendpage() has not finished */
941                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
942                 atomic_add(i, &mdev->pp_in_use_by_net);
943                 atomic_sub(i, &mdev->pp_in_use);
944                 spin_lock_irq(&mdev->req_lock);
945                 list_add_tail(&e->w.list, &mdev->net_ee);
946                 spin_unlock_irq(&mdev->req_lock);
947                 wake_up(&drbd_pp_wait);
948         } else
949                 drbd_free_ee(mdev, e);
950 }
951
952 /**
953  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
954  * @mdev:       DRBD device.
955  * @w:          work object.
956  * @cancel:     The connection will be closed anyways
957  */
958 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
959 {
960         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
961         int ok;
962
963         if (unlikely(cancel)) {
964                 drbd_free_ee(mdev, e);
965                 dec_unacked(mdev);
966                 return 1;
967         }
968
969         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
970                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
971         } else {
972                 if (__ratelimit(&drbd_ratelimit_state))
973                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
974                             (unsigned long long)e->sector);
975
976                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
977         }
978
979         dec_unacked(mdev);
980
981         move_to_net_ee_or_free(mdev, e);
982
983         if (unlikely(!ok))
984                 dev_err(DEV, "drbd_send_block() failed\n");
985         return ok;
986 }
987
988 /**
989  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
990  * @mdev:       DRBD device.
991  * @w:          work object.
992  * @cancel:     The connection will be closed anyways
993  */
994 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
995 {
996         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
997         int ok;
998
999         if (unlikely(cancel)) {
1000                 drbd_free_ee(mdev, e);
1001                 dec_unacked(mdev);
1002                 return 1;
1003         }
1004
1005         if (get_ldev_if_state(mdev, D_FAILED)) {
1006                 drbd_rs_complete_io(mdev, e->sector);
1007                 put_ldev(mdev);
1008         }
1009
1010         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1011                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1012                         inc_rs_pending(mdev);
1013                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1014                 } else {
1015                         if (__ratelimit(&drbd_ratelimit_state))
1016                                 dev_err(DEV, "Not sending RSDataReply, "
1017                                     "partner DISKLESS!\n");
1018                         ok = 1;
1019                 }
1020         } else {
1021                 if (__ratelimit(&drbd_ratelimit_state))
1022                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1023                             (unsigned long long)e->sector);
1024
1025                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1026
1027                 /* update resync data with failure */
1028                 drbd_rs_failed_io(mdev, e->sector, e->size);
1029         }
1030
1031         dec_unacked(mdev);
1032
1033         move_to_net_ee_or_free(mdev, e);
1034
1035         if (unlikely(!ok))
1036                 dev_err(DEV, "drbd_send_block() failed\n");
1037         return ok;
1038 }
1039
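/* Checksum based resync, answering a P_CSUM_RS_REQUEST: the peer sent a
 * digest of its version of the block.  If the locally computed digest
 * matches, the block is marked in sync and only a P_RS_IS_IN_SYNC ack is
 * returned; only on a mismatch (or missing digest) is the full block shipped
 * as P_RS_DATA_REPLY. */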
1040 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1041 {
1042         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1043         struct digest_info *di;
1044         int digest_size;
1045         void *digest = NULL;
1046         int ok, eq = 0;
1047
1048         if (unlikely(cancel)) {
1049                 drbd_free_ee(mdev, e);
1050                 dec_unacked(mdev);
1051                 return 1;
1052         }
1053
1054         if (get_ldev(mdev)) {
1055                 drbd_rs_complete_io(mdev, e->sector);
1056                 put_ldev(mdev);
1057         }
1058
1059         di = e->digest;
1060
1061         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1062                 /* quick hack to try to avoid a race against reconfiguration.
1063                  * a real fix would be much more involved,
1064                  * introducing more locking mechanisms */
1065                 if (mdev->csums_tfm) {
1066                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1067                         D_ASSERT(digest_size == di->digest_size);
1068                         digest = kmalloc(digest_size, GFP_NOIO);
1069                 }
1070                 if (digest) {
1071                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1072                         eq = !memcmp(digest, di->digest, digest_size);
1073                         kfree(digest);
1074                 }
1075
1076                 if (eq) {
1077                         drbd_set_in_sync(mdev, e->sector, e->size);
1078                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1079                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1080                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1081                 } else {
1082                         inc_rs_pending(mdev);
1083                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1084                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1085                         kfree(di);
1086                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1087                 }
1088         } else {
1089                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1090                 if (__ratelimit(&drbd_ratelimit_state))
1091                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1092         }
1093
1094         dec_unacked(mdev);
1095         move_to_net_ee_or_free(mdev, e);
1096
1097         if (unlikely(!ok))
1098                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1099         return ok;
1100 }
1101
1102 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1103 {
1104         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1105         int digest_size;
1106         void *digest;
1107         int ok = 1;
1108
1109         if (unlikely(cancel))
1110                 goto out;
1111
1112         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1113                 goto out;
1114
1115         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1116         /* FIXME if this allocation fails, online verify will not terminate! */
1117         digest = kmalloc(digest_size, GFP_NOIO);
1118         if (digest) {
1119                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1120                 inc_rs_pending(mdev);
1121                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1122                                              digest, digest_size, P_OV_REPLY);
1123                 if (!ok)
1124                         dec_rs_pending(mdev);
1125                 kfree(digest);
1126         }
1127
1128 out:
1129         drbd_free_ee(mdev, e);
1130
1131         dec_unacked(mdev);
1132
1133         return ok;
1134 }
1135
1136 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1137 {
1138         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1139                 mdev->ov_last_oos_size += size>>9;
1140         } else {
1141                 mdev->ov_last_oos_start = sector;
1142                 mdev->ov_last_oos_size = size>>9;
1143         }
1144         drbd_set_out_of_sync(mdev, sector, size);
1145         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1146 }
1147
1148 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1149 {
1150         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1151         struct digest_info *di;
1152         int digest_size;
1153         void *digest;
1154         int ok, eq = 0;
1155
1156         if (unlikely(cancel)) {
1157                 drbd_free_ee(mdev, e);
1158                 dec_unacked(mdev);
1159                 return 1;
1160         }
1161
1162         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1163          * the resync lru has been cleaned up already */
1164         if (get_ldev(mdev)) {
1165                 drbd_rs_complete_io(mdev, e->sector);
1166                 put_ldev(mdev);
1167         }
1168
1169         di = e->digest;
1170
1171         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1172                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1173                 digest = kmalloc(digest_size, GFP_NOIO);
1174                 if (digest) {
1175                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1176
1177                         D_ASSERT(digest_size == di->digest_size);
1178                         eq = !memcmp(digest, di->digest, digest_size);
1179                         kfree(digest);
1180                 }
1181         } else {
1182                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1183                 if (__ratelimit(&drbd_ratelimit_state))
1184                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1185         }
1186
1187         dec_unacked(mdev);
1188         if (!eq)
1189                 drbd_ov_oos_found(mdev, e->sector, e->size);
1190         else
1191                 ov_oos_print(mdev);
1192
1193         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1194                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1195
1196         drbd_free_ee(mdev, e);
1197
1198         if (--mdev->ov_left == 0) {
1199                 ov_oos_print(mdev);
1200                 drbd_resync_finished(mdev);
1201         }
1202
1203         return ok;
1204 }
1205
1206 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1207 {
1208         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1209         complete(&b->done);
1210         return 1;
1211 }
1212
1213 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1214 {
1215         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1216         struct p_barrier *p = &mdev->data.sbuf.barrier;
1217         int ok = 1;
1218
1219         /* really avoid racing with tl_clear.  w.cb may have been referenced
1220          * just before it was reassigned and re-queued, so double check that.
1221          * actually, this race was harmless, since we only try to send the
1222          * barrier packet here, and otherwise do nothing with the object.
1223          * but compare with the head of w_clear_epoch */
1224         spin_lock_irq(&mdev->req_lock);
1225         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1226                 cancel = 1;
1227         spin_unlock_irq(&mdev->req_lock);
1228         if (cancel)
1229                 return 1;
1230
1231         if (!drbd_get_data_sock(mdev))
1232                 return 0;
1233         p->barrier = b->br_number;
1234         /* inc_ap_pending was done where this was queued.
1235          * dec_ap_pending will be done in got_BarrierAck
1236          * or (on connection loss) in w_clear_epoch.  */
1237         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1238                                 (struct p_header80 *)p, sizeof(*p), 0);
1239         drbd_put_data_sock(mdev);
1240
1241         return ok;
1242 }
1243
1244 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1245 {
1246         if (cancel)
1247                 return 1;
1248         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1249 }
1250
1251 /**
1252  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1253  * @mdev:       DRBD device.
1254  * @w:          work object.
1255  * @cancel:     The connection will be closed anyways
1256  */
1257 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1258 {
1259         struct drbd_request *req = container_of(w, struct drbd_request, w);
1260         int ok;
1261
1262         if (unlikely(cancel)) {
1263                 req_mod(req, send_canceled);
1264                 return 1;
1265         }
1266
1267         ok = drbd_send_dblock(mdev, req);
1268         req_mod(req, ok ? handed_over_to_network : send_failed);
1269
1270         return ok;
1271 }
1272
1273 /**
1274  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1275  * @mdev:       DRBD device.
1276  * @w:          work object.
1277  * @cancel:     The connection will be closed anyways
1278  */
1279 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1280 {
1281         struct drbd_request *req = container_of(w, struct drbd_request, w);
1282         int ok;
1283
1284         if (unlikely(cancel)) {
1285                 req_mod(req, send_canceled);
1286                 return 1;
1287         }
1288
1289         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1290                                 (unsigned long)req);
1291
1292         if (!ok) {
1293                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1294                  * so this is probably redundant */
1295                 if (mdev->state.conn >= C_CONNECTED)
1296                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1297         }
1298         req_mod(req, ok ? handed_over_to_network : send_failed);
1299
1300         return ok;
1301 }
1302
1303 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1304 {
1305         struct drbd_request *req = container_of(w, struct drbd_request, w);
1306
1307         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1308                 drbd_al_begin_io(mdev, req->sector);
1309         /* Calling drbd_al_begin_io() out of the worker might deadlock
1310            in theory. In practice it cannot deadlock, since this is
1311            only used when unfreezing IOs. All the extents of the requests
1312            that made it into the TL are already active */
1313
1314         drbd_req_make_private_bio(req, req->master_bio);
1315         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1316         generic_make_request(req->private_bio);
1317
1318         return 1;
1319 }
1320
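/* _drbd_may_sync_now() walks the sync-after chain (sync_conf.after names the
 * minor this device resyncs after): syncing is allowed only if none of the
 * devices it transitively depends on is currently resyncing or has any of
 * its aftr_isp/peer_isp/user_isp pause flags set. */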
1321 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1322 {
1323         struct drbd_conf *odev = mdev;
1324
1325         while (1) {
1326                 if (odev->sync_conf.after == -1)
1327                         return 1;
1328                 odev = minor_to_mdev(odev->sync_conf.after);
1329                 ERR_IF(!odev) return 1;
1330                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1331                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1332                     odev->state.aftr_isp || odev->state.peer_isp ||
1333                     odev->state.user_isp)
1334                         return 0;
1335         }
1336 }
1337
1338 /**
1339  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1340  * @mdev:       DRBD device.
1341  *
1342  * Called from process context only (admin command and after_state_ch).
1343  */
1344 static int _drbd_pause_after(struct drbd_conf *mdev)
1345 {
1346         struct drbd_conf *odev;
1347         int i, rv = 0;
1348
1349         for (i = 0; i < minor_count; i++) {
1350                 odev = minor_to_mdev(i);
1351                 if (!odev)
1352                         continue;
1353                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1354                         continue;
1355                 if (!_drbd_may_sync_now(odev))
1356                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1357                                != SS_NOTHING_TO_DO);
1358         }
1359
1360         return rv;
1361 }
1362
1363 /**
1364  * _drbd_resume_next() - Resume resync on all devices that may resync now
1365  * @mdev:       DRBD device.
1366  *
1367  * Called from process context only (admin command and worker).
1368  */
1369 static int _drbd_resume_next(struct drbd_conf *mdev)
1370 {
1371         struct drbd_conf *odev;
1372         int i, rv = 0;
1373
1374         for (i = 0; i < minor_count; i++) {
1375                 odev = minor_to_mdev(i);
1376                 if (!odev)
1377                         continue;
1378                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1379                         continue;
1380                 if (odev->state.aftr_isp) {
1381                         if (_drbd_may_sync_now(odev))
1382                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1383                                                         CS_HARD, NULL)
1384                                        != SS_NOTHING_TO_DO) ;
1385                 }
1386         }
1387         return rv;
1388 }
1389
1390 void resume_next_sg(struct drbd_conf *mdev)
1391 {
1392         write_lock_irq(&global_state_lock);
1393         _drbd_resume_next(mdev);
1394         write_unlock_irq(&global_state_lock);
1395 }
1396
1397 void suspend_other_sg(struct drbd_conf *mdev)
1398 {
1399         write_lock_irq(&global_state_lock);
1400         _drbd_pause_after(mdev);
1401         write_unlock_irq(&global_state_lock);
1402 }
1403
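/* sync_after_error() validates a proposed sync-after minor: -1 means "no
 * dependency" and is always accepted, an unknown minor yields
 * ERR_SYNC_AFTER, and the existing chain is followed to make sure the new
 * setting would not loop back to this device (ERR_SYNC_AFTER_CYCLE). */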
1404 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1405 {
1406         struct drbd_conf *odev;
1407
1408         if (o_minor == -1)
1409                 return NO_ERROR;
1410         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1411                 return ERR_SYNC_AFTER;
1412
1413         /* check for loops */
1414         odev = minor_to_mdev(o_minor);
1415         while (1) {
1416                 if (odev == mdev)
1417                         return ERR_SYNC_AFTER_CYCLE;
1418
1419                 /* dependency chain ends here, no cycles. */
1420                 if (odev->sync_conf.after == -1)
1421                         return NO_ERROR;
1422
1423                 /* follow the dependency chain */
1424                 odev = minor_to_mdev(odev->sync_conf.after);
1425         }
1426 }
1427
1428 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1429 {
1430         int changes;
1431         int retcode;
1432
1433         write_lock_irq(&global_state_lock);
1434         retcode = sync_after_error(mdev, na);
1435         if (retcode == NO_ERROR) {
1436                 mdev->sync_conf.after = na;
1437                 do {
1438                         changes  = _drbd_pause_after(mdev);
1439                         changes |= _drbd_resume_next(mdev);
1440                 } while (changes);
1441         }
1442         write_unlock_irq(&global_state_lock);
1443         return retcode;
1444 }
1445
1446 /**
1447  * drbd_start_resync() - Start the resync process
1448  * @mdev:       DRBD device.
1449  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1450  *
1451  * This function might bring you directly into one of the
1452  * C_PAUSED_SYNC_* states.
1453  */
1454 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1455 {
1456         union drbd_state ns;
1457         int r;
1458
1459         if (mdev->state.conn >= C_SYNC_SOURCE) {
1460                 dev_err(DEV, "Resync already running!\n");
1461                 return;
1462         }
1463
1464         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1465         drbd_rs_cancel_all(mdev);
1466
1467         if (side == C_SYNC_TARGET) {
1468                 /* Since application IO was locked out during C_WF_BITMAP_T and
1469                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1470                    which makes our data inconsistent, give the handler a chance to veto. */
1471                 r = drbd_khelper(mdev, "before-resync-target");
1472                 r = (r >> 8) & 0xff;
1473                 if (r > 0) {
1474                         dev_info(DEV, "before-resync-target handler returned %d, "
1475                              "dropping connection.\n", r);
1476                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1477                         return;
1478                 }
1479         }
1480
1481         drbd_state_lock(mdev);
1482
1483         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1484                 drbd_state_unlock(mdev);
1485                 return;
1486         }
1487
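        /* As SyncTarget, restart the bitmap scan from the beginning.  As
         * SyncSource, generate a fresh bitmap UUID, record it and send it to
         * the peer before the resync starts. */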
1488         if (side == C_SYNC_TARGET) {
1489                 mdev->bm_resync_fo = 0;
1490         } else /* side == C_SYNC_SOURCE */ {
1491                 u64 uuid;
1492
1493                 get_random_bytes(&uuid, sizeof(u64));
1494                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1495                 drbd_send_sync_uuid(mdev, uuid);
1496
1497                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1498         }
1499
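        /* Build the target state under the global_state_lock: start paused
         * (aftr_isp) if a sync-after dependency does not allow syncing yet,
         * and mark the side that will receive data as D_INCONSISTENT. */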
1500         write_lock_irq(&global_state_lock);
1501         ns = mdev->state;
1502
1503         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1504
1505         ns.conn = side;
1506
1507         if (side == C_SYNC_TARGET)
1508                 ns.disk = D_INCONSISTENT;
1509         else /* side == C_SYNC_SOURCE */
1510                 ns.pdsk = D_INCONSISTENT;
1511
1512         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1513         ns = mdev->state;
1514
1515         if (ns.conn < C_CONNECTED)
1516                 r = SS_UNKNOWN_ERROR;
1517
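        /* If the state change was accepted, (re)initialize the resync
         * bookkeeping (total weight, start time, progress marks) and re-check
         * which other devices must pause their resync (_drbd_pause_after()). */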
1518         if (r == SS_SUCCESS) {
1519                 unsigned long tw = drbd_bm_total_weight(mdev);
1520                 unsigned long now = jiffies;
1521                 int i;
1522
1523                 mdev->rs_failed    = 0;
1524                 mdev->rs_paused    = 0;
1525                 mdev->rs_same_csum = 0;
1526                 mdev->rs_last_events = 0;
1527                 mdev->rs_last_sect_ev = 0;
1528                 mdev->rs_total     = tw;
1529                 mdev->rs_start     = now;
1530                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1531                         mdev->rs_mark_left[i] = tw;
1532                         mdev->rs_mark_time[i] = now;
1533                 }
1534                 _drbd_pause_after(mdev);
1535         }
1536         write_unlock_irq(&global_state_lock);
1537         put_ldev(mdev);
1538
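        /* The remainder runs without the global_state_lock: log the resync
         * start, handle the nothing-to-sync corner case for peers older than
         * protocol 95, and arm the resync timer when we are SyncTarget. */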
1539         if (r == SS_SUCCESS) {
1540                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1541                      drbd_conn_str(ns.conn),
1542                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1543                      (unsigned long) mdev->rs_total);
1544
1545                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1546                         /* This still has a race (about when exactly the peers
1547                          * detect connection loss) that can lead to a full sync
1548                          * on next handshake. In 8.3.9 we fixed this with explicit
1549                          * resync-finished notifications, but the fix
1550                          * introduces a protocol change.  Sleeping for some
1551                          * time longer than the ping interval + timeout on the
1552                          * SyncSource, to give the SyncTarget the chance to
1553                          * detect connection loss, then waiting for a ping
1554                          * response (implicit in drbd_resync_finished) reduces
1555                          * the race considerably, but does not solve it. */
1556                         if (side == C_SYNC_SOURCE)
1557                                 schedule_timeout_interruptible(
1558                                         mdev->net_conf->ping_int * HZ +
1559                                         mdev->net_conf->ping_timeo * HZ / 9);
1560                         drbd_resync_finished(mdev);
1561                 }
1562
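                /* Reset the counters that track resync traffic and the resync
                 * plan fifo; the fifo is modified under peer_seq_lock. */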
1563                 atomic_set(&mdev->rs_sect_in, 0);
1564                 atomic_set(&mdev->rs_sect_ev, 0);
1565                 mdev->rs_in_flight = 0;
1566                 mdev->rs_planed = 0;
1567                 spin_lock(&mdev->peer_seq_lock);
1568                 fifo_set(&mdev->rs_plan_s, 0);
1569                 spin_unlock(&mdev->peer_seq_lock);
1570                 /* ns.conn may already be != mdev->state.conn,
1571                  * we may have been paused in between, or become paused until
1572                  * the timer triggers.
1573                  * No matter, that is handled in resync_timer_fn() */
1574                 if (ns.conn == C_SYNC_TARGET)
1575                         mod_timer(&mdev->resync_timer, jiffies);
1576
1577                 drbd_md_sync(mdev);
1578         }
1579         drbd_state_unlock(mdev);
1580 }
1581
1582 int drbd_worker(struct drbd_thread *thi)
1583 {
1584         struct drbd_conf *mdev = thi->mdev;
1585         struct drbd_work *w = NULL;
1586         LIST_HEAD(work_list);
1587         int intr = 0, i;
1588
1589         snprintf(current->comm, sizeof(current->comm), "drbd%d_worker", mdev_to_minor(mdev));
1590
1591         while (get_t_state(thi) == Running) {
1592                 drbd_thread_current_set_cpu(mdev);
1593
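                /* No queued work right now: uncork the data socket so corked
                 * packets get flushed, sleep until work is queued (or we are
                 * interrupted), then cork again so further packets batch up. */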
1594                 if (down_trylock(&mdev->data.work.s)) {
1595                         mutex_lock(&mdev->data.mutex);
1596                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1597                                 drbd_tcp_uncork(mdev->data.socket);
1598                         mutex_unlock(&mdev->data.mutex);
1599
1600                         intr = down_interruptible(&mdev->data.work.s);
1601
1602                         mutex_lock(&mdev->data.mutex);
1603                 if (mdev->data.socket && !mdev->net_conf->no_cork)
1604                                 drbd_tcp_cork(mdev->data.socket);
1605                         mutex_unlock(&mdev->data.mutex);
1606                 }
1607
1608                 if (intr) {
1609                         D_ASSERT(intr == -EINTR);
1610                         flush_signals(current);
1611                         ERR_IF(get_t_state(thi) == Running)
1612                                 continue;
1613                         break;
1614                 }
1615
1616                 if (get_t_state(thi) != Running)
1617                         break;
1618                 /* With this break, we have done a down() but not consumed
1619                    the entry from the list. The cleanup code takes care of
1620                    this...   */
1621
1622                 w = NULL;
1623                 spin_lock_irq(&mdev->data.work.q_lock);
1624                 ERR_IF(list_empty(&mdev->data.work.q)) {
1625                         /* something terribly wrong in our logic.
1626                          * we were able to down() the semaphore,
1627                          * but the list is empty... doh.
1628                          *
1629                          * what is the best thing to do now?
1630                          * try again from scratch, restarting the receiver,
1631                          * asender, whatnot? could break even more ugly,
1632                          * e.g. when we are primary, but no good local data.
1633                          *
1634                          * I'll try to get away just starting over this loop.
1635                          */
1636                         spin_unlock_irq(&mdev->data.work.q_lock);
1637                         continue;
1638                 }
1639                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1640                 list_del_init(&w->list);
1641                 spin_unlock_irq(&mdev->data.work.q_lock);
1642
1643                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1644                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1645                         if (mdev->state.conn >= C_CONNECTED)
1646                                 drbd_force_state(mdev,
1647                                                 NS(conn, C_NETWORK_FAILURE));
1648                 }
1649         }
1650         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1651         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1652
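        /* Shutdown: drain whatever is still queued, calling each callback
         * with its cancel argument set. */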
1653         spin_lock_irq(&mdev->data.work.q_lock);
1654         i = 0;
1655         while (!list_empty(&mdev->data.work.q)) {
1656                 list_splice_init(&mdev->data.work.q, &work_list);
1657                 spin_unlock_irq(&mdev->data.work.q_lock);
1658
1659                 while (!list_empty(&work_list)) {
1660                         w = list_entry(work_list.next, struct drbd_work, list);
1661                         list_del_init(&w->list);
1662                         w->cb(mdev, w, 1);
1663                         i++; /* dead debugging code */
1664                 }
1665
1666                 spin_lock_irq(&mdev->data.work.q_lock);
1667         }
1668         sema_init(&mdev->data.work.s, 0);
1669         /* DANGEROUS race: if someone did queue his work within the spinlock,
1670          * but called up() outside the spinlock, we could get an up() on the
1671          * semaphore without corresponding list entry.
1672          * So don't do that.
1673          */
1674         spin_unlock_irq(&mdev->data.work.q_lock);
1675
1676         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1677         /* _drbd_set_state only uses stop_nowait.
1678          * wait here for the Exiting receiver. */
1679         drbd_thread_stop(&mdev->receiver);
1680         drbd_mdev_cleanup(mdev);
1681
1682         dev_info(DEV, "worker terminated\n");
1683
1684         clear_bit(DEVICE_DYING, &mdev->flags);
1685         clear_bit(CONFIG_PENDING, &mdev->flags);
1686         wake_up(&mdev->state_wait);
1687
1688         return 0;
1689 }