]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/drbd/drbd_worker.c
drbd: moved data and meta from mdev to tconn
[karo-tx-linux.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
/* Forward declarations: both request generators are defined further down
 * in this file and are dispatched from w_resync_timer(). */
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43                                  struct drbd_work *w, int cancel);
44
45
46
47 /* endio handlers:
48  *   drbd_md_io_complete (defined here)
49  *   drbd_endio_pri (defined here)
50  *   drbd_endio_sec (defined here)
51  *   bm_async_io_complete (defined in drbd_bitmap.c)
52  *
53  * For all these callbacks, note the following:
54  * The callbacks will be called in irq context by the IDE drivers,
55  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56  * Try to get the locking right :)
57  *
58  */
59
60
61 /* About the global_state_lock
62    Each state transition on a device holds a read lock. In case we have
63    to evaluate the sync-after dependencies, we grab a write lock, because
64    we need stable states on all devices for that.  */
65 rwlock_t global_state_lock;
66
67 /* used for synchronous meta data and bitmap IO
68  * submitted by drbd_md_sync_page_io()
69  */
70 void drbd_md_io_complete(struct bio *bio, int error)
71 {
72         struct drbd_md_io *md_io;
73
74         md_io = (struct drbd_md_io *)bio->bi_private;
75         md_io->error = error;
76
77         complete(&md_io->event);
78 }
79
80 /* reads on behalf of the partner,
81  * "submitted" by the receiver
82  */
83 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
84 {
85         unsigned long flags = 0;
86         struct drbd_conf *mdev = e->mdev;
87
88         spin_lock_irqsave(&mdev->req_lock, flags);
89         mdev->read_cnt += e->i.size >> 9;
90         list_del(&e->w.list);
91         if (list_empty(&mdev->read_ee))
92                 wake_up(&mdev->ee_wait);
93         if (test_bit(__EE_WAS_ERROR, &e->flags))
94                 __drbd_chk_io_error(mdev, false);
95         spin_unlock_irqrestore(&mdev->req_lock, flags);
96
97         drbd_queue_work(&mdev->tconn->data.work, &e->w);
98         put_ldev(mdev);
99 }
100
101 /* writes on behalf of the partner, or resync writes,
102  * "submitted" by the receiver, final stage.  */
103 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
104 {
105         unsigned long flags = 0;
106         struct drbd_conf *mdev = e->mdev;
107         sector_t e_sector;
108         int do_wake;
109         u64 block_id;
110         int do_al_complete_io;
111
112         /* after we moved e to done_ee,
113          * we may no longer access it,
114          * it may be freed/reused already!
115          * (as soon as we release the req_lock) */
116         e_sector = e->i.sector;
117         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
118         block_id = e->block_id;
119
120         spin_lock_irqsave(&mdev->req_lock, flags);
121         mdev->writ_cnt += e->i.size >> 9;
122         list_del(&e->w.list); /* has been on active_ee or sync_ee */
123         list_add_tail(&e->w.list, &mdev->done_ee);
124
125         /*
126          * Do not remove from the epoch_entries tree here: we did not send the
127          * Ack yet and did not wake possibly waiting conflicting requests.
128          * Removed from the tree from "drbd_process_done_ee" within the
129          * appropriate w.cb (e_end_block/e_end_resync_block) or from
130          * _drbd_clear_done_ee.
131          */
132
133         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
134
135         if (test_bit(__EE_WAS_ERROR, &e->flags))
136                 __drbd_chk_io_error(mdev, false);
137         spin_unlock_irqrestore(&mdev->req_lock, flags);
138
139         if (block_id == ID_SYNCER)
140                 drbd_rs_complete_io(mdev, e_sector);
141
142         if (do_wake)
143                 wake_up(&mdev->ee_wait);
144
145         if (do_al_complete_io)
146                 drbd_al_complete_io(mdev, e_sector);
147
148         wake_asender(mdev);
149         put_ldev(mdev);
150 }
151
152 /* writes on behalf of the partner, or resync writes,
153  * "submitted" by the receiver.
154  */
155 void drbd_endio_sec(struct bio *bio, int error)
156 {
157         struct drbd_epoch_entry *e = bio->bi_private;
158         struct drbd_conf *mdev = e->mdev;
159         int uptodate = bio_flagged(bio, BIO_UPTODATE);
160         int is_write = bio_data_dir(bio) == WRITE;
161
162         if (error && __ratelimit(&drbd_ratelimit_state))
163                 dev_warn(DEV, "%s: error=%d s=%llus\n",
164                                 is_write ? "write" : "read", error,
165                                 (unsigned long long)e->i.sector);
166         if (!error && !uptodate) {
167                 if (__ratelimit(&drbd_ratelimit_state))
168                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
169                                         is_write ? "write" : "read",
170                                         (unsigned long long)e->i.sector);
171                 /* strange behavior of some lower level drivers...
172                  * fail the request by clearing the uptodate flag,
173                  * but do not return any error?! */
174                 error = -EIO;
175         }
176
177         if (error)
178                 set_bit(__EE_WAS_ERROR, &e->flags);
179
180         bio_put(bio); /* no need for the bio anymore */
181         if (atomic_dec_and_test(&e->pending_bios)) {
182                 if (is_write)
183                         drbd_endio_write_sec_final(e);
184                 else
185                         drbd_endio_read_sec_final(e);
186         }
187 }
188
189 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
190  */
191 void drbd_endio_pri(struct bio *bio, int error)
192 {
193         unsigned long flags;
194         struct drbd_request *req = bio->bi_private;
195         struct drbd_conf *mdev = req->mdev;
196         struct bio_and_error m;
197         enum drbd_req_event what;
198         int uptodate = bio_flagged(bio, BIO_UPTODATE);
199
200         if (!error && !uptodate) {
201                 dev_warn(DEV, "p %s: setting error to -EIO\n",
202                          bio_data_dir(bio) == WRITE ? "write" : "read");
203                 /* strange behavior of some lower level drivers...
204                  * fail the request by clearing the uptodate flag,
205                  * but do not return any error?! */
206                 error = -EIO;
207         }
208
209         /* to avoid recursion in __req_mod */
210         if (unlikely(error)) {
211                 what = (bio_data_dir(bio) == WRITE)
212                         ? WRITE_COMPLETED_WITH_ERROR
213                         : (bio_rw(bio) == READ)
214                           ? READ_COMPLETED_WITH_ERROR
215                           : READ_AHEAD_COMPLETED_WITH_ERROR;
216         } else
217                 what = COMPLETED_OK;
218
219         bio_put(req->private_bio);
220         req->private_bio = ERR_PTR(error);
221
222         /* not req_mod(), we need irqsave here! */
223         spin_lock_irqsave(&mdev->req_lock, flags);
224         __req_mod(req, what, &m);
225         spin_unlock_irqrestore(&mdev->req_lock, flags);
226
227         if (m.bio)
228                 complete_master_bio(mdev, &m);
229 }
230
231 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
232 {
233         struct drbd_request *req = container_of(w, struct drbd_request, w);
234
235         /* We should not detach for read io-error,
236          * but try to WRITE the P_DATA_REPLY to the failed location,
237          * to give the disk the chance to relocate that block */
238
239         spin_lock_irq(&mdev->req_lock);
240         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
241                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
242                 spin_unlock_irq(&mdev->req_lock);
243                 return 1;
244         }
245         spin_unlock_irq(&mdev->req_lock);
246
247         return w_send_read_req(mdev, w, 0);
248 }
249
250 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
251 {
252         struct hash_desc desc;
253         struct scatterlist sg;
254         struct page *page = e->pages;
255         struct page *tmp;
256         unsigned len;
257
258         desc.tfm = tfm;
259         desc.flags = 0;
260
261         sg_init_table(&sg, 1);
262         crypto_hash_init(&desc);
263
264         while ((tmp = page_chain_next(page))) {
265                 /* all but the last page will be fully used */
266                 sg_set_page(&sg, page, PAGE_SIZE, 0);
267                 crypto_hash_update(&desc, &sg, sg.length);
268                 page = tmp;
269         }
270         /* and now the last, possibly only partially used page */
271         len = e->i.size & (PAGE_SIZE - 1);
272         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
273         crypto_hash_update(&desc, &sg, sg.length);
274         crypto_hash_final(&desc, digest);
275 }
276
277 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
278 {
279         struct hash_desc desc;
280         struct scatterlist sg;
281         struct bio_vec *bvec;
282         int i;
283
284         desc.tfm = tfm;
285         desc.flags = 0;
286
287         sg_init_table(&sg, 1);
288         crypto_hash_init(&desc);
289
290         __bio_for_each_segment(bvec, bio, i, 0) {
291                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
292                 crypto_hash_update(&desc, &sg, sg.length);
293         }
294         crypto_hash_final(&desc, digest);
295 }
296
297 /* TODO merge common code with w_e_end_ov_req */
298 int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
299 {
300         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
301         int digest_size;
302         void *digest;
303         int ok = 1;
304
305         if (unlikely(cancel))
306                 goto out;
307
308         if (likely((e->flags & EE_WAS_ERROR) != 0))
309                 goto out;
310
311         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
312         digest = kmalloc(digest_size, GFP_NOIO);
313         if (digest) {
314                 sector_t sector = e->i.sector;
315                 unsigned int size = e->i.size;
316                 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
317                 /* Free e and pages before send.
318                  * In case we block on congestion, we could otherwise run into
319                  * some distributed deadlock, if the other side blocks on
320                  * congestion as well, because our receiver blocks in
321                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
322                 drbd_free_ee(mdev, e);
323                 e = NULL;
324                 inc_rs_pending(mdev);
325                 ok = drbd_send_drequest_csum(mdev, sector, size,
326                                              digest, digest_size,
327                                              P_CSUM_RS_REQUEST);
328                 kfree(digest);
329         } else {
330                 dev_err(DEV, "kmalloc() of digest failed.\n");
331                 ok = 0;
332         }
333
334 out:
335         if (e)
336                 drbd_free_ee(mdev, e);
337
338         if (unlikely(!ok))
339                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
340         return ok;
341 }
342
343 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
344
345 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
346 {
347         struct drbd_epoch_entry *e;
348
349         if (!get_ldev(mdev))
350                 return -EIO;
351
352         if (drbd_rs_should_slow_down(mdev, sector))
353                 goto defer;
354
355         /* GFP_TRY, because if there is no memory available right now, this may
356          * be rescheduled for later. It is "only" background resync, after all. */
357         e = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
358         if (!e)
359                 goto defer;
360
361         e->w.cb = w_e_send_csum;
362         spin_lock_irq(&mdev->req_lock);
363         list_add(&e->w.list, &mdev->read_ee);
364         spin_unlock_irq(&mdev->req_lock);
365
366         atomic_add(size >> 9, &mdev->rs_sect_ev);
367         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
368                 return 0;
369
370         /* If it failed because of ENOMEM, retry should help.  If it failed
371          * because bio_add_page failed (probably broken lower level driver),
372          * retry may or may not help.
373          * If it does not, you may need to force disconnect. */
374         spin_lock_irq(&mdev->req_lock);
375         list_del(&e->w.list);
376         spin_unlock_irq(&mdev->req_lock);
377
378         drbd_free_ee(mdev, e);
379 defer:
380         put_ldev(mdev);
381         return -EAGAIN;
382 }
383
384 int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
385 {
386         switch (mdev->state.conn) {
387         case C_VERIFY_S:
388                 w_make_ov_request(mdev, w, cancel);
389                 break;
390         case C_SYNC_TARGET:
391                 w_make_resync_request(mdev, w, cancel);
392                 break;
393         }
394
395         return 1;
396 }
397
398 void resync_timer_fn(unsigned long data)
399 {
400         struct drbd_conf *mdev = (struct drbd_conf *) data;
401
402         if (list_empty(&mdev->resync_work.list))
403                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
404 }
405
406 static void fifo_set(struct fifo_buffer *fb, int value)
407 {
408         int i;
409
410         for (i = 0; i < fb->size; i++)
411                 fb->values[i] = value;
412 }
413
414 static int fifo_push(struct fifo_buffer *fb, int value)
415 {
416         int ov;
417
418         ov = fb->values[fb->head_index];
419         fb->values[fb->head_index++] = value;
420
421         if (fb->head_index >= fb->size)
422                 fb->head_index = 0;
423
424         return ov;
425 }
426
427 static void fifo_add_val(struct fifo_buffer *fb, int value)
428 {
429         int i;
430
431         for (i = 0; i < fb->size; i++)
432                 fb->values[i] += value;
433 }
434
435 static int drbd_rs_controller(struct drbd_conf *mdev)
436 {
437         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
438         unsigned int want;     /* The number of sectors we want in the proxy */
439         int req_sect; /* Number of sectors to request in this turn */
440         int correction; /* Number of sectors more we need in the proxy*/
441         int cps; /* correction per invocation of drbd_rs_controller() */
442         int steps; /* Number of time steps to plan ahead */
443         int curr_corr;
444         int max_sect;
445
446         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
447         mdev->rs_in_flight -= sect_in;
448
449         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
450
451         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
452
453         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
454                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
455         } else { /* normal path */
456                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
457                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
458         }
459
460         correction = want - mdev->rs_in_flight - mdev->rs_planed;
461
462         /* Plan ahead */
463         cps = correction / steps;
464         fifo_add_val(&mdev->rs_plan_s, cps);
465         mdev->rs_planed += cps * steps;
466
467         /* What we do in this step */
468         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
469         spin_unlock(&mdev->peer_seq_lock);
470         mdev->rs_planed -= curr_corr;
471
472         req_sect = sect_in + curr_corr;
473         if (req_sect < 0)
474                 req_sect = 0;
475
476         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
477         if (req_sect > max_sect)
478                 req_sect = max_sect;
479
480         /*
481         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
482                  sect_in, mdev->rs_in_flight, want, correction,
483                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
484         */
485
486         return req_sect;
487 }
488
489 static int drbd_rs_number_requests(struct drbd_conf *mdev)
490 {
491         int number;
492         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
493                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
494                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
495         } else {
496                 mdev->c_sync_rate = mdev->sync_conf.rate;
497                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
498         }
499
500         /* ignore the amount of pending requests, the resync controller should
501          * throttle down to incoming reply rate soon enough anyways. */
502         return number;
503 }
504
/*
 * Worker callback that generates the next batch of resync requests
 * (called from w_resync_timer() when the connection state is C_SYNC_TARGET).
 *
 * Starting at mdev->bm_resync_fo it looks up the next bitmap bit via
 * drbd_bm_find_next(), merges directly following bits for which
 * drbd_bm_test_bit() returns 1 into one larger, aligned request (bounded
 * by max_bio_size and by extent boundaries), and then either reads the
 * block locally for a checksum based request (read_for_csum()) or sends a
 * P_RS_DATA_REQUEST.  At most drbd_rs_number_requests() blocks are handled
 * per call, and generation stops early once more than half of the send
 * buffer is queued.
 *
 * On the requeue path the function accounts the generated requests in
 * rs_in_flight and re-arms resync_timer for SLEEP_TIME from now.
 *
 * Returns 0 if read_for_csum() reports -EIO or drbd_send_drequest()
 * fails, 1 in all other cases.
 */
505 static int w_make_resync_request(struct drbd_conf *mdev,
506                                  struct drbd_work *w, int cancel)
507 {
508         unsigned long bit;
509         sector_t sector;
510         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
511         int max_bio_size;
512         int number, rollback_i, size;
513         int align, queued, sndbuf;
514         int i = 0;
515
516         if (unlikely(cancel))
517                 return 1;
518
519         if (mdev->rs_total == 0) {
520                 /* empty resync? */
521                 drbd_resync_finished(mdev);
522                 return 1;
523         }
524
525         if (!get_ldev(mdev)) {
526                 /* Since we only need to access mdev->rsync a
527                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
528                    to continue resync with a broken disk makes no sense at
529                    all */
530                 dev_err(DEV, "Disk broke down during resync!\n");
531                 return 1;
532         }
533
534         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
535         number = drbd_rs_number_requests(mdev);
536         if (number == 0)
537                 goto requeue;
538
539         for (i = 0; i < number; i++) {
540                 /* Stop generating RS requests, when half of the send buffer is filled */
541                 mutex_lock(&mdev->tconn->data.mutex);
542                 if (mdev->tconn->data.socket) {
543                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
544                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
545                 } else {
                        /* no socket: these values make the check below requeue */
546                         queued = 1;
547                         sndbuf = 0;
548                 }
549                 mutex_unlock(&mdev->tconn->data.mutex);
550                 if (queued > sndbuf / 2)
551                         goto requeue;
552
                /* re-entered via goto when the found bit turned out to be clear;
                 * this does not consume one of the "number" loop iterations */
553 next_sector:
554                 size = BM_BLOCK_SIZE;
555                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
556
557                 if (bit == DRBD_END_OF_BITMAP) {
558                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
559                         put_ldev(mdev);
560                         return 1;
561                 }
562
563                 sector = BM_BIT_TO_SECT(bit);
564
565                 if (drbd_rs_should_slow_down(mdev, sector) ||
566                     drbd_try_rs_begin_io(mdev, sector)) {
                        /* retry this very bit on the next timer run */
567                         mdev->bm_resync_fo = bit;
568                         goto requeue;
569                 }
570                 mdev->bm_resync_fo = bit + 1;
571
572                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
573                         drbd_rs_complete_io(mdev, sector);
574                         goto next_sector;
575                 }
576
577 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
578                 /* try to find some adjacent bits.
579                  * we stop if we have already the maximum req size.
580                  *
581                  * Additionally always align bigger requests, in order to
582                  * be prepared for all stripe sizes of software RAIDs.
583                  */
584                 align = 1;
                /* remember i: the -EAGAIN path below restores it, undoing the
                 * i++ done for every merged bit */
585                 rollback_i = i;
586                 for (;;) {
587                         if (size + BM_BLOCK_SIZE > max_bio_size)
588                                 break;
589
590                         /* Be always aligned */
591                         if (sector & ((1<<(align+3))-1))
592                                 break;
593
594                         /* do not cross extent boundaries */
595                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
596                                 break;
597                         /* now, is it actually dirty, after all?
598                          * caution, drbd_bm_test_bit is tri-state for some
599                          * obscure reason; ( b == 0 ) would get the out-of-band
600                          * only accidentally right because of the "oddly sized"
601                          * adjustment below */
602                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
603                                 break;
604                         bit++;
605                         size += BM_BLOCK_SIZE;
606                         if ((BM_BLOCK_SIZE << align) <= size)
607                                 align++;
608                         i++;
609                 }
610                 /* if we merged some,
611                  * reset the offset to start the next drbd_bm_find_next from */
612                 if (size > BM_BLOCK_SIZE)
613                         mdev->bm_resync_fo = bit + 1;
614 #endif
615
616                 /* adjust very last sectors, in case we are oddly sized */
617                 if (sector + (size>>9) > capacity)
618                         size = (capacity-sector)<<9;
                /* checksum based resync is used only with a csums_tfm and an
                 * agreed protocol version of at least 89 */
619                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
620                         switch (read_for_csum(mdev, sector, size)) {
621                         case -EIO: /* Disk failure */
622                                 put_ldev(mdev);
623                                 return 0;
624                         case -EAGAIN: /* allocation failed, or ldev busy */
625                                 drbd_rs_complete_io(mdev, sector);
626                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
627                                 i = rollback_i;
628                                 goto requeue;
629                         case 0:
630                                 /* everything ok */
631                                 break;
632                         default:
633                                 BUG();
634                         }
635                 } else {
636                         inc_rs_pending(mdev);
637                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
638                                                sector, size, ID_SYNCER)) {
639                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
640                                 dec_rs_pending(mdev);
641                                 put_ldev(mdev);
642                                 return 0;
643                         }
644                 }
645         }
646
647         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
648                 /* last syncer _request_ was sent,
649                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
650                  * next sync group will resume), as soon as we receive the last
651                  * resync data block, and the last bit is cleared.
652                  * until then resync "work" is "inactive" ...
653                  */
654                 put_ldev(mdev);
655                 return 1;
656         }
657
        /* account the i blocks generated in this run as in flight; the shift
         * presumably converts bitmap blocks to 512-byte sectors - TODO confirm
         * against the definition of BM_BLOCK_SHIFT */
658  requeue:
659         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
660         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
661         put_ldev(mdev);
662         return 1;
663 }
664
665 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
666 {
667         int number, i, size;
668         sector_t sector;
669         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
670
671         if (unlikely(cancel))
672                 return 1;
673
674         number = drbd_rs_number_requests(mdev);
675
676         sector = mdev->ov_position;
677         for (i = 0; i < number; i++) {
678                 if (sector >= capacity) {
679                         return 1;
680                 }
681
682                 size = BM_BLOCK_SIZE;
683
684                 if (drbd_rs_should_slow_down(mdev, sector) ||
685                     drbd_try_rs_begin_io(mdev, sector)) {
686                         mdev->ov_position = sector;
687                         goto requeue;
688                 }
689
690                 if (sector + (size>>9) > capacity)
691                         size = (capacity-sector)<<9;
692
693                 inc_rs_pending(mdev);
694                 if (!drbd_send_ov_request(mdev, sector, size)) {
695                         dec_rs_pending(mdev);
696                         return 0;
697                 }
698                 sector += BM_SECT_PER_BIT;
699         }
700         mdev->ov_position = sector;
701
702  requeue:
703         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
704         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
705         return 1;
706 }
707
708
709 void start_resync_timer_fn(unsigned long data)
710 {
711         struct drbd_conf *mdev = (struct drbd_conf *) data;
712
713         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
714 }
715
716 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
717 {
718         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
719                 dev_warn(DEV, "w_start_resync later...\n");
720                 mdev->start_resync_timer.expires = jiffies + HZ/10;
721                 add_timer(&mdev->start_resync_timer);
722                 return 1;
723         }
724
725         drbd_start_resync(mdev, C_SYNC_SOURCE);
726         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
727         return 1;
728 }
729
/*
 * Worker callback run when online verify has finished: frees the work
 * item @w, prints the out-of-sync report via ov_oos_print() and calls
 * drbd_resync_finished().  Always returns 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* the work item is owned by this callback */
	kfree(w);

	ov_oos_print(mdev);
	drbd_resync_finished(mdev);
	return 1;
}
738
/*
 * Worker callback queued by drbd_resync_finished() to retry itself
 * later: frees the work item @w and calls drbd_resync_finished() again.
 * Always returns 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* the work item is owned by this callback */
	kfree(w);
	drbd_resync_finished(mdev);
	return 1;
}
747
748 static void ping_peer(struct drbd_conf *mdev)
749 {
750         clear_bit(GOT_PING_ACK, &mdev->flags);
751         request_ping(mdev);
752         wait_event(mdev->misc_wait,
753                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
754 }
755
756 int drbd_resync_finished(struct drbd_conf *mdev)
757 {
758         unsigned long db, dt, dbdt;
759         unsigned long n_oos;
760         union drbd_state os, ns;
761         struct drbd_work *w;
762         char *khelper_cmd = NULL;
763         int verify_done = 0;
764
765         /* Remove all elements from the resync LRU. Since future actions
766          * might set bits in the (main) bitmap, then the entries in the
767          * resync LRU would be wrong. */
768         if (drbd_rs_del_all(mdev)) {
769                 /* In case this is not possible now, most probably because
770                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
771                  * queue (or even the read operations for those packets
772                  * is not finished by now).   Retry in 100ms. */
773
774                 schedule_timeout_interruptible(HZ / 10);
775                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
776                 if (w) {
777                         w->cb = w_resync_finished;
778                         drbd_queue_work(&mdev->tconn->data.work, w);
779                         return 1;
780                 }
781                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
782         }
783
784         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
785         if (dt <= 0)
786                 dt = 1;
787         db = mdev->rs_total;
788         dbdt = Bit2KB(db/dt);
789         mdev->rs_paused /= HZ;
790
791         if (!get_ldev(mdev))
792                 goto out;
793
794         ping_peer(mdev);
795
796         spin_lock_irq(&mdev->req_lock);
797         os = mdev->state;
798
799         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
800
801         /* This protects us against multiple calls (that can happen in the presence
802            of application IO), and against connectivity loss just before we arrive here. */
803         if (os.conn <= C_CONNECTED)
804                 goto out_unlock;
805
806         ns = os;
807         ns.conn = C_CONNECTED;
808
809         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
810              verify_done ? "Online verify " : "Resync",
811              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
812
813         n_oos = drbd_bm_total_weight(mdev);
814
815         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
816                 if (n_oos) {
817                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
818                               n_oos, Bit2KB(1));
819                         khelper_cmd = "out-of-sync";
820                 }
821         } else {
822                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
823
824                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
825                         khelper_cmd = "after-resync-target";
826
827                 if (mdev->csums_tfm && mdev->rs_total) {
828                         const unsigned long s = mdev->rs_same_csum;
829                         const unsigned long t = mdev->rs_total;
830                         const int ratio =
831                                 (t == 0)     ? 0 :
832                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
833                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
834                              "transferred %luK total %luK\n",
835                              ratio,
836                              Bit2KB(mdev->rs_same_csum),
837                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
838                              Bit2KB(mdev->rs_total));
839                 }
840         }
841
842         if (mdev->rs_failed) {
843                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
844
845                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
846                         ns.disk = D_INCONSISTENT;
847                         ns.pdsk = D_UP_TO_DATE;
848                 } else {
849                         ns.disk = D_UP_TO_DATE;
850                         ns.pdsk = D_INCONSISTENT;
851                 }
852         } else {
853                 ns.disk = D_UP_TO_DATE;
854                 ns.pdsk = D_UP_TO_DATE;
855
856                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
857                         if (mdev->p_uuid) {
858                                 int i;
859                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
860                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
861                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
862                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
863                         } else {
864                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
865                         }
866                 }
867
868                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
869                         /* for verify runs, we don't update uuids here,
870                          * so there would be nothing to report. */
871                         drbd_uuid_set_bm(mdev, 0UL);
872                         drbd_print_uuids(mdev, "updated UUIDs");
873                         if (mdev->p_uuid) {
874                                 /* Now the two UUID sets are equal, update what we
875                                  * know of the peer. */
876                                 int i;
877                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
878                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
879                         }
880                 }
881         }
882
883         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
884 out_unlock:
885         spin_unlock_irq(&mdev->req_lock);
886         put_ldev(mdev);
887 out:
888         mdev->rs_total  = 0;
889         mdev->rs_failed = 0;
890         mdev->rs_paused = 0;
891         if (verify_done)
892                 mdev->ov_start_sector = 0;
893
894         drbd_md_sync(mdev);
895
896         if (khelper_cmd)
897                 drbd_khelper(mdev, khelper_cmd);
898
899         return 1;
900 }
901
902 /* helper */
903 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
904 {
905         if (drbd_ee_has_active_page(e)) {
906                 /* This might happen if sendpage() has not finished */
907                 int i = (e->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
908                 atomic_add(i, &mdev->pp_in_use_by_net);
909                 atomic_sub(i, &mdev->pp_in_use);
910                 spin_lock_irq(&mdev->req_lock);
911                 list_add_tail(&e->w.list, &mdev->net_ee);
912                 spin_unlock_irq(&mdev->req_lock);
913                 wake_up(&drbd_pp_wait);
914         } else
915                 drbd_free_ee(mdev, e);
916 }
917
918 /**
919  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
920  * @mdev:       DRBD device.
921  * @w:          work object.
922  * @cancel:     The connection will be closed anyways
923  */
924 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
925 {
926         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
927         int ok;
928
929         if (unlikely(cancel)) {
930                 drbd_free_ee(mdev, e);
931                 dec_unacked(mdev);
932                 return 1;
933         }
934
935         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
936                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
937         } else {
938                 if (__ratelimit(&drbd_ratelimit_state))
939                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
940                             (unsigned long long)e->i.sector);
941
942                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
943         }
944
945         dec_unacked(mdev);
946
947         move_to_net_ee_or_free(mdev, e);
948
949         if (unlikely(!ok))
950                 dev_err(DEV, "drbd_send_block() failed\n");
951         return ok;
952 }
953
954 /**
955  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
956  * @mdev:       DRBD device.
957  * @w:          work object.
958  * @cancel:     The connection will be closed anyways
959  */
960 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
961 {
962         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
963         int ok;
964
965         if (unlikely(cancel)) {
966                 drbd_free_ee(mdev, e);
967                 dec_unacked(mdev);
968                 return 1;
969         }
970
971         if (get_ldev_if_state(mdev, D_FAILED)) {
972                 drbd_rs_complete_io(mdev, e->i.sector);
973                 put_ldev(mdev);
974         }
975
976         if (mdev->state.conn == C_AHEAD) {
977                 ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
978         } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
979                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
980                         inc_rs_pending(mdev);
981                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
982                 } else {
983                         if (__ratelimit(&drbd_ratelimit_state))
984                                 dev_err(DEV, "Not sending RSDataReply, "
985                                     "partner DISKLESS!\n");
986                         ok = 1;
987                 }
988         } else {
989                 if (__ratelimit(&drbd_ratelimit_state))
990                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
991                             (unsigned long long)e->i.sector);
992
993                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
994
995                 /* update resync data with failure */
996                 drbd_rs_failed_io(mdev, e->i.sector, e->i.size);
997         }
998
999         dec_unacked(mdev);
1000
1001         move_to_net_ee_or_free(mdev, e);
1002
1003         if (unlikely(!ok))
1004                 dev_err(DEV, "drbd_send_block() failed\n");
1005         return ok;
1006 }
1007
1008 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1009 {
1010         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1011         struct digest_info *di;
1012         int digest_size;
1013         void *digest = NULL;
1014         int ok, eq = 0;
1015
1016         if (unlikely(cancel)) {
1017                 drbd_free_ee(mdev, e);
1018                 dec_unacked(mdev);
1019                 return 1;
1020         }
1021
1022         if (get_ldev(mdev)) {
1023                 drbd_rs_complete_io(mdev, e->i.sector);
1024                 put_ldev(mdev);
1025         }
1026
1027         di = e->digest;
1028
1029         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1030                 /* quick hack to try to avoid a race against reconfiguration.
1031                  * a real fix would be much more involved,
1032                  * introducing more locking mechanisms */
1033                 if (mdev->csums_tfm) {
1034                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1035                         D_ASSERT(digest_size == di->digest_size);
1036                         digest = kmalloc(digest_size, GFP_NOIO);
1037                 }
1038                 if (digest) {
1039                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1040                         eq = !memcmp(digest, di->digest, digest_size);
1041                         kfree(digest);
1042                 }
1043
1044                 if (eq) {
1045                         drbd_set_in_sync(mdev, e->i.sector, e->i.size);
1046                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1047                         mdev->rs_same_csum += e->i.size >> BM_BLOCK_SHIFT;
1048                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1049                 } else {
1050                         inc_rs_pending(mdev);
1051                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1052                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1053                         kfree(di);
1054                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1055                 }
1056         } else {
1057                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1058                 if (__ratelimit(&drbd_ratelimit_state))
1059                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1060         }
1061
1062         dec_unacked(mdev);
1063         move_to_net_ee_or_free(mdev, e);
1064
1065         if (unlikely(!ok))
1066                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1067         return ok;
1068 }
1069
1070 /* TODO merge common code with w_e_send_csum */
1071 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1072 {
1073         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1074         sector_t sector = e->i.sector;
1075         unsigned int size = e->i.size;
1076         int digest_size;
1077         void *digest;
1078         int ok = 1;
1079
1080         if (unlikely(cancel))
1081                 goto out;
1082
1083         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1084         digest = kmalloc(digest_size, GFP_NOIO);
1085         if (!digest) {
1086                 ok = 0; /* terminate the connection in case the allocation failed */
1087                 goto out;
1088         }
1089
1090         if (likely(!(e->flags & EE_WAS_ERROR)))
1091                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1092         else
1093                 memset(digest, 0, digest_size);
1094
1095         /* Free e and pages before send.
1096          * In case we block on congestion, we could otherwise run into
1097          * some distributed deadlock, if the other side blocks on
1098          * congestion as well, because our receiver blocks in
1099          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1100         drbd_free_ee(mdev, e);
1101         e = NULL;
1102         inc_rs_pending(mdev);
1103         ok = drbd_send_drequest_csum(mdev, sector, size,
1104                                      digest, digest_size,
1105                                      P_OV_REPLY);
1106         if (!ok)
1107                 dec_rs_pending(mdev);
1108         kfree(digest);
1109
1110 out:
1111         if (e)
1112                 drbd_free_ee(mdev, e);
1113         dec_unacked(mdev);
1114         return ok;
1115 }
1116
1117 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1118 {
1119         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1120                 mdev->ov_last_oos_size += size>>9;
1121         } else {
1122                 mdev->ov_last_oos_start = sector;
1123                 mdev->ov_last_oos_size = size>>9;
1124         }
1125         drbd_set_out_of_sync(mdev, sector, size);
1126 }
1127
1128 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1129 {
1130         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1131         struct digest_info *di;
1132         void *digest;
1133         sector_t sector = e->i.sector;
1134         unsigned int size = e->i.size;
1135         int digest_size;
1136         int ok, eq = 0;
1137
1138         if (unlikely(cancel)) {
1139                 drbd_free_ee(mdev, e);
1140                 dec_unacked(mdev);
1141                 return 1;
1142         }
1143
1144         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1145          * the resync lru has been cleaned up already */
1146         if (get_ldev(mdev)) {
1147                 drbd_rs_complete_io(mdev, e->i.sector);
1148                 put_ldev(mdev);
1149         }
1150
1151         di = e->digest;
1152
1153         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1154                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1155                 digest = kmalloc(digest_size, GFP_NOIO);
1156                 if (digest) {
1157                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1158
1159                         D_ASSERT(digest_size == di->digest_size);
1160                         eq = !memcmp(digest, di->digest, digest_size);
1161                         kfree(digest);
1162                 }
1163         }
1164
1165                 /* Free e and pages before send.
1166                  * In case we block on congestion, we could otherwise run into
1167                  * some distributed deadlock, if the other side blocks on
1168                  * congestion as well, because our receiver blocks in
1169                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
1170         drbd_free_ee(mdev, e);
1171         if (!eq)
1172                 drbd_ov_oos_found(mdev, sector, size);
1173         else
1174                 ov_oos_print(mdev);
1175
1176         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1177                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1178
1179         dec_unacked(mdev);
1180
1181         --mdev->ov_left;
1182
1183         /* let's advance progress step marks only for every other megabyte */
1184         if ((mdev->ov_left & 0x200) == 0x200)
1185                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1186
1187         if (mdev->ov_left == 0) {
1188                 ov_oos_print(mdev);
1189                 drbd_resync_finished(mdev);
1190         }
1191
1192         return ok;
1193 }
1194
1195 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1196 {
1197         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1198         complete(&b->done);
1199         return 1;
1200 }
1201
1202 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1203 {
1204         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1205         struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
1206         int ok = 1;
1207
1208         /* really avoid racing with tl_clear.  w.cb may have been referenced
1209          * just before it was reassigned and re-queued, so double check that.
1210          * actually, this race was harmless, since we only try to send the
1211          * barrier packet here, and otherwise do nothing with the object.
1212          * but compare with the head of w_clear_epoch */
1213         spin_lock_irq(&mdev->req_lock);
1214         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1215                 cancel = 1;
1216         spin_unlock_irq(&mdev->req_lock);
1217         if (cancel)
1218                 return 1;
1219
1220         if (!drbd_get_data_sock(mdev))
1221                 return 0;
1222         p->barrier = b->br_number;
1223         /* inc_ap_pending was done where this was queued.
1224          * dec_ap_pending will be done in got_BarrierAck
1225          * or (on connection loss) in w_clear_epoch.  */
1226         ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
1227                                 (struct p_header80 *)p, sizeof(*p), 0);
1228         drbd_put_data_sock(mdev);
1229
1230         return ok;
1231 }
1232
1233 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1234 {
1235         if (cancel)
1236                 return 1;
1237         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1238 }
1239
1240 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1241 {
1242         struct drbd_request *req = container_of(w, struct drbd_request, w);
1243         int ok;
1244
1245         if (unlikely(cancel)) {
1246                 req_mod(req, SEND_CANCELED);
1247                 return 1;
1248         }
1249
1250         ok = drbd_send_oos(mdev, req);
1251         req_mod(req, OOS_HANDED_TO_NETWORK);
1252
1253         return ok;
1254 }
1255
1256 /**
1257  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1258  * @mdev:       DRBD device.
1259  * @w:          work object.
1260  * @cancel:     The connection will be closed anyways
1261  */
1262 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1263 {
1264         struct drbd_request *req = container_of(w, struct drbd_request, w);
1265         int ok;
1266
1267         if (unlikely(cancel)) {
1268                 req_mod(req, SEND_CANCELED);
1269                 return 1;
1270         }
1271
1272         ok = drbd_send_dblock(mdev, req);
1273         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1274
1275         return ok;
1276 }
1277
1278 /**
1279  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1280  * @mdev:       DRBD device.
1281  * @w:          work object.
1282  * @cancel:     The connection will be closed anyways
1283  */
1284 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1285 {
1286         struct drbd_request *req = container_of(w, struct drbd_request, w);
1287         int ok;
1288
1289         if (unlikely(cancel)) {
1290                 req_mod(req, SEND_CANCELED);
1291                 return 1;
1292         }
1293
1294         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1295                                 (unsigned long)req);
1296
1297         if (!ok) {
1298                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1299                  * so this is probably redundant */
1300                 if (mdev->state.conn >= C_CONNECTED)
1301                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1302         }
1303         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1304
1305         return ok;
1306 }
1307
1308 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1309 {
1310         struct drbd_request *req = container_of(w, struct drbd_request, w);
1311
1312         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1313                 drbd_al_begin_io(mdev, req->i.sector);
1314         /* Calling drbd_al_begin_io() out of the worker might deadlocks
1315            theoretically. Practically it can not deadlock, since this is
1316            only used when unfreezing IOs. All the extents of the requests
1317            that made it into the TL are already active */
1318
1319         drbd_req_make_private_bio(req, req->master_bio);
1320         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1321         generic_make_request(req->private_bio);
1322
1323         return 1;
1324 }
1325
1326 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1327 {
1328         struct drbd_conf *odev = mdev;
1329
1330         while (1) {
1331                 if (odev->sync_conf.after == -1)
1332                         return 1;
1333                 odev = minor_to_mdev(odev->sync_conf.after);
1334                 if (!expect(odev))
1335                         return 1;
1336                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1337                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1338                     odev->state.aftr_isp || odev->state.peer_isp ||
1339                     odev->state.user_isp)
1340                         return 0;
1341         }
1342 }
1343
1344 /**
1345  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1346  * @mdev:       DRBD device.
1347  *
1348  * Called from process context only (admin command and after_state_ch).
1349  */
1350 static int _drbd_pause_after(struct drbd_conf *mdev)
1351 {
1352         struct drbd_conf *odev;
1353         int i, rv = 0;
1354
1355         for (i = 0; i < minor_count; i++) {
1356                 odev = minor_to_mdev(i);
1357                 if (!odev)
1358                         continue;
1359                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1360                         continue;
1361                 if (!_drbd_may_sync_now(odev))
1362                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1363                                != SS_NOTHING_TO_DO);
1364         }
1365
1366         return rv;
1367 }
1368
1369 /**
1370  * _drbd_resume_next() - Resume resync on all devices that may resync now
1371  * @mdev:       DRBD device.
1372  *
1373  * Called from process context only (admin command and worker).
1374  */
1375 static int _drbd_resume_next(struct drbd_conf *mdev)
1376 {
1377         struct drbd_conf *odev;
1378         int i, rv = 0;
1379
1380         for (i = 0; i < minor_count; i++) {
1381                 odev = minor_to_mdev(i);
1382                 if (!odev)
1383                         continue;
1384                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1385                         continue;
1386                 if (odev->state.aftr_isp) {
1387                         if (_drbd_may_sync_now(odev))
1388                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1389                                                         CS_HARD, NULL)
1390                                        != SS_NOTHING_TO_DO) ;
1391                 }
1392         }
1393         return rv;
1394 }
1395
1396 void resume_next_sg(struct drbd_conf *mdev)
1397 {
1398         write_lock_irq(&global_state_lock);
1399         _drbd_resume_next(mdev);
1400         write_unlock_irq(&global_state_lock);
1401 }
1402
1403 void suspend_other_sg(struct drbd_conf *mdev)
1404 {
1405         write_lock_irq(&global_state_lock);
1406         _drbd_pause_after(mdev);
1407         write_unlock_irq(&global_state_lock);
1408 }
1409
1410 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1411 {
1412         struct drbd_conf *odev;
1413
1414         if (o_minor == -1)
1415                 return NO_ERROR;
1416         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1417                 return ERR_SYNC_AFTER;
1418
1419         /* check for loops */
1420         odev = minor_to_mdev(o_minor);
1421         while (1) {
1422                 if (odev == mdev)
1423                         return ERR_SYNC_AFTER_CYCLE;
1424
1425                 /* dependency chain ends here, no cycles. */
1426                 if (odev->sync_conf.after == -1)
1427                         return NO_ERROR;
1428
1429                 /* follow the dependency chain */
1430                 odev = minor_to_mdev(odev->sync_conf.after);
1431         }
1432 }
1433
1434 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1435 {
1436         int changes;
1437         int retcode;
1438
1439         write_lock_irq(&global_state_lock);
1440         retcode = sync_after_error(mdev, na);
1441         if (retcode == NO_ERROR) {
1442                 mdev->sync_conf.after = na;
1443                 do {
1444                         changes  = _drbd_pause_after(mdev);
1445                         changes |= _drbd_resume_next(mdev);
1446                 } while (changes);
1447         }
1448         write_unlock_irq(&global_state_lock);
1449         return retcode;
1450 }
1451
1452 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1453 {
1454         atomic_set(&mdev->rs_sect_in, 0);
1455         atomic_set(&mdev->rs_sect_ev, 0);
1456         mdev->rs_in_flight = 0;
1457         mdev->rs_planed = 0;
1458         spin_lock(&mdev->peer_seq_lock);
1459         fifo_set(&mdev->rs_plan_s, 0);
1460         spin_unlock(&mdev->peer_seq_lock);
1461 }
1462
1463 /**
1464  * drbd_start_resync() - Start the resync process
1465  * @mdev:       DRBD device.
1466  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1467  *
1468  * This function might bring you directly into one of the
1469  * C_PAUSED_SYNC_* states.
1470  */
1471 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1472 {
1473         union drbd_state ns;
1474         int r;
1475
1476         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1477                 dev_err(DEV, "Resync already running!\n");
1478                 return;
1479         }
1480
1481         if (mdev->state.conn < C_AHEAD) {
1482                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1483                 drbd_rs_cancel_all(mdev);
1484                 /* This should be done when we abort the resync. We definitely do not
1485                    want to have this for connections going back and forth between
1486                    Ahead/Behind and SyncSource/SyncTarget */
1487         }
1488
1489         if (side == C_SYNC_TARGET) {
1490                 /* Since application IO was locked out during C_WF_BITMAP_T and
1491                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1492                    we check that we might make the data inconsistent. */
1493                 r = drbd_khelper(mdev, "before-resync-target");
1494                 r = (r >> 8) & 0xff;
1495                 if (r > 0) {
1496                         dev_info(DEV, "before-resync-target handler returned %d, "
1497                              "dropping connection.\n", r);
1498                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1499                         return;
1500                 }
1501         } else /* C_SYNC_SOURCE */ {
1502                 r = drbd_khelper(mdev, "before-resync-source");
1503                 r = (r >> 8) & 0xff;
1504                 if (r > 0) {
1505                         if (r == 3) {
1506                                 dev_info(DEV, "before-resync-source handler returned %d, "
1507                                          "ignoring. Old userland tools?", r);
1508                         } else {
1509                                 dev_info(DEV, "before-resync-source handler returned %d, "
1510                                          "dropping connection.\n", r);
1511                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1512                                 return;
1513                         }
1514                 }
1515         }
1516
1517         drbd_state_lock(mdev);
1518
1519         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1520                 drbd_state_unlock(mdev);
1521                 return;
1522         }
1523
1524         write_lock_irq(&global_state_lock);
1525         ns = mdev->state;
1526
1527         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1528
1529         ns.conn = side;
1530
1531         if (side == C_SYNC_TARGET)
1532                 ns.disk = D_INCONSISTENT;
1533         else /* side == C_SYNC_SOURCE */
1534                 ns.pdsk = D_INCONSISTENT;
1535
1536         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1537         ns = mdev->state;
1538
1539         if (ns.conn < C_CONNECTED)
1540                 r = SS_UNKNOWN_ERROR;
1541
1542         if (r == SS_SUCCESS) {
1543                 unsigned long tw = drbd_bm_total_weight(mdev);
1544                 unsigned long now = jiffies;
1545                 int i;
1546
1547                 mdev->rs_failed    = 0;
1548                 mdev->rs_paused    = 0;
1549                 mdev->rs_same_csum = 0;
1550                 mdev->rs_last_events = 0;
1551                 mdev->rs_last_sect_ev = 0;
1552                 mdev->rs_total     = tw;
1553                 mdev->rs_start     = now;
1554                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1555                         mdev->rs_mark_left[i] = tw;
1556                         mdev->rs_mark_time[i] = now;
1557                 }
1558                 _drbd_pause_after(mdev);
1559         }
1560         write_unlock_irq(&global_state_lock);
1561
1562         if (r == SS_SUCCESS) {
1563                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1564                      drbd_conn_str(ns.conn),
1565                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1566                      (unsigned long) mdev->rs_total);
1567                 if (side == C_SYNC_TARGET)
1568                         mdev->bm_resync_fo = 0;
1569
1570                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1571                  * with w_send_oos, or the sync target will get confused as to
1572                  * how much bits to resync.  We cannot do that always, because for an
1573                  * empty resync and protocol < 95, we need to do it here, as we call
1574                  * drbd_resync_finished from here in that case.
1575                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1576                  * and from after_state_ch otherwise. */
1577                 if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1578                         drbd_gen_and_send_sync_uuid(mdev);
1579
1580                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1581                         /* This still has a race (about when exactly the peers
1582                          * detect connection loss) that can lead to a full sync
1583                          * on next handshake. In 8.3.9 we fixed this with explicit
1584                          * resync-finished notifications, but the fix
1585                          * introduces a protocol change.  Sleeping for some
1586                          * time longer than the ping interval + timeout on the
1587                          * SyncSource, to give the SyncTarget the chance to
1588                          * detect connection loss, then waiting for a ping
1589                          * response (implicit in drbd_resync_finished) reduces
1590                          * the race considerably, but does not solve it. */
1591                         if (side == C_SYNC_SOURCE)
1592                                 schedule_timeout_interruptible(
1593                                         mdev->tconn->net_conf->ping_int * HZ +
1594                                         mdev->tconn->net_conf->ping_timeo*HZ/9);
1595                         drbd_resync_finished(mdev);
1596                 }
1597
1598                 drbd_rs_controller_reset(mdev);
1599                 /* ns.conn may already be != mdev->state.conn,
1600                  * we may have been paused in between, or become paused until
1601                  * the timer triggers.
1602                  * No matter, that is handled in resync_timer_fn() */
1603                 if (ns.conn == C_SYNC_TARGET)
1604                         mod_timer(&mdev->resync_timer, jiffies);
1605
1606                 drbd_md_sync(mdev);
1607         }
1608         put_ldev(mdev);
1609         drbd_state_unlock(mdev);
1610 }
1611
1612 int drbd_worker(struct drbd_thread *thi)
1613 {
1614         struct drbd_conf *mdev = thi->mdev;
1615         struct drbd_work *w = NULL;
1616         LIST_HEAD(work_list);
1617         int intr = 0, i;
1618
1619         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1620
1621         while (get_t_state(thi) == RUNNING) {
1622                 drbd_thread_current_set_cpu(mdev);
1623
1624                 if (down_trylock(&mdev->tconn->data.work.s)) {
1625                         mutex_lock(&mdev->tconn->data.mutex);
1626                         if (mdev->tconn->data.socket && !mdev->tconn->net_conf->no_cork)
1627                                 drbd_tcp_uncork(mdev->tconn->data.socket);
1628                         mutex_unlock(&mdev->tconn->data.mutex);
1629
1630                         intr = down_interruptible(&mdev->tconn->data.work.s);
1631
1632                         mutex_lock(&mdev->tconn->data.mutex);
1633                         if (mdev->tconn->data.socket  && !mdev->tconn->net_conf->no_cork)
1634                                 drbd_tcp_cork(mdev->tconn->data.socket);
1635                         mutex_unlock(&mdev->tconn->data.mutex);
1636                 }
1637
1638                 if (intr) {
1639                         D_ASSERT(intr == -EINTR);
1640                         flush_signals(current);
1641                         if (!expect(get_t_state(thi) != RUNNING))
1642                                 continue;
1643                         break;
1644                 }
1645
1646                 if (get_t_state(thi) != RUNNING)
1647                         break;
1648                 /* With this break, we have done a down() but not consumed
1649                    the entry from the list. The cleanup code takes care of
1650                    this...   */
1651
1652                 w = NULL;
1653                 spin_lock_irq(&mdev->tconn->data.work.q_lock);
1654                 if (!expect(!list_empty(&mdev->tconn->data.work.q))) {
1655                         /* something terribly wrong in our logic.
1656                          * we were able to down() the semaphore,
1657                          * but the list is empty... doh.
1658                          *
1659                          * what is the best thing to do now?
1660                          * try again from scratch, restarting the receiver,
1661                          * asender, whatnot? could break even more ugly,
1662                          * e.g. when we are primary, but no good local data.
1663                          *
1664                          * I'll try to get away just starting over this loop.
1665                          */
1666                         spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1667                         continue;
1668                 }
1669                 w = list_entry(mdev->tconn->data.work.q.next, struct drbd_work, list);
1670                 list_del_init(&w->list);
1671                 spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1672
1673                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1674                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1675                         if (mdev->state.conn >= C_CONNECTED)
1676                                 drbd_force_state(mdev,
1677                                                 NS(conn, C_NETWORK_FAILURE));
1678                 }
1679         }
1680         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1681         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1682
1683         spin_lock_irq(&mdev->tconn->data.work.q_lock);
1684         i = 0;
1685         while (!list_empty(&mdev->tconn->data.work.q)) {
1686                 list_splice_init(&mdev->tconn->data.work.q, &work_list);
1687                 spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1688
1689                 while (!list_empty(&work_list)) {
1690                         w = list_entry(work_list.next, struct drbd_work, list);
1691                         list_del_init(&w->list);
1692                         w->cb(mdev, w, 1);
1693                         i++; /* dead debugging code */
1694                 }
1695
1696                 spin_lock_irq(&mdev->tconn->data.work.q_lock);
1697         }
1698         sema_init(&mdev->tconn->data.work.s, 0);
1699         /* DANGEROUS race: if someone did queue his work within the spinlock,
1700          * but up() ed outside the spinlock, we could get an up() on the
1701          * semaphore without corresponding list entry.
1702          * So don't do that.
1703          */
1704         spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1705
1706         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1707         /* _drbd_set_state only uses stop_nowait.
1708          * wait here for the EXITING receiver. */
1709         drbd_thread_stop(&mdev->receiver);
1710         drbd_mdev_cleanup(mdev);
1711
1712         dev_info(DEV, "worker terminated\n");
1713
1714         clear_bit(DEVICE_DYING, &mdev->flags);
1715         clear_bit(CONFIG_PENDING, &mdev->flags);
1716         wake_up(&mdev->state_wait);
1717
1718         return 0;
1719 }