1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_work *w, int cancel);
43
44
45
46 /* endio handlers:
47  *   drbd_md_io_complete (defined here)
48  *   drbd_endio_pri (defined here)
49  *   drbd_endio_sec (defined here)
50  *   bm_async_io_complete (defined in drbd_bitmap.c)
51  *
52  * For all these callbacks, note the following:
53  * The callbacks will be called in irq context by the IDE drivers,
54  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
55  * Try to get the locking right :)
56  *
57  */
58
59
60 /* About the global_state_lock
61    Each state transition on a device holds a read lock. In case we have
62    to evaluate the sync-after dependencies, we grab a write lock, because
63    we need stable states on all devices for that.  */
64 rwlock_t global_state_lock;
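/* Illustrative sketch only (not part of DRBD; the helper name is invented):
 * the locking pattern described above.  An ordinary per-device state
 * transition takes the read side; code that has to look at the sync-after
 * dependencies of all devices at once, like resume_next_sg() and
 * suspend_other_sg() further down in this file, takes the write side.
 */
static inline void example_sync_after_update(void)
{
        write_lock_irq(&global_state_lock);
        /* ... re-evaluate sync-after dependencies on all devices ... */
        write_unlock_irq(&global_state_lock);
}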
65
66 /* used for synchronous meta data and bitmap IO
67  * submitted by drbd_md_sync_page_io()
68  */
69 void drbd_md_io_complete(struct bio *bio, int error)
70 {
71         struct drbd_md_io *md_io;
72
73         md_io = (struct drbd_md_io *)bio->bi_private;
74         md_io->error = error;
75
76         complete(&md_io->event);
77 }
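/* Minimal sketch of the synchronous submit-and-wait pattern this completion
 * handler pairs with.  This is *not* the actual drbd_md_sync_page_io()
 * (which lives in drbd_actlog.c); it only illustrates the idea, assuming the
 * caller has already set up the bio's target device, sector and payload.
 */
static inline int example_md_sync_io(struct bio *bio, int rw)
{
        struct drbd_md_io md_io;

        md_io.error = 0;
        init_completion(&md_io.event);

        bio->bi_private = &md_io;
        bio->bi_end_io = drbd_md_io_complete;

        submit_bio(rw, bio);
        wait_for_completion(&md_io.event);      /* woken by complete() above */

        return md_io.error;
}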
78
79 /* reads on behalf of the partner,
80  * "submitted" by the receiver
81  */
82 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
83 {
84         unsigned long flags = 0;
85         struct drbd_conf *mdev = peer_req->w.mdev;
86
87         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
88         mdev->read_cnt += peer_req->i.size >> 9;
89         list_del(&peer_req->w.list);
90         if (list_empty(&mdev->read_ee))
91                 wake_up(&mdev->ee_wait);
92         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93                 __drbd_chk_io_error(mdev, false);
94         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
95
96         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
97         put_ldev(mdev);
98 }
99
100 /* writes on behalf of the partner, or resync writes,
101  * "submitted" by the receiver, final stage.  */
102 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103 {
104         unsigned long flags = 0;
105         struct drbd_conf *mdev = peer_req->w.mdev;
106         sector_t e_sector;
107         int do_wake;
108         u64 block_id;
109         int do_al_complete_io;
110
111         /* after we moved peer_req to done_ee,
112          * we may no longer access it,
113          * it may be freed/reused already!
114          * (as soon as we release the req_lock) */
115         e_sector = peer_req->i.sector;
116         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
117         block_id = peer_req->block_id;
118
119         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
120         mdev->writ_cnt += peer_req->i.size >> 9;
121         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
122         list_add_tail(&peer_req->w.list, &mdev->done_ee);
123
124         /*
125          * Do not remove from the write_requests tree here: we did not send the
126          * Ack yet and did not wake possibly waiting conflicting requests.
127          * It is removed from the tree in "drbd_process_done_ee" within the
128          * appropriate w.cb (e_end_block/e_end_resync_block) or in
129          * _drbd_clear_done_ee.
130          */
131
132         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
133
134         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
135                 __drbd_chk_io_error(mdev, false);
136         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
137
138         if (block_id == ID_SYNCER)
139                 drbd_rs_complete_io(mdev, e_sector);
140
141         if (do_wake)
142                 wake_up(&mdev->ee_wait);
143
144         if (do_al_complete_io)
145                 drbd_al_complete_io(mdev, e_sector);
146
147         wake_asender(mdev->tconn);
148         put_ldev(mdev);
149 }
150
151 /* writes on behalf of the partner, or resync writes,
152  * "submitted" by the receiver.
153  */
154 void drbd_endio_sec(struct bio *bio, int error)
155 {
156         struct drbd_peer_request *peer_req = bio->bi_private;
157         struct drbd_conf *mdev = peer_req->w.mdev;
158         int uptodate = bio_flagged(bio, BIO_UPTODATE);
159         int is_write = bio_data_dir(bio) == WRITE;
160
161         if (error && __ratelimit(&drbd_ratelimit_state))
162                 dev_warn(DEV, "%s: error=%d s=%llus\n",
163                                 is_write ? "write" : "read", error,
164                                 (unsigned long long)peer_req->i.sector);
165         if (!error && !uptodate) {
166                 if (__ratelimit(&drbd_ratelimit_state))
167                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
168                                         is_write ? "write" : "read",
169                                         (unsigned long long)peer_req->i.sector);
170                 /* strange behavior of some lower level drivers...
171                  * fail the request by clearing the uptodate flag,
172                  * but do not return any error?! */
173                 error = -EIO;
174         }
175
176         if (error)
177                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
178
179         bio_put(bio); /* no need for the bio anymore */
180         if (atomic_dec_and_test(&peer_req->pending_bios)) {
181                 if (is_write)
182                         drbd_endio_write_sec_final(peer_req);
183                 else
184                         drbd_endio_read_sec_final(peer_req);
185         }
186 }
187
188 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
189  */
190 void drbd_endio_pri(struct bio *bio, int error)
191 {
192         unsigned long flags;
193         struct drbd_request *req = bio->bi_private;
194         struct drbd_conf *mdev = req->w.mdev;
195         struct bio_and_error m;
196         enum drbd_req_event what;
197         int uptodate = bio_flagged(bio, BIO_UPTODATE);
198
199         if (!error && !uptodate) {
200                 dev_warn(DEV, "p %s: setting error to -EIO\n",
201                          bio_data_dir(bio) == WRITE ? "write" : "read");
202                 /* strange behavior of some lower level drivers...
203                  * fail the request by clearing the uptodate flag,
204                  * but do not return any error?! */
205                 error = -EIO;
206         }
207
208         /* to avoid recursion in __req_mod */
209         if (unlikely(error)) {
210                 what = (bio_data_dir(bio) == WRITE)
211                         ? WRITE_COMPLETED_WITH_ERROR
212                         : (bio_rw(bio) == READ)
213                           ? READ_COMPLETED_WITH_ERROR
214                           : READ_AHEAD_COMPLETED_WITH_ERROR;
215         } else
216                 what = COMPLETED_OK;
217
218         bio_put(req->private_bio);
219         req->private_bio = ERR_PTR(error);
220
221         /* not req_mod(), we need irqsave here! */
222         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
223         __req_mod(req, what, &m);
224         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
225
226         if (m.bio)
227                 complete_master_bio(mdev, &m);
228 }
229
230 int w_read_retry_remote(struct drbd_work *w, int cancel)
231 {
232         struct drbd_request *req = container_of(w, struct drbd_request, w);
233         struct drbd_conf *mdev = w->mdev;
234
235         /* We should not detach for read io-error,
236          * but try to WRITE the P_DATA_REPLY to the failed location,
237          * to give the disk the chance to relocate that block */
238
239         spin_lock_irq(&mdev->tconn->req_lock);
240         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
241                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
242                 spin_unlock_irq(&mdev->tconn->req_lock);
243                 return 1;
244         }
245         spin_unlock_irq(&mdev->tconn->req_lock);
246
247         return w_send_read_req(w, 0);
248 }
249
250 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
251                   struct drbd_peer_request *peer_req, void *digest)
252 {
253         struct hash_desc desc;
254         struct scatterlist sg;
255         struct page *page = peer_req->pages;
256         struct page *tmp;
257         unsigned len;
258
259         desc.tfm = tfm;
260         desc.flags = 0;
261
262         sg_init_table(&sg, 1);
263         crypto_hash_init(&desc);
264
265         while ((tmp = page_chain_next(page))) {
266                 /* all but the last page will be fully used */
267                 sg_set_page(&sg, page, PAGE_SIZE, 0);
268                 crypto_hash_update(&desc, &sg, sg.length);
269                 page = tmp;
270         }
271         /* and now the last, possibly only partially used page */
272         len = peer_req->i.size & (PAGE_SIZE - 1);
273         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
274         crypto_hash_update(&desc, &sg, sg.length);
275         crypto_hash_final(&desc, digest);
276 }
277
278 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
279 {
280         struct hash_desc desc;
281         struct scatterlist sg;
282         struct bio_vec *bvec;
283         int i;
284
285         desc.tfm = tfm;
286         desc.flags = 0;
287
288         sg_init_table(&sg, 1);
289         crypto_hash_init(&desc);
290
291         __bio_for_each_segment(bvec, bio, i, 0) {
292                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
293                 crypto_hash_update(&desc, &sg, sg.length);
294         }
295         crypto_hash_final(&desc, digest);
296 }
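/* Minimal sketch (assumption: this helper is invented and not used anywhere
 * by DRBD itself): hashing a plain linear buffer with the same legacy
 * crypto_hash API that drbd_csum_ee()/drbd_csum_bio() use above, but with a
 * single-entry scatterlist instead of walking a page chain or bio segments.
 */
static inline void example_csum_buffer(struct crypto_hash *tfm,
                                       const void *buf, unsigned int len,
                                       void *digest)
{
        struct hash_desc desc;
        struct scatterlist sg;

        desc.tfm = tfm;
        desc.flags = 0;

        sg_init_one(&sg, buf, len);     /* one scatterlist entry covers buf */
        crypto_hash_init(&desc);
        crypto_hash_update(&desc, &sg, len);
        crypto_hash_final(&desc, digest);
}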
297
298 static int w_e_send_csum(struct drbd_work *w, int cancel)
299 {
300         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
301         struct drbd_conf *mdev = w->mdev;
302         int digest_size;
303         void *digest;
304         int ok = 1;
305
306         if (unlikely(cancel))
307                 goto out;
308
309         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
310                 goto out;
311
312         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
313         digest = kmalloc(digest_size, GFP_NOIO);
314         if (digest) {
315                 sector_t sector = peer_req->i.sector;
316                 unsigned int size = peer_req->i.size;
317                 drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
318                 /* Free e and pages before send.
319                  * In case we block on congestion, we could otherwise run into
320                  * some distributed deadlock, if the other side blocks on
321                  * congestion as well, because our receiver blocks in
322                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
323                 drbd_free_ee(mdev, peer_req);
324                 peer_req = NULL;
325                 inc_rs_pending(mdev);
326                 ok = drbd_send_drequest_csum(mdev, sector, size,
327                                              digest, digest_size,
328                                              P_CSUM_RS_REQUEST);
329                 kfree(digest);
330         } else {
331                 dev_err(DEV, "kmalloc() of digest failed.\n");
332                 ok = 0;
333         }
334
335 out:
336         if (peer_req)
337                 drbd_free_ee(mdev, peer_req);
338
339         if (unlikely(!ok))
340                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
341         return ok;
342 }
343
344 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
345
346 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
347 {
348         struct drbd_peer_request *peer_req;
349
350         if (!get_ldev(mdev))
351                 return -EIO;
352
353         if (drbd_rs_should_slow_down(mdev, sector))
354                 goto defer;
355
356         /* GFP_TRY, because if there is no memory available right now, this may
357          * be rescheduled for later. It is "only" background resync, after all. */
358         peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
359         if (!peer_req)
360                 goto defer;
361
362         peer_req->w.cb = w_e_send_csum;
363         spin_lock_irq(&mdev->tconn->req_lock);
364         list_add(&peer_req->w.list, &mdev->read_ee);
365         spin_unlock_irq(&mdev->tconn->req_lock);
366
367         atomic_add(size >> 9, &mdev->rs_sect_ev);
368         if (drbd_submit_ee(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
369                 return 0;
370
371         /* If it failed because of ENOMEM, retry should help.  If it failed
372          * because bio_add_page failed (probably broken lower level driver),
373          * retry may or may not help.
374          * If it does not, you may need to force disconnect. */
375         spin_lock_irq(&mdev->tconn->req_lock);
376         list_del(&peer_req->w.list);
377         spin_unlock_irq(&mdev->tconn->req_lock);
378
379         drbd_free_ee(mdev, peer_req);
380 defer:
381         put_ldev(mdev);
382         return -EAGAIN;
383 }
384
385 int w_resync_timer(struct drbd_work *w, int cancel)
386 {
387         struct drbd_conf *mdev = w->mdev;
388         switch (mdev->state.conn) {
389         case C_VERIFY_S:
390                 w_make_ov_request(w, cancel);
391                 break;
392         case C_SYNC_TARGET:
393                 w_make_resync_request(w, cancel);
394                 break;
395         }
396
397         return 1;
398 }
399
400 void resync_timer_fn(unsigned long data)
401 {
402         struct drbd_conf *mdev = (struct drbd_conf *) data;
403
404         if (list_empty(&mdev->resync_work.list))
405                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
406 }
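/* Illustrative sketch only (invented helper name): how such a timer is
 * typically armed so that resync work is generated once per SLEEP_TIME tick.
 * The real timer initialization is done elsewhere (assumption: during device
 * setup, e.g. with setup_timer(&mdev->resync_timer, resync_timer_fn,
 * (unsigned long)mdev)).
 */
static inline void example_arm_resync_timer(struct drbd_conf *mdev)
{
        mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
}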
407
408 static void fifo_set(struct fifo_buffer *fb, int value)
409 {
410         int i;
411
412         for (i = 0; i < fb->size; i++)
413                 fb->values[i] = value;
414 }
415
416 static int fifo_push(struct fifo_buffer *fb, int value)
417 {
418         int ov;
419
420         ov = fb->values[fb->head_index];
421         fb->values[fb->head_index++] = value;
422
423         if (fb->head_index >= fb->size)
424                 fb->head_index = 0;
425
426         return ov;
427 }
428
429 static void fifo_add_val(struct fifo_buffer *fb, int value)
430 {
431         int i;
432
433         for (i = 0; i < fb->size; i++)
434                 fb->values[i] += value;
435 }
436
437 static int drbd_rs_controller(struct drbd_conf *mdev)
438 {
439         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
440         unsigned int want;     /* The number of sectors we want in the proxy */
441         int req_sect; /* Number of sectors to request in this turn */
442         int correction; /* Number of sectors more we need in the proxy*/
443         int cps; /* correction per invocation of drbd_rs_controller() */
444         int steps; /* Number of time steps to plan ahead */
445         int curr_corr;
446         int max_sect;
447
448         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
449         mdev->rs_in_flight -= sect_in;
450
451         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
452
453         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
454
455         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
456                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
457         } else { /* normal path */
458                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
459                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
460         }
461
462         correction = want - mdev->rs_in_flight - mdev->rs_planed;
463
464         /* Plan ahead */
465         cps = correction / steps;
466         fifo_add_val(&mdev->rs_plan_s, cps);
467         mdev->rs_planed += cps * steps;
468
469         /* What we do in this step */
470         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
471         spin_unlock(&mdev->peer_seq_lock);
472         mdev->rs_planed -= curr_corr;
473
474         req_sect = sect_in + curr_corr;
475         if (req_sect < 0)
476                 req_sect = 0;
477
478         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
479         if (req_sect > max_sect)
480                 req_sect = max_sect;
481
482         /*
483         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
484                  sect_in, mdev->rs_in_flight, want, correction,
485                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
486         */
487
488         return req_sect;
489 }
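/* Worked example with made-up numbers (assuming SLEEP_TIME is 100 ms, i.e.
 * HZ/10, and that c_delay_target is configured in units of 0.1 s): if
 * sect_in = 2000 sectors arrived during the last tick, c_fill_target is
 * unset and c_delay_target = 10 (a 1 second delay target), then
 *
 *      want = sect_in * c_delay_target * HZ / (SLEEP_TIME * 10)
 *           = 2000 * 10 * HZ / ((HZ/10) * 10) = 20000 sectors
 *
 * i.e. enough data in flight to keep the peer busy for one second at the
 * observed reply rate.  The difference between that and what is already in
 * flight or planned is spread over "steps" future invocations via the plan
 * fifo, so the request rate converges on the peer's reply rate instead of
 * oscillating.
 */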
490
491 static int drbd_rs_number_requests(struct drbd_conf *mdev)
492 {
493         int number;
494         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
495                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
496                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
497         } else {
498                 mdev->c_sync_rate = mdev->sync_conf.rate;
499                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
500         }
501
502         /* ignore the amount of pending requests, the resync controller should
503          * throttle down to incoming reply rate soon enough anyway. */
504         return number;
505 }
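/* Worked example for the fixed-rate branch (made-up numbers; assuming
 * BM_BLOCK_SIZE is 4 KiB and SLEEP_TIME is HZ/10): with rate = 10240 KiB/s,
 *
 *      number = SLEEP_TIME * c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ)
 *             = (HZ/10) * 10240 / (4 * HZ) = 256
 *
 * i.e. 256 requests of 4 KiB per 100 ms tick, which adds up to the requested
 * 10 MiB/s.
 */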
506
507 static int w_make_resync_request(struct drbd_work *w, int cancel)
508 {
509         struct drbd_conf *mdev = w->mdev;
510         unsigned long bit;
511         sector_t sector;
512         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
513         int max_bio_size;
514         int number, rollback_i, size;
515         int align, queued, sndbuf;
516         int i = 0;
517
518         if (unlikely(cancel))
519                 return 1;
520
521         if (mdev->rs_total == 0) {
522                 /* empty resync? */
523                 drbd_resync_finished(mdev);
524                 return 1;
525         }
526
527         if (!get_ldev(mdev)) {
528                 /* Since we only need to access mdev->rsync a
529                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
530                    to continue resync with a broken disk makes no sense at
531                    all */
532                 dev_err(DEV, "Disk broke down during resync!\n");
533                 return 1;
534         }
535
536         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
537         number = drbd_rs_number_requests(mdev);
538         if (number == 0)
539                 goto requeue;
540
541         for (i = 0; i < number; i++) {
542                 /* Stop generating RS requests, when half of the send buffer is filled */
543                 mutex_lock(&mdev->tconn->data.mutex);
544                 if (mdev->tconn->data.socket) {
545                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
546                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
547                 } else {
548                         queued = 1;
549                         sndbuf = 0;
550                 }
551                 mutex_unlock(&mdev->tconn->data.mutex);
552                 if (queued > sndbuf / 2)
553                         goto requeue;
554
555 next_sector:
556                 size = BM_BLOCK_SIZE;
557                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
558
559                 if (bit == DRBD_END_OF_BITMAP) {
560                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
561                         put_ldev(mdev);
562                         return 1;
563                 }
564
565                 sector = BM_BIT_TO_SECT(bit);
566
567                 if (drbd_rs_should_slow_down(mdev, sector) ||
568                     drbd_try_rs_begin_io(mdev, sector)) {
569                         mdev->bm_resync_fo = bit;
570                         goto requeue;
571                 }
572                 mdev->bm_resync_fo = bit + 1;
573
574                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
575                         drbd_rs_complete_io(mdev, sector);
576                         goto next_sector;
577                 }
578
579 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
580                 /* try to find some adjacent bits.
581                  * we stop if we have already the maximum req size.
582                  *
583                  * Additionally always align bigger requests, in order to
584                  * be prepared for all stripe sizes of software RAIDs.
585                  */
586                 align = 1;
587                 rollback_i = i;
588                 for (;;) {
589                         if (size + BM_BLOCK_SIZE > max_bio_size)
590                                 break;
591
592                         /* Be always aligned */
593                         if (sector & ((1<<(align+3))-1))
594                                 break;
595
596                         /* do not cross extent boundaries */
597                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
598                                 break;
599                         /* now, is it actually dirty, after all?
600                          * caution, drbd_bm_test_bit is tri-state for some
601                          * obscure reason; ( b == 0 ) would get the out-of-band
602                          * only accidentally right because of the "oddly sized"
603                          * adjustment below */
604                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
605                                 break;
606                         bit++;
607                         size += BM_BLOCK_SIZE;
608                         if ((BM_BLOCK_SIZE << align) <= size)
609                                 align++;
610                         i++;
611                 }
612                 /* if we merged some,
613                  * reset the offset to start the next drbd_bm_find_next from */
614                 if (size > BM_BLOCK_SIZE)
615                         mdev->bm_resync_fo = bit + 1;
616 #endif
617
618                 /* adjust very last sectors, in case we are oddly sized */
619                 if (sector + (size>>9) > capacity)
620                         size = (capacity-sector)<<9;
621                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->csums_tfm) {
622                         switch (read_for_csum(mdev, sector, size)) {
623                         case -EIO: /* Disk failure */
624                                 put_ldev(mdev);
625                                 return 0;
626                         case -EAGAIN: /* allocation failed, or ldev busy */
627                                 drbd_rs_complete_io(mdev, sector);
628                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
629                                 i = rollback_i;
630                                 goto requeue;
631                         case 0:
632                                 /* everything ok */
633                                 break;
634                         default:
635                                 BUG();
636                         }
637                 } else {
638                         inc_rs_pending(mdev);
639                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
640                                                sector, size, ID_SYNCER)) {
641                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
642                                 dec_rs_pending(mdev);
643                                 put_ldev(mdev);
644                                 return 0;
645                         }
646                 }
647         }
648
649         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
650                 /* last syncer _request_ was sent,
651                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
652                  * next sync group will resume), as soon as we receive the last
653                  * resync data block, and the last bit is cleared.
654                  * until then resync "work" is "inactive" ...
655                  */
656                 put_ldev(mdev);
657                 return 1;
658         }
659
660  requeue:
661         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
662         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
663         put_ldev(mdev);
664         return 1;
665 }
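/* Illustrative note on the request-merging loop above (rough summary, under
 * the assumption of 4 KiB resync blocks, i.e. 8 sectors per block): because
 * the alignment check uses the request's starting sector, a request is only
 * grown to 8 KiB if it starts on an 8 KiB boundary, only past 8 KiB if it
 * starts on a 16 KiB boundary, and so on; together with the max_bio_size and
 * extent-boundary checks this keeps large resync requests aligned for
 * typical software-RAID stripe sizes.
 */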
666
667 static int w_make_ov_request(struct drbd_work *w, int cancel)
668 {
669         struct drbd_conf *mdev = w->mdev;
670         int number, i, size;
671         sector_t sector;
672         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
673
674         if (unlikely(cancel))
675                 return 1;
676
677         number = drbd_rs_number_requests(mdev);
678
679         sector = mdev->ov_position;
680         for (i = 0; i < number; i++) {
681                 if (sector >= capacity) {
682                         return 1;
683                 }
684
685                 size = BM_BLOCK_SIZE;
686
687                 if (drbd_rs_should_slow_down(mdev, sector) ||
688                     drbd_try_rs_begin_io(mdev, sector)) {
689                         mdev->ov_position = sector;
690                         goto requeue;
691                 }
692
693                 if (sector + (size>>9) > capacity)
694                         size = (capacity-sector)<<9;
695
696                 inc_rs_pending(mdev);
697                 if (!drbd_send_ov_request(mdev, sector, size)) {
698                         dec_rs_pending(mdev);
699                         return 0;
700                 }
701                 sector += BM_SECT_PER_BIT;
702         }
703         mdev->ov_position = sector;
704
705  requeue:
706         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
707         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
708         return 1;
709 }
710
711 int w_ov_finished(struct drbd_work *w, int cancel)
712 {
713         struct drbd_conf *mdev = w->mdev;
714         kfree(w);
715         ov_oos_print(mdev);
716         drbd_resync_finished(mdev);
717
718         return 1;
719 }
720
721 static int w_resync_finished(struct drbd_work *w, int cancel)
722 {
723         struct drbd_conf *mdev = w->mdev;
724         kfree(w);
725
726         drbd_resync_finished(mdev);
727
728         return 1;
729 }
730
731 static void ping_peer(struct drbd_conf *mdev)
732 {
733         struct drbd_tconn *tconn = mdev->tconn;
734
735         clear_bit(GOT_PING_ACK, &tconn->flags);
736         request_ping(tconn);
737         wait_event(tconn->ping_wait,
738                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
739 }
740
741 int drbd_resync_finished(struct drbd_conf *mdev)
742 {
743         unsigned long db, dt, dbdt;
744         unsigned long n_oos;
745         union drbd_state os, ns;
746         struct drbd_work *w;
747         char *khelper_cmd = NULL;
748         int verify_done = 0;
749
750         /* Remove all elements from the resync LRU. Since future actions
751          * might set bits in the (main) bitmap, the entries in the
752          * resync LRU would otherwise be wrong. */
753         if (drbd_rs_del_all(mdev)) {
754                 /* In case this is not possible now, most probably because
755                  * there are P_RS_DATA_REPLY packets lingering on the worker's
756                  * queue (or even the read operations for those packets
757                  * are not finished by now).   Retry in 100ms. */
758
759                 schedule_timeout_interruptible(HZ / 10);
760                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
761                 if (w) {
762                         w->cb = w_resync_finished;
763                         drbd_queue_work(&mdev->tconn->data.work, w);
764                         return 1;
765                 }
766                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
767         }
768
769         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
770         if (dt <= 0)
771                 dt = 1;
772         db = mdev->rs_total;
773         dbdt = Bit2KB(db/dt);
774         mdev->rs_paused /= HZ;
775
776         if (!get_ldev(mdev))
777                 goto out;
778
779         ping_peer(mdev);
780
781         spin_lock_irq(&mdev->tconn->req_lock);
782         os = mdev->state;
783
784         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
785
786         /* This protects us against multiple calls (that can happen in the presence
787            of application IO), and against connectivity loss just before we arrive here. */
788         if (os.conn <= C_CONNECTED)
789                 goto out_unlock;
790
791         ns = os;
792         ns.conn = C_CONNECTED;
793
794         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
795              verify_done ? "Online verify " : "Resync",
796              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
797
798         n_oos = drbd_bm_total_weight(mdev);
799
800         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
801                 if (n_oos) {
802                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
803                               n_oos, Bit2KB(1));
804                         khelper_cmd = "out-of-sync";
805                 }
806         } else {
807                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
808
809                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
810                         khelper_cmd = "after-resync-target";
811
812                 if (mdev->csums_tfm && mdev->rs_total) {
813                         const unsigned long s = mdev->rs_same_csum;
814                         const unsigned long t = mdev->rs_total;
815                         const int ratio =
816                                 (t == 0)     ? 0 :
817                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
818                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
819                              "transferred %luK total %luK\n",
820                              ratio,
821                              Bit2KB(mdev->rs_same_csum),
822                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
823                              Bit2KB(mdev->rs_total));
824                 }
825         }
826
827         if (mdev->rs_failed) {
828                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
829
830                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
831                         ns.disk = D_INCONSISTENT;
832                         ns.pdsk = D_UP_TO_DATE;
833                 } else {
834                         ns.disk = D_UP_TO_DATE;
835                         ns.pdsk = D_INCONSISTENT;
836                 }
837         } else {
838                 ns.disk = D_UP_TO_DATE;
839                 ns.pdsk = D_UP_TO_DATE;
840
841                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
842                         if (mdev->p_uuid) {
843                                 int i;
844                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
845                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
846                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
847                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
848                         } else {
849                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
850                         }
851                 }
852
853                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
854                         /* for verify runs, we don't update uuids here,
855                          * so there would be nothing to report. */
856                         drbd_uuid_set_bm(mdev, 0UL);
857                         drbd_print_uuids(mdev, "updated UUIDs");
858                         if (mdev->p_uuid) {
859                                 /* Now the two UUID sets are equal, update what we
860                                  * know of the peer. */
861                                 int i;
862                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
863                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
864                         }
865                 }
866         }
867
868         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
869 out_unlock:
870         spin_unlock_irq(&mdev->tconn->req_lock);
871         put_ldev(mdev);
872 out:
873         mdev->rs_total  = 0;
874         mdev->rs_failed = 0;
875         mdev->rs_paused = 0;
876         if (verify_done)
877                 mdev->ov_start_sector = 0;
878
879         drbd_md_sync(mdev);
880
881         if (khelper_cmd)
882                 drbd_khelper(mdev, khelper_cmd);
883
884         return 1;
885 }
886
887 /* helper */
888 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
889 {
890         if (drbd_ee_has_active_page(peer_req)) {
891                 /* This might happen if sendpage() has not finished */
892                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
893                 atomic_add(i, &mdev->pp_in_use_by_net);
894                 atomic_sub(i, &mdev->pp_in_use);
895                 spin_lock_irq(&mdev->tconn->req_lock);
896                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
897                 spin_unlock_irq(&mdev->tconn->req_lock);
898                 wake_up(&drbd_pp_wait);
899         } else
900                 drbd_free_ee(mdev, peer_req);
901 }
902
903 /**
904  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
905  * @mdev:       DRBD device.
906  * @w:          work object.
907  * @cancel:     The connection will be closed anyway
908  */
909 int w_e_end_data_req(struct drbd_work *w, int cancel)
910 {
911         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
912         struct drbd_conf *mdev = w->mdev;
913         int ok;
914
915         if (unlikely(cancel)) {
916                 drbd_free_ee(mdev, peer_req);
917                 dec_unacked(mdev);
918                 return 1;
919         }
920
921         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
922                 ok = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
923         } else {
924                 if (__ratelimit(&drbd_ratelimit_state))
925                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
926                             (unsigned long long)peer_req->i.sector);
927
928                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
929         }
930
931         dec_unacked(mdev);
932
933         move_to_net_ee_or_free(mdev, peer_req);
934
935         if (unlikely(!ok))
936                 dev_err(DEV, "drbd_send_block() failed\n");
937         return ok;
938 }
939
940 /**
941  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
942  * @mdev:       DRBD device.
943  * @w:          work object.
944  * @cancel:     The connection will be closed anyway
945  */
946 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
947 {
948         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
949         struct drbd_conf *mdev = w->mdev;
950         int ok;
951
952         if (unlikely(cancel)) {
953                 drbd_free_ee(mdev, peer_req);
954                 dec_unacked(mdev);
955                 return 1;
956         }
957
958         if (get_ldev_if_state(mdev, D_FAILED)) {
959                 drbd_rs_complete_io(mdev, peer_req->i.sector);
960                 put_ldev(mdev);
961         }
962
963         if (mdev->state.conn == C_AHEAD) {
964                 ok = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
965         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
966                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
967                         inc_rs_pending(mdev);
968                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
969                 } else {
970                         if (__ratelimit(&drbd_ratelimit_state))
971                                 dev_err(DEV, "Not sending RSDataReply, "
972                                     "partner DISKLESS!\n");
973                         ok = 1;
974                 }
975         } else {
976                 if (__ratelimit(&drbd_ratelimit_state))
977                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
978                             (unsigned long long)peer_req->i.sector);
979
980                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
981
982                 /* update resync data with failure */
983                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
984         }
985
986         dec_unacked(mdev);
987
988         move_to_net_ee_or_free(mdev, peer_req);
989
990         if (unlikely(!ok))
991                 dev_err(DEV, "drbd_send_block() failed\n");
992         return ok;
993 }
994
995 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
996 {
997         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
998         struct drbd_conf *mdev = w->mdev;
999         struct digest_info *di;
1000         int digest_size;
1001         void *digest = NULL;
1002         int ok, eq = 0;
1003
1004         if (unlikely(cancel)) {
1005                 drbd_free_ee(mdev, peer_req);
1006                 dec_unacked(mdev);
1007                 return 1;
1008         }
1009
1010         if (get_ldev(mdev)) {
1011                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1012                 put_ldev(mdev);
1013         }
1014
1015         di = peer_req->digest;
1016
1017         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1018                 /* quick hack to try to avoid a race against reconfiguration.
1019                  * a real fix would be much more involved,
1020                  * introducing more locking mechanisms */
1021                 if (mdev->csums_tfm) {
1022                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1023                         D_ASSERT(digest_size == di->digest_size);
1024                         digest = kmalloc(digest_size, GFP_NOIO);
1025                 }
1026                 if (digest) {
1027                         drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
1028                         eq = !memcmp(digest, di->digest, digest_size);
1029                         kfree(digest);
1030                 }
1031
1032                 if (eq) {
1033                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1034                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1035                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1036                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1037                 } else {
1038                         inc_rs_pending(mdev);
1039                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1040                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1041                         kfree(di);
1042                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1043                 }
1044         } else {
1045                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1046                 if (__ratelimit(&drbd_ratelimit_state))
1047                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1048         }
1049
1050         dec_unacked(mdev);
1051         move_to_net_ee_or_free(mdev, peer_req);
1052
1053         if (unlikely(!ok))
1054                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1055         return ok;
1056 }
1057
1058 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1059 {
1060         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1061         struct drbd_conf *mdev = w->mdev;
1062         sector_t sector = peer_req->i.sector;
1063         unsigned int size = peer_req->i.size;
1064         int digest_size;
1065         void *digest;
1066         int ok = 1;
1067
1068         if (unlikely(cancel))
1069                 goto out;
1070
1071         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1072         digest = kmalloc(digest_size, GFP_NOIO);
1073         if (!digest) {
1074                 ok = 0; /* terminate the connection in case the allocation failed */
1075                 goto out;
1076         }
1077
1078         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1079                 drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);
1080         else
1081                 memset(digest, 0, digest_size);
1082
1083         /* Free e and pages before send.
1084          * In case we block on congestion, we could otherwise run into
1085          * some distributed deadlock, if the other side blocks on
1086          * congestion as well, because our receiver blocks in
1087          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1088         drbd_free_ee(mdev, peer_req);
1089         peer_req = NULL;
1090         inc_rs_pending(mdev);
1091         ok = drbd_send_drequest_csum(mdev, sector, size,
1092                                      digest, digest_size,
1093                                      P_OV_REPLY);
1094         if (!ok)
1095                 dec_rs_pending(mdev);
1096         kfree(digest);
1097
1098 out:
1099         if (peer_req)
1100                 drbd_free_ee(mdev, peer_req);
1101         dec_unacked(mdev);
1102         return ok;
1103 }
1104
1105 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1106 {
1107         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1108                 mdev->ov_last_oos_size += size>>9;
1109         } else {
1110                 mdev->ov_last_oos_start = sector;
1111                 mdev->ov_last_oos_size = size>>9;
1112         }
1113         drbd_set_out_of_sync(mdev, sector, size);
1114 }
1115
1116 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1117 {
1118         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1119         struct drbd_conf *mdev = w->mdev;
1120         struct digest_info *di;
1121         void *digest;
1122         sector_t sector = peer_req->i.sector;
1123         unsigned int size = peer_req->i.size;
1124         int digest_size;
1125         int ok, eq = 0;
1126
1127         if (unlikely(cancel)) {
1128                 drbd_free_ee(mdev, peer_req);
1129                 dec_unacked(mdev);
1130                 return 1;
1131         }
1132
1133         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1134          * the resync lru has been cleaned up already */
1135         if (get_ldev(mdev)) {
1136                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1137                 put_ldev(mdev);
1138         }
1139
1140         di = peer_req->digest;
1141
1142         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1143                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1144                 digest = kmalloc(digest_size, GFP_NOIO);
1145                 if (digest) {
1146                         drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);
1147
1148                         D_ASSERT(digest_size == di->digest_size);
1149                         eq = !memcmp(digest, di->digest, digest_size);
1150                         kfree(digest);
1151                 }
1152         }
1153
1154         /* Free e and pages before send.
1155          * In case we block on congestion, we could otherwise run into
1156          * some distributed deadlock, if the other side blocks on
1157          * congestion as well, because our receiver blocks in
1158          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1159         drbd_free_ee(mdev, peer_req);
1160         if (!eq)
1161                 drbd_ov_oos_found(mdev, sector, size);
1162         else
1163                 ov_oos_print(mdev);
1164
1165         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1166                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1167
1168         dec_unacked(mdev);
1169
1170         --mdev->ov_left;
1171
1172         /* let's advance progress step marks only for every other megabyte */
1173         if ((mdev->ov_left & 0x200) == 0x200)
1174                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1175
1176         if (mdev->ov_left == 0) {
1177                 ov_oos_print(mdev);
1178                 drbd_resync_finished(mdev);
1179         }
1180
1181         return ok;
1182 }
1183
1184 int w_prev_work_done(struct drbd_work *w, int cancel)
1185 {
1186         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1187
1188         complete(&b->done);
1189         return 1;
1190 }
1191
1192 int w_send_barrier(struct drbd_work *w, int cancel)
1193 {
1194         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1195         struct drbd_conf *mdev = w->mdev;
1196         struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
1197         int ok = 1;
1198
1199         /* really avoid racing with tl_clear.  w.cb may have been referenced
1200          * just before it was reassigned and re-queued, so double check that.
1201          * actually, this race was harmless, since we only try to send the
1202          * barrier packet here, and otherwise do nothing with the object.
1203          * but compare with the head of w_clear_epoch */
1204         spin_lock_irq(&mdev->tconn->req_lock);
1205         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1206                 cancel = 1;
1207         spin_unlock_irq(&mdev->tconn->req_lock);
1208         if (cancel)
1209                 return 1;
1210
1211         if (!drbd_get_data_sock(mdev->tconn))
1212                 return 0;
1213         p->barrier = b->br_number;
1214         /* inc_ap_pending was done where this was queued.
1215          * dec_ap_pending will be done in got_BarrierAck
1216          * or (on connection loss) in w_clear_epoch.  */
1217         ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
1218                             &p->head, sizeof(*p), 0);
1219         drbd_put_data_sock(mdev->tconn);
1220
1221         return ok;
1222 }
1223
1224 int w_send_write_hint(struct drbd_work *w, int cancel)
1225 {
1226         struct drbd_conf *mdev = w->mdev;
1227         if (cancel)
1228                 return 1;
1229         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1230 }
1231
1232 int w_send_oos(struct drbd_work *w, int cancel)
1233 {
1234         struct drbd_request *req = container_of(w, struct drbd_request, w);
1235         struct drbd_conf *mdev = w->mdev;
1236         int ok;
1237
1238         if (unlikely(cancel)) {
1239                 req_mod(req, SEND_CANCELED);
1240                 return 1;
1241         }
1242
1243         ok = drbd_send_oos(mdev, req);
1244         req_mod(req, OOS_HANDED_TO_NETWORK);
1245
1246         return ok;
1247 }
1248
1249 /**
1250  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1251  * @mdev:       DRBD device.
1252  * @w:          work object.
1253  * @cancel:     The connection will be closed anyway
1254  */
1255 int w_send_dblock(struct drbd_work *w, int cancel)
1256 {
1257         struct drbd_request *req = container_of(w, struct drbd_request, w);
1258         struct drbd_conf *mdev = w->mdev;
1259         int ok;
1260
1261         if (unlikely(cancel)) {
1262                 req_mod(req, SEND_CANCELED);
1263                 return 1;
1264         }
1265
1266         ok = drbd_send_dblock(mdev, req);
1267         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1268
1269         return ok;
1270 }
1271
1272 /**
1273  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1274  * @mdev:       DRBD device.
1275  * @w:          work object.
1276  * @cancel:     The connection will be closed anyway
1277  */
1278 int w_send_read_req(struct drbd_work *w, int cancel)
1279 {
1280         struct drbd_request *req = container_of(w, struct drbd_request, w);
1281         struct drbd_conf *mdev = w->mdev;
1282         int ok;
1283
1284         if (unlikely(cancel)) {
1285                 req_mod(req, SEND_CANCELED);
1286                 return 1;
1287         }
1288
1289         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1290                                 (unsigned long)req);
1291
1292         if (!ok) {
1293                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1294                  * so this is probably redundant */
1295                 if (mdev->state.conn >= C_CONNECTED)
1296                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1297         }
1298         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1299
1300         return ok;
1301 }
1302
1303 int w_restart_disk_io(struct drbd_work *w, int cancel)
1304 {
1305         struct drbd_request *req = container_of(w, struct drbd_request, w);
1306         struct drbd_conf *mdev = w->mdev;
1307
1308         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1309                 drbd_al_begin_io(mdev, req->i.sector);
1310         /* Calling drbd_al_begin_io() out of the worker might deadlock
1311            theoretically. Practically it cannot deadlock, since this is
1312            only used when unfreezing IOs. All the extents of the requests
1313            that made it into the TL are already active */
1314
1315         drbd_req_make_private_bio(req, req->master_bio);
1316         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1317         generic_make_request(req->private_bio);
1318
1319         return 1;
1320 }
1321
1322 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1323 {
1324         struct drbd_conf *odev = mdev;
1325
1326         while (1) {
1327                 if (odev->sync_conf.after == -1)
1328                         return 1;
1329                 odev = minor_to_mdev(odev->sync_conf.after);
1330                 if (!expect(odev))
1331                         return 1;
1332                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1333                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1334                     odev->state.aftr_isp || odev->state.peer_isp ||
1335                     odev->state.user_isp)
1336                         return 0;
1337         }
1338 }
1339
1340 /**
1341  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1342  * @mdev:       DRBD device.
1343  *
1344  * Called from process context only (admin command and after_state_ch).
1345  */
1346 static int _drbd_pause_after(struct drbd_conf *mdev)
1347 {
1348         struct drbd_conf *odev;
1349         int i, rv = 0;
1350
1351         for (i = 0; i < minor_count; i++) {
1352                 odev = minor_to_mdev(i);
1353                 if (!odev)
1354                         continue;
1355                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1356                         continue;
1357                 if (!_drbd_may_sync_now(odev))
1358                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1359                                != SS_NOTHING_TO_DO);
1360         }
1361
1362         return rv;
1363 }
1364
1365 /**
1366  * _drbd_resume_next() - Resume resync on all devices that may resync now
1367  * @mdev:       DRBD device.
1368  *
1369  * Called from process context only (admin command and worker).
1370  */
1371 static int _drbd_resume_next(struct drbd_conf *mdev)
1372 {
1373         struct drbd_conf *odev;
1374         int i, rv = 0;
1375
1376         for (i = 0; i < minor_count; i++) {
1377                 odev = minor_to_mdev(i);
1378                 if (!odev)
1379                         continue;
1380                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1381                         continue;
1382                 if (odev->state.aftr_isp) {
1383                         if (_drbd_may_sync_now(odev))
1384                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1385                                                         CS_HARD, NULL)
1386                                        != SS_NOTHING_TO_DO) ;
1387                 }
1388         }
1389         return rv;
1390 }
1391
1392 void resume_next_sg(struct drbd_conf *mdev)
1393 {
1394         write_lock_irq(&global_state_lock);
1395         _drbd_resume_next(mdev);
1396         write_unlock_irq(&global_state_lock);
1397 }
1398
1399 void suspend_other_sg(struct drbd_conf *mdev)
1400 {
1401         write_lock_irq(&global_state_lock);
1402         _drbd_pause_after(mdev);
1403         write_unlock_irq(&global_state_lock);
1404 }
1405
1406 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1407 {
1408         struct drbd_conf *odev;
1409
1410         if (o_minor == -1)
1411                 return NO_ERROR;
1412         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1413                 return ERR_SYNC_AFTER;
1414
1415         /* check for loops */
1416         odev = minor_to_mdev(o_minor);
1417         while (1) {
1418                 if (odev == mdev)
1419                         return ERR_SYNC_AFTER_CYCLE;
1420
1421                 /* dependency chain ends here, no cycles. */
1422                 if (odev->sync_conf.after == -1)
1423                         return NO_ERROR;
1424
1425                 /* follow the dependency chain */
1426                 odev = minor_to_mdev(odev->sync_conf.after);
1427         }
1428 }
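/* Illustrative example (made-up minor numbers): if device 2 is configured to
 * resync after device 1, and device 1 after device 0, then a request to make
 * device 0 resync after device 2 walks the chain 2 -> 1 -> 0, arrives back
 * at the requesting device, and is rejected with ERR_SYNC_AFTER_CYCLE.
 */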
1429
1430 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1431 {
1432         int changes;
1433         int retcode;
1434
1435         write_lock_irq(&global_state_lock);
1436         retcode = sync_after_error(mdev, na);
1437         if (retcode == NO_ERROR) {
1438                 mdev->sync_conf.after = na;
1439                 do {
1440                         changes  = _drbd_pause_after(mdev);
1441                         changes |= _drbd_resume_next(mdev);
1442                 } while (changes);
1443         }
1444         write_unlock_irq(&global_state_lock);
1445         return retcode;
1446 }
1447
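/* Reset the resync controller state: incoming/event sector counters,
 * in-flight accounting and the plan fifo. */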
1448 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1449 {
1450         atomic_set(&mdev->rs_sect_in, 0);
1451         atomic_set(&mdev->rs_sect_ev, 0);
1452         mdev->rs_in_flight = 0;
1453         mdev->rs_planed = 0;
1454         spin_lock(&mdev->peer_seq_lock);
1455         fifo_set(&mdev->rs_plan_s, 0);
1456         spin_unlock(&mdev->peer_seq_lock);
1457 }
1458
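/* Timer callback: hand start_resync_work over to the worker thread. */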
1459 void start_resync_timer_fn(unsigned long data)
1460 {
1461         struct drbd_conf *mdev = (struct drbd_conf *) data;
1462
1463         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1464 }
1465
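/* Worker callback behind start_resync_work: while unacknowledged or pending
 * resync requests are still outstanding, re-arm the timer and try again
 * later; otherwise start the resync with this node as sync source. */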
1466 int w_start_resync(struct drbd_work *w, int cancel)
1467 {
1468         struct drbd_conf *mdev = w->mdev;
1469
1470         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1471                 dev_warn(DEV, "w_start_resync later...\n");
1472                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1473                 add_timer(&mdev->start_resync_timer);
1474                 return 1;
1475         }
1476
1477         drbd_start_resync(mdev, C_SYNC_SOURCE);
1478         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1479         return 1;
1480 }
1481
1482 /**
1483  * drbd_start_resync() - Start the resync process
1484  * @mdev:       DRBD device.
1485  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1486  *
1487  * This function might bring you directly into one of the
1488  * C_PAUSED_SYNC_* states.
1489  */
1490 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1491 {
1492         union drbd_state ns;
1493         int r;
1494
1495         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1496                 dev_err(DEV, "Resync already running!\n");
1497                 return;
1498         }
1499
1500         if (mdev->state.conn < C_AHEAD) {
1501                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1502                 drbd_rs_cancel_all(mdev);
1503                 /* This should be done when we abort the resync. We definitely do not
1504                    want to do this for connections going back and forth between
1505                    Ahead/Behind and SyncSource/SyncTarget. */
1506         }
1507
1508         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1509                 if (side == C_SYNC_TARGET) {
1510                         /* Since application IO was locked out during C_WF_BITMAP_T and
1511                            C_WF_SYNC_UUID we are still unmodified.  Before going to C_SYNC_TARGET,
1512                            we ask the before-resync-target handler whether we may make the data inconsistent. */
1513                         r = drbd_khelper(mdev, "before-resync-target");
1514                         r = (r >> 8) & 0xff;
1515                         if (r > 0) {
1516                                 dev_info(DEV, "before-resync-target handler returned %d, "
1517                                          "dropping connection.\n", r);
1518                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1519                                 return;
1520                         }
1521                 } else /* C_SYNC_SOURCE */ {
1522                         r = drbd_khelper(mdev, "before-resync-source");
1523                         r = (r >> 8) & 0xff;
1524                         if (r > 0) {
1525                                 if (r == 3) {
1526                                         dev_info(DEV, "before-resync-source handler returned %d, "
1527                                                  "ignoring. Old userland tools?\n", r);
1528                                 } else {
1529                                         dev_info(DEV, "before-resync-source handler returned %d, "
1530                                                  "dropping connection.\n", r);
1531                                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1532                                         return;
1533                                 }
1534                         }
1535                 }
1536         }
1537
1538         if (current == mdev->tconn->worker.task) {
1539                 /* The worker should not sleep waiting for drbd_state_lock();
1540                    that can take a long time. */
1541                 if (test_and_set_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
1542                         set_bit(B_RS_H_DONE, &mdev->flags);
1543                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1544                         add_timer(&mdev->start_resync_timer);
1545                         return;
1546                 }
1547         } else {
1548                 drbd_state_lock(mdev);
1549         }
1550         clear_bit(B_RS_H_DONE, &mdev->flags);
1551
1552         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1553                 drbd_state_unlock(mdev);
1554                 return;
1555         }
1556
1557         write_lock_irq(&global_state_lock);
1558         ns = mdev->state;
1559
1560         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1561
1562         ns.conn = side;
1563
1564         if (side == C_SYNC_TARGET)
1565                 ns.disk = D_INCONSISTENT;
1566         else /* side == C_SYNC_SOURCE */
1567                 ns.pdsk = D_INCONSISTENT;
1568
1569         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1570         ns = mdev->state;
1571
1572         if (ns.conn < C_CONNECTED)
1573                 r = SS_UNKNOWN_ERROR;
1574
1575         if (r == SS_SUCCESS) {
1576                 unsigned long tw = drbd_bm_total_weight(mdev);
1577                 unsigned long now = jiffies;
1578                 int i;
1579
1580                 mdev->rs_failed    = 0;
1581                 mdev->rs_paused    = 0;
1582                 mdev->rs_same_csum = 0;
1583                 mdev->rs_last_events = 0;
1584                 mdev->rs_last_sect_ev = 0;
1585                 mdev->rs_total     = tw;
1586                 mdev->rs_start     = now;
1587                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1588                         mdev->rs_mark_left[i] = tw;
1589                         mdev->rs_mark_time[i] = now;
1590                 }
1591                 _drbd_pause_after(mdev);
1592         }
1593         write_unlock_irq(&global_state_lock);
1594
1595         if (r == SS_SUCCESS) {
1596                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1597                      drbd_conn_str(ns.conn),
1598                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1599                      (unsigned long) mdev->rs_total);
1600                 if (side == C_SYNC_TARGET)
1601                         mdev->bm_resync_fo = 0;
1602
1603                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1604                  * with w_send_oos, or the sync target will get confused as to
1605                  * how many bits to resync.  We cannot always do that, because for an
1606                  * empty resync and protocol < 95, we need to do it here, as we call
1607                  * drbd_resync_finished from here in that case.
1608                  * So we call drbd_gen_and_send_sync_uuid here for protocol < 96,
1609                  * and from after_state_ch otherwise. */
1610                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1611                         drbd_gen_and_send_sync_uuid(mdev);
1612
1613                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1614                         /* This still has a race (about when exactly the peers
1615                          * detect connection loss) that can lead to a full sync
1616                          * on next handshake. In 8.3.9 we fixed this with explicit
1617                          * resync-finished notifications, but the fix
1618                          * introduces a protocol change.  Sleeping for some
1619                          * time longer than the ping interval + timeout on the
1620                          * SyncSource, to give the SyncTarget the chance to
1621                          * detect connection loss, then waiting for a ping
1622                          * response (implicit in drbd_resync_finished) reduces
1623                          * the race considerably, but does not solve it. */
1624                         if (side == C_SYNC_SOURCE)
1625                                 schedule_timeout_interruptible(
1626                                         mdev->tconn->net_conf->ping_int * HZ +
1627                                         mdev->tconn->net_conf->ping_timeo * HZ / 9);
1628                         drbd_resync_finished(mdev);
1629                 }
1630
1631                 drbd_rs_controller_reset(mdev);
1632                 /* ns.conn may already be != mdev->state.conn,
1633                  * we may have been paused in between, or become paused until
1634                  * the timer triggers.
1635                  * No matter, that is handled in resync_timer_fn() */
1636                 if (ns.conn == C_SYNC_TARGET)
1637                         mod_timer(&mdev->resync_timer, jiffies);
1638
1639                 drbd_md_sync(mdev);
1640         }
1641         put_ldev(mdev);
1642         drbd_state_unlock(mdev);
1643 }
1644
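/* idr_for_each() callback, run for every volume of the connection when the
 * worker exits: wait for the exiting receiver, clean up the device, and wake
 * up anyone waiting for the pending state change to complete. */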
1645 static int _worker_dying(int vnr, void *p, void *data)
1646 {
1647         struct drbd_conf *mdev = (struct drbd_conf *)p;
1648
1649         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1650         /* _drbd_set_state() only uses stop_nowait;
1651          * wait here for the exiting receiver. */
1652         drbd_thread_stop(&mdev->tconn->receiver);
1653         drbd_mdev_cleanup(mdev);
1654
1655         clear_bit(DEVICE_DYING, &mdev->flags);
1656         clear_bit(CONFIG_PENDING, &mdev->flags);
1657         wake_up(&mdev->state_wait);
1658
1659         return 0;
1660 }
1661
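/**
 * drbd_worker() - Main loop of the per-connection worker thread
 * @thi:        The thread structure of this worker.
 *
 * Waits on the work queue semaphore (uncorking the data socket while it
 * would otherwise block), executes queued work callbacks, and requests
 * C_NETWORK_FAILURE if a callback fails while we are connected.  On exit,
 * the remaining work list is drained with the cancel flag set and
 * _worker_dying() is run for every volume of the connection.
 */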
1662 int drbd_worker(struct drbd_thread *thi)
1663 {
1664         struct drbd_tconn *tconn = thi->tconn;
1665         struct drbd_work *w = NULL;
1666         LIST_HEAD(work_list);
1667         int intr = 0;
1668
1669         while (get_t_state(thi) == RUNNING) {
1670                 drbd_thread_current_set_cpu(thi);
1671
1672                 if (down_trylock(&tconn->data.work.s)) {
1673                         mutex_lock(&tconn->data.mutex);
1674                         if (tconn->data.socket && !tconn->net_conf->no_cork)
1675                                 drbd_tcp_uncork(tconn->data.socket);
1676                         mutex_unlock(&tconn->data.mutex);
1677
1678                         intr = down_interruptible(&tconn->data.work.s);
1679
1680                         mutex_lock(&tconn->data.mutex);
1681                         if (tconn->data.socket && !tconn->net_conf->no_cork)
1682                                 drbd_tcp_cork(tconn->data.socket);
1683                         mutex_unlock(&tconn->data.mutex);
1684                 }
1685
1686                 if (intr) {
1687                         flush_signals(current);
1688                         if (get_t_state(thi) == RUNNING) {
1689                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1690                                 continue;
1691                         }
1692                         break;
1693                 }
1694
1695                 if (get_t_state(thi) != RUNNING)
1696                         break;
1697                 /* With this break, we have done a down() but not consumed
1698                    the entry from the list. The cleanup code takes care of
1699                    this...   */
1700
1701                 w = NULL;
1702                 spin_lock_irq(&tconn->data.work.q_lock);
1703                 if (list_empty(&tconn->data.work.q)) {
1704                         /* Something is terribly wrong in our logic:
1705                          * we were able to down() the semaphore,
1706                          * but the list is empty... doh.
1707                          *
1708                          * what is the best thing to do now?
1709                          * try again from scratch, restarting the receiver,
1710                          * asender, whatnot? could break even more ugly,
1711                          * e.g. when we are primary, but no good local data.
1712                          *
1713                          * I'll try to get away just starting over this loop.
1714                          */
1715                         conn_warn(tconn, "Work list unexpectedly empty\n");
1716                         spin_unlock_irq(&tconn->data.work.q_lock);
1717                         continue;
1718                 }
1719                 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1720                 list_del_init(&w->list);
1721                 spin_unlock_irq(&tconn->data.work.q_lock);
1722
1723                 if (!w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1724                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1725                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1726                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1727                 }
1728         }
1729
1730         spin_lock_irq(&tconn->data.work.q_lock);
1731         while (!list_empty(&tconn->data.work.q)) {
1732                 list_splice_init(&tconn->data.work.q, &work_list);
1733                 spin_unlock_irq(&tconn->data.work.q_lock);
1734
1735                 while (!list_empty(&work_list)) {
1736                         w = list_entry(work_list.next, struct drbd_work, list);
1737                         list_del_init(&w->list);
1738                         w->cb(w, 1);
1739                 }
1740
1741                 spin_lock_irq(&tconn->data.work.q_lock);
1742         }
1743         sema_init(&tconn->data.work.s, 0);
1744         /* DANGEROUS race: if someone queued their work while holding the spinlock,
1745          * but called up() outside of it, we could get an up() on the
1746          * semaphore without a corresponding list entry.
1747          * So don't do that.
1748          */
1749         spin_unlock_irq(&tconn->data.work.q_lock);
1750
1751         idr_for_each(&tconn->volumes, _worker_dying, NULL);
1752
1753         return 0;
1754 }