1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_work *w, int cancel);
43
44
45
46 /* endio handlers:
47  *   drbd_md_io_complete (defined here)
48  *   drbd_request_endio (defined here)
49  *   drbd_peer_request_endio (defined here)
50  *   bm_async_io_complete (defined in drbd_bitmap.c)
51  *
52  * For all these callbacks, note the following:
53  * The callbacks will be called in irq context by the IDE drivers,
54  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
55  * Try to get the locking right :)
56  *
57  */
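
/* A minimal sketch of the locking pattern this implies for the endio
 * callbacks below (they may run in hard-irq or softirq context, so any
 * state shared with process context is taken with the irq-safe variant):
 *
 *	unsigned long flags;
 *
 *	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
 *	... update counters, move peer_req between the ee lists ...
 *	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
 */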
58
59
60 /* About the global_state_lock
61    Each state transition on a device holds a read lock. In case we have
62    to evaluate the sync after dependencies, we grab a write lock, because
63    we need stable states on all devices for that.  */
64 rwlock_t global_state_lock;
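
/* A sketch of the two lock modes (compare resume_next_sg() and
 * suspend_other_sg() further down in this file): a single-device state
 * transition takes the lock shared, while evaluating the resync-after
 * dependencies takes it exclusively, e.g.
 *
 *	write_lock_irq(&global_state_lock);
 *	_drbd_pause_after(mdev);    (or _drbd_resume_next(mdev))
 *	write_unlock_irq(&global_state_lock);
 */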
65
66 /* used for synchronous meta data and bitmap IO
67  * submitted by drbd_md_sync_page_io()
68  */
69 void drbd_md_io_complete(struct bio *bio, int error)
70 {
71         struct drbd_md_io *md_io;
72
73         md_io = (struct drbd_md_io *)bio->bi_private;
74         md_io->error = error;
75
76         complete(&md_io->event);
77 }
78
79 /* reads on behalf of the partner,
80  * "submitted" by the receiver
81  */
82 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
83 {
84         unsigned long flags = 0;
85         struct drbd_conf *mdev = peer_req->w.mdev;
86
87         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
88         mdev->read_cnt += peer_req->i.size >> 9;
89         list_del(&peer_req->w.list);
90         if (list_empty(&mdev->read_ee))
91                 wake_up(&mdev->ee_wait);
92         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93                 __drbd_chk_io_error(mdev, false);
94         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
95
96         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
97         put_ldev(mdev);
98 }
99
100 /* writes on behalf of the partner, or resync writes,
101  * "submitted" by the receiver, final stage.  */
102 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103 {
104         unsigned long flags = 0;
105         struct drbd_conf *mdev = peer_req->w.mdev;
106         sector_t e_sector;
107         int do_wake;
108         u64 block_id;
109         int do_al_complete_io;
110
111         /* after we moved peer_req to done_ee,
112          * we may no longer access it,
113          * it may be freed/reused already!
114          * (as soon as we release the req_lock) */
115         e_sector = peer_req->i.sector;
116         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
117         block_id = peer_req->block_id;
118
119         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
120         mdev->writ_cnt += peer_req->i.size >> 9;
121         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
122         list_add_tail(&peer_req->w.list, &mdev->done_ee);
123
124         /*
125          * Do not remove from the write_requests tree here: we did not send the
126          * Ack yet and did not wake possibly waiting conflicting requests.
127          * Removal from the tree happens in "drbd_process_done_ee" within the
128          * appropriate w.cb (e_end_block/e_end_resync_block) or in
129          * _drbd_clear_done_ee.
130          */
131
132         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
133
134         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
135                 __drbd_chk_io_error(mdev, false);
136         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
137
138         if (block_id == ID_SYNCER)
139                 drbd_rs_complete_io(mdev, e_sector);
140
141         if (do_wake)
142                 wake_up(&mdev->ee_wait);
143
144         if (do_al_complete_io)
145                 drbd_al_complete_io(mdev, e_sector);
146
147         wake_asender(mdev->tconn);
148         put_ldev(mdev);
149 }
150
151 /* writes on behalf of the partner, or resync writes,
152  * "submitted" by the receiver.
153  */
154 void drbd_peer_request_endio(struct bio *bio, int error)
155 {
156         struct drbd_peer_request *peer_req = bio->bi_private;
157         struct drbd_conf *mdev = peer_req->w.mdev;
158         int uptodate = bio_flagged(bio, BIO_UPTODATE);
159         int is_write = bio_data_dir(bio) == WRITE;
160
161         if (error && __ratelimit(&drbd_ratelimit_state))
162                 dev_warn(DEV, "%s: error=%d s=%llus\n",
163                                 is_write ? "write" : "read", error,
164                                 (unsigned long long)peer_req->i.sector);
165         if (!error && !uptodate) {
166                 if (__ratelimit(&drbd_ratelimit_state))
167                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
168                                         is_write ? "write" : "read",
169                                         (unsigned long long)peer_req->i.sector);
170                 /* strange behavior of some lower level drivers...
171                  * fail the request by clearing the uptodate flag,
172                  * but do not return any error?! */
173                 error = -EIO;
174         }
175
176         if (error)
177                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
178
179         bio_put(bio); /* no need for the bio anymore */
180         if (atomic_dec_and_test(&peer_req->pending_bios)) {
181                 if (is_write)
182                         drbd_endio_write_sec_final(peer_req);
183                 else
184                         drbd_endio_read_sec_final(peer_req);
185         }
186 }
187
188 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
189  */
190 void drbd_request_endio(struct bio *bio, int error)
191 {
192         unsigned long flags;
193         struct drbd_request *req = bio->bi_private;
194         struct drbd_conf *mdev = req->w.mdev;
195         struct bio_and_error m;
196         enum drbd_req_event what;
197         int uptodate = bio_flagged(bio, BIO_UPTODATE);
198
199         if (!error && !uptodate) {
200                 dev_warn(DEV, "p %s: setting error to -EIO\n",
201                          bio_data_dir(bio) == WRITE ? "write" : "read");
202                 /* strange behavior of some lower level drivers...
203                  * fail the request by clearing the uptodate flag,
204                  * but do not return any error?! */
205                 error = -EIO;
206         }
207
208         /* to avoid recursion in __req_mod */
209         if (unlikely(error)) {
210                 what = (bio_data_dir(bio) == WRITE)
211                         ? WRITE_COMPLETED_WITH_ERROR
212                         : (bio_rw(bio) == READ)
213                           ? READ_COMPLETED_WITH_ERROR
214                           : READ_AHEAD_COMPLETED_WITH_ERROR;
215         } else
216                 what = COMPLETED_OK;
217
218         bio_put(req->private_bio);
219         req->private_bio = ERR_PTR(error);
220
221         /* not req_mod(), we need irqsave here! */
222         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
223         __req_mod(req, what, &m);
224         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
225
226         if (m.bio)
227                 complete_master_bio(mdev, &m);
228 }
229
230 int w_read_retry_remote(struct drbd_work *w, int cancel)
231 {
232         struct drbd_request *req = container_of(w, struct drbd_request, w);
233         struct drbd_conf *mdev = w->mdev;
234
235         /* We should not detach for read io-error,
236          * but try to WRITE the P_DATA_REPLY to the failed location,
237          * to give the disk the chance to relocate that block */
238
239         spin_lock_irq(&mdev->tconn->req_lock);
240         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
241                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
242                 spin_unlock_irq(&mdev->tconn->req_lock);
243                 return 1;
244         }
245         spin_unlock_irq(&mdev->tconn->req_lock);
246
247         return w_send_read_req(w, 0);
248 }
249
250 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
251                   struct drbd_peer_request *peer_req, void *digest)
252 {
253         struct hash_desc desc;
254         struct scatterlist sg;
255         struct page *page = peer_req->pages;
256         struct page *tmp;
257         unsigned len;
258
259         desc.tfm = tfm;
260         desc.flags = 0;
261
262         sg_init_table(&sg, 1);
263         crypto_hash_init(&desc);
264
265         while ((tmp = page_chain_next(page))) {
266                 /* all but the last page will be fully used */
267                 sg_set_page(&sg, page, PAGE_SIZE, 0);
268                 crypto_hash_update(&desc, &sg, sg.length);
269                 page = tmp;
270         }
271         /* and now the last, possibly only partially used page */
272         len = peer_req->i.size & (PAGE_SIZE - 1);
273         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
274         crypto_hash_update(&desc, &sg, sg.length);
275         crypto_hash_final(&desc, digest);
276 }
277
278 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
279 {
280         struct hash_desc desc;
281         struct scatterlist sg;
282         struct bio_vec *bvec;
283         int i;
284
285         desc.tfm = tfm;
286         desc.flags = 0;
287
288         sg_init_table(&sg, 1);
289         crypto_hash_init(&desc);
290
291         __bio_for_each_segment(bvec, bio, i, 0) {
292                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
293                 crypto_hash_update(&desc, &sg, sg.length);
294         }
295         crypto_hash_final(&desc, digest);
296 }
297
298 /* MAYBE merge common code with w_e_end_ov_req */
299 static int w_e_send_csum(struct drbd_work *w, int cancel)
300 {
301         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
302         struct drbd_conf *mdev = w->mdev;
303         int digest_size;
304         void *digest;
305         int ok = 1;
306
307         if (unlikely(cancel))
308                 goto out;
309
310         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
311                 goto out;
312
313         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
314         digest = kmalloc(digest_size, GFP_NOIO);
315         if (digest) {
316                 sector_t sector = peer_req->i.sector;
317                 unsigned int size = peer_req->i.size;
318                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
319                 /* Free peer_req and pages before send.
320                  * In case we block on congestion, we could otherwise run into
321                  * some distributed deadlock, if the other side blocks on
322                  * congestion as well, because our receiver blocks in
323                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
324                 drbd_free_ee(mdev, peer_req);
325                 peer_req = NULL;
326                 inc_rs_pending(mdev);
327                 ok = drbd_send_drequest_csum(mdev, sector, size,
328                                              digest, digest_size,
329                                              P_CSUM_RS_REQUEST);
330                 kfree(digest);
331         } else {
332                 dev_err(DEV, "kmalloc() of digest failed.\n");
333                 ok = 0;
334         }
335
336 out:
337         if (peer_req)
338                 drbd_free_ee(mdev, peer_req);
339
340         if (unlikely(!ok))
341                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
342         return ok;
343 }
344
345 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
346
347 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
348 {
349         struct drbd_peer_request *peer_req;
350
351         if (!get_ldev(mdev))
352                 return -EIO;
353
354         if (drbd_rs_should_slow_down(mdev, sector))
355                 goto defer;
356
357         /* GFP_TRY, because if there is no memory available right now, this may
358          * be rescheduled for later. It is "only" background resync, after all. */
359         peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
360         if (!peer_req)
361                 goto defer;
362
363         peer_req->w.cb = w_e_send_csum;
364         spin_lock_irq(&mdev->tconn->req_lock);
365         list_add(&peer_req->w.list, &mdev->read_ee);
366         spin_unlock_irq(&mdev->tconn->req_lock);
367
368         atomic_add(size >> 9, &mdev->rs_sect_ev);
369         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
370                 return 0;
371
372         /* If it failed because of ENOMEM, retry should help.  If it failed
373          * because bio_add_page failed (probably broken lower level driver),
374          * retry may or may not help.
375          * If it does not, you may need to force disconnect. */
376         spin_lock_irq(&mdev->tconn->req_lock);
377         list_del(&peer_req->w.list);
378         spin_unlock_irq(&mdev->tconn->req_lock);
379
380         drbd_free_ee(mdev, peer_req);
381 defer:
382         put_ldev(mdev);
383         return -EAGAIN;
384 }
385
386 int w_resync_timer(struct drbd_work *w, int cancel)
387 {
388         struct drbd_conf *mdev = w->mdev;
389         switch (mdev->state.conn) {
390         case C_VERIFY_S:
391                 w_make_ov_request(w, cancel);
392                 break;
393         case C_SYNC_TARGET:
394                 w_make_resync_request(w, cancel);
395                 break;
396         }
397
398         return 1;
399 }
400
401 void resync_timer_fn(unsigned long data)
402 {
403         struct drbd_conf *mdev = (struct drbd_conf *) data;
404
405         if (list_empty(&mdev->resync_work.list))
406                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
407 }
408
409 static void fifo_set(struct fifo_buffer *fb, int value)
410 {
411         int i;
412
413         for (i = 0; i < fb->size; i++)
414                 fb->values[i] = value;
415 }
416
417 static int fifo_push(struct fifo_buffer *fb, int value)
418 {
419         int ov;
420
421         ov = fb->values[fb->head_index];
422         fb->values[fb->head_index++] = value;
423
424         if (fb->head_index >= fb->size)
425                 fb->head_index = 0;
426
427         return ov;
428 }
429
430 static void fifo_add_val(struct fifo_buffer *fb, int value)
431 {
432         int i;
433
434         for (i = 0; i < fb->size; i++)
435                 fb->values[i] += value;
436 }
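
/* A worked example of the plan ring above (hypothetical size of 3 slots,
 * one slot per SLEEP_TIME tick of look-ahead):
 *
 *	values = { 4, 7, 1 }, head_index = 0
 *	fifo_add_val(fb, 2);       values become { 6, 9, 3 }
 *	curr = fifo_push(fb, 0);   returns 6, values become { 0, 9, 3 }, head = 1
 *
 * i.e. fifo_add_val() spreads a correction over all planned ticks, and
 * fifo_push() pops what had been planned for the current tick while
 * opening a fresh, empty slot at the head. */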
437
438 static int drbd_rs_controller(struct drbd_conf *mdev)
439 {
440         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
441         unsigned int want;     /* The number of sectors we want in the proxy */
442         int req_sect; /* Number of sectors to request in this turn */
443         int correction; /* Number of additional sectors we need in the proxy */
444         int cps; /* correction per invocation of drbd_rs_controller() */
445         int steps; /* Number of time steps to plan ahead */
446         int curr_corr;
447         int max_sect;
448
449         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
450         mdev->rs_in_flight -= sect_in;
451
452         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
453
454         steps = mdev->rs_plan_s.size; /* (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
455
456         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
457                 want = ((mdev->ldev->dc.resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
458         } else { /* normal path */
459                 want = mdev->ldev->dc.c_fill_target ? mdev->ldev->dc.c_fill_target :
460                         sect_in * mdev->ldev->dc.c_delay_target * HZ / (SLEEP_TIME * 10);
461         }
462
463         correction = want - mdev->rs_in_flight - mdev->rs_planed;
464
465         /* Plan ahead */
466         cps = correction / steps;
467         fifo_add_val(&mdev->rs_plan_s, cps);
468         mdev->rs_planed += cps * steps;
469
470         /* What we do in this step */
471         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
472         spin_unlock(&mdev->peer_seq_lock);
473         mdev->rs_planed -= curr_corr;
474
475         req_sect = sect_in + curr_corr;
476         if (req_sect < 0)
477                 req_sect = 0;
478
479         max_sect = (mdev->ldev->dc.c_max_rate * 2 * SLEEP_TIME) / HZ;
480         if (req_sect > max_sect)
481                 req_sect = max_sect;
482
483         /*
484         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
485                  sect_in, mdev->rs_in_flight, want, correction,
486                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
487         */
488
489         return req_sect;
490 }
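
/* A worked example of the controller step above, with hypothetical numbers:
 * steps = 10, c_fill_target = 2000 sectors (~1 MiB wanted in flight),
 * sect_in = 400, rs_in_flight = 1200 (after subtracting sect_in),
 * rs_planed = 300.  Then
 *
 *	correction = 2000 - 1200 - 300 = 500 sectors
 *	cps        = 500 / 10         = 50 sectors per planned tick
 *
 * so 50 sectors are added to each of the 10 plan slots, and the correction
 * planned for the current tick (curr_corr, popped from the ring) is issued
 * on top of sect_in, capped at c_max_rate * 2 * SLEEP_TIME / HZ
 * (c_max_rate is in KiB/s, hence the factor of 2 to get 512-byte sectors). */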
491
492 static int drbd_rs_number_requests(struct drbd_conf *mdev)
493 {
494         int number;
495         if (mdev->rs_plan_s.size) { /* mdev->ldev->dc.c_plan_ahead */
496                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
497                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
498         } else {
499                 mdev->c_sync_rate = mdev->ldev->dc.resync_rate;
500                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
501         }
502
503         /* Ignore the amount of pending requests; the resync controller should
504          * throttle down to the incoming reply rate soon enough anyway. */
505         return number;
506 }
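
/* Unit check for the conversion above, assuming the usual BM_BLOCK_SIZE of
 * 4 KiB (BM_BLOCK_SHIFT == 12) and SLEEP_TIME of HZ/10: the controller
 * output in 512-byte sectors is shifted by (BM_BLOCK_SHIFT - 9) == 3,
 * i.e. divided by 8, to get the number of 4 KiB resync requests per tick.
 * On the fixed-rate path, c_sync_rate = 250 KiB/s gives
 *
 *	number = (HZ/10) * 250 / (4 * HZ) = 6 requests per 100 ms tick,
 *
 * which is roughly 240 KiB/s after integer truncation. */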
507
508 static int w_make_resync_request(struct drbd_work *w, int cancel)
509 {
510         struct drbd_conf *mdev = w->mdev;
511         unsigned long bit;
512         sector_t sector;
513         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
514         int max_bio_size;
515         int number, rollback_i, size;
516         int align, queued, sndbuf;
517         int i = 0;
518
519         if (unlikely(cancel))
520                 return 1;
521
522         if (mdev->rs_total == 0) {
523                 /* empty resync? */
524                 drbd_resync_finished(mdev);
525                 return 1;
526         }
527
528         if (!get_ldev(mdev)) {
529                 /* Since we only need to access mdev->resync, a
530                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
531                    continuing resync with a broken disk makes no sense at
532                    all */
533                 dev_err(DEV, "Disk broke down during resync!\n");
534                 return 1;
535         }
536
537         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
538         number = drbd_rs_number_requests(mdev);
539         if (number == 0)
540                 goto requeue;
541
542         for (i = 0; i < number; i++) {
543                 /* Stop generating RS requests when half of the send buffer is filled */
544                 mutex_lock(&mdev->tconn->data.mutex);
545                 if (mdev->tconn->data.socket) {
546                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
547                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
548                 } else {
549                         queued = 1;
550                         sndbuf = 0;
551                 }
552                 mutex_unlock(&mdev->tconn->data.mutex);
553                 if (queued > sndbuf / 2)
554                         goto requeue;
555
556 next_sector:
557                 size = BM_BLOCK_SIZE;
558                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
559
560                 if (bit == DRBD_END_OF_BITMAP) {
561                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
562                         put_ldev(mdev);
563                         return 1;
564                 }
565
566                 sector = BM_BIT_TO_SECT(bit);
567
568                 if (drbd_rs_should_slow_down(mdev, sector) ||
569                     drbd_try_rs_begin_io(mdev, sector)) {
570                         mdev->bm_resync_fo = bit;
571                         goto requeue;
572                 }
573                 mdev->bm_resync_fo = bit + 1;
574
575                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
576                         drbd_rs_complete_io(mdev, sector);
577                         goto next_sector;
578                 }
579
580 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
581                 /* try to find some adjacent bits.
582                  * we stop if we already have the maximum request size.
583                  *
584                  * Additionally, always align bigger requests, in order to
585                  * be prepared for all stripe sizes of software RAIDs.
586                  */
587                 align = 1;
588                 rollback_i = i;
589                 for (;;) {
590                         if (size + BM_BLOCK_SIZE > max_bio_size)
591                                 break;
592
593                         /* Always be aligned */
594                         if (sector & ((1<<(align+3))-1))
595                                 break;
596
597                         /* do not cross extent boundaries */
598                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
599                                 break;
600                         /* now, is it actually dirty, after all?
601                          * caution, drbd_bm_test_bit is tri-state for some
602                          * obscure reason; checking for ( b == 0 ) would handle the
603                          * out-of-band value only accidentally right because of the
604                          * "oddly sized" adjustment below */
605                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
606                                 break;
607                         bit++;
608                         size += BM_BLOCK_SIZE;
609                         if ((BM_BLOCK_SIZE << align) <= size)
610                                 align++;
611                         i++;
612                 }
613                 /* if we merged some,
614                  * reset the offset to start the next drbd_bm_find_next from */
615                 if (size > BM_BLOCK_SIZE)
616                         mdev->bm_resync_fo = bit + 1;
617 #endif
618
619                 /* adjust very last sectors, in case we are oddly sized */
620                 if (sector + (size>>9) > capacity)
621                         size = (capacity-sector)<<9;
622                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
623                         switch (read_for_csum(mdev, sector, size)) {
624                         case -EIO: /* Disk failure */
625                                 put_ldev(mdev);
626                                 return 0;
627                         case -EAGAIN: /* allocation failed, or ldev busy */
628                                 drbd_rs_complete_io(mdev, sector);
629                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
630                                 i = rollback_i;
631                                 goto requeue;
632                         case 0:
633                                 /* everything ok */
634                                 break;
635                         default:
636                                 BUG();
637                         }
638                 } else {
639                         inc_rs_pending(mdev);
640                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
641                                                sector, size, ID_SYNCER)) {
642                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
643                                 dec_rs_pending(mdev);
644                                 put_ldev(mdev);
645                                 return 0;
646                         }
647                 }
648         }
649
650         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
651                 /* The last syncer _request_ was sent,
652                  * but the P_RS_DATA_REPLY has not yet been received.  Sync will end
653                  * (and the next sync group will resume) as soon as we receive the
654                  * last resync data block and the last bit is cleared.
655                  * Until then, resync "work" is "inactive" ...
656                  */
657                 put_ldev(mdev);
658                 return 1;
659         }
660
661  requeue:
662         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
663         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
664         put_ldev(mdev);
665         return 1;
666 }
667
668 static int w_make_ov_request(struct drbd_work *w, int cancel)
669 {
670         struct drbd_conf *mdev = w->mdev;
671         int number, i, size;
672         sector_t sector;
673         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
674
675         if (unlikely(cancel))
676                 return 1;
677
678         number = drbd_rs_number_requests(mdev);
679
680         sector = mdev->ov_position;
681         for (i = 0; i < number; i++) {
682                 if (sector >= capacity) {
683                         return 1;
684                 }
685
686                 size = BM_BLOCK_SIZE;
687
688                 if (drbd_rs_should_slow_down(mdev, sector) ||
689                     drbd_try_rs_begin_io(mdev, sector)) {
690                         mdev->ov_position = sector;
691                         goto requeue;
692                 }
693
694                 if (sector + (size>>9) > capacity)
695                         size = (capacity-sector)<<9;
696
697                 inc_rs_pending(mdev);
698                 if (!drbd_send_ov_request(mdev, sector, size)) {
699                         dec_rs_pending(mdev);
700                         return 0;
701                 }
702                 sector += BM_SECT_PER_BIT;
703         }
704         mdev->ov_position = sector;
705
706  requeue:
707         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
708         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
709         return 1;
710 }
711
712 int w_ov_finished(struct drbd_work *w, int cancel)
713 {
714         struct drbd_conf *mdev = w->mdev;
715         kfree(w);
716         ov_oos_print(mdev);
717         drbd_resync_finished(mdev);
718
719         return 1;
720 }
721
722 static int w_resync_finished(struct drbd_work *w, int cancel)
723 {
724         struct drbd_conf *mdev = w->mdev;
725         kfree(w);
726
727         drbd_resync_finished(mdev);
728
729         return 1;
730 }
731
732 static void ping_peer(struct drbd_conf *mdev)
733 {
734         struct drbd_tconn *tconn = mdev->tconn;
735
736         clear_bit(GOT_PING_ACK, &tconn->flags);
737         request_ping(tconn);
738         wait_event(tconn->ping_wait,
739                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
740 }
741
742 int drbd_resync_finished(struct drbd_conf *mdev)
743 {
744         unsigned long db, dt, dbdt;
745         unsigned long n_oos;
746         union drbd_state os, ns;
747         struct drbd_work *w;
748         char *khelper_cmd = NULL;
749         int verify_done = 0;
750
751         /* Remove all elements from the resync LRU. Since future actions
752          * might set bits in the (main) bitmap, the entries in the
753          * resync LRU would otherwise be wrong. */
754         if (drbd_rs_del_all(mdev)) {
755                 /* This is not possible right now, most probably because
756                  * there are P_RS_DATA_REPLY packets lingering on the worker's
757                  * queue (or the read operations for those packets have
758                  * not finished yet).   Retry in 100ms. */
759
760                 schedule_timeout_interruptible(HZ / 10);
761                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
762                 if (w) {
763                         w->cb = w_resync_finished;
764                         drbd_queue_work(&mdev->tconn->data.work, w);
765                         return 1;
766                 }
767                 dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
768         }
769
770         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
771         if (dt <= 0)
772                 dt = 1;
773         db = mdev->rs_total;
774         dbdt = Bit2KB(db/dt);
775         mdev->rs_paused /= HZ;
776
777         if (!get_ldev(mdev))
778                 goto out;
779
780         ping_peer(mdev);
781
782         spin_lock_irq(&mdev->tconn->req_lock);
783         os = mdev->state;
784
785         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
786
787         /* This protects us against multiple calls (that can happen in the presence
788            of application IO), and against connectivity loss just before we arrive here. */
789         if (os.conn <= C_CONNECTED)
790                 goto out_unlock;
791
792         ns = os;
793         ns.conn = C_CONNECTED;
794
795         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
796              verify_done ? "Online verify " : "Resync",
797              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
798
799         n_oos = drbd_bm_total_weight(mdev);
800
801         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
802                 if (n_oos) {
803                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
804                               n_oos, Bit2KB(1));
805                         khelper_cmd = "out-of-sync";
806                 }
807         } else {
808                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
809
810                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
811                         khelper_cmd = "after-resync-target";
812
813                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
814                         const unsigned long s = mdev->rs_same_csum;
815                         const unsigned long t = mdev->rs_total;
816                         const int ratio =
817                                 (t == 0)     ? 0 :
818                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
819                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
820                              "transferred %luK total %luK\n",
821                              ratio,
822                              Bit2KB(mdev->rs_same_csum),
823                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
824                              Bit2KB(mdev->rs_total));
825                 }
826         }
827
828         if (mdev->rs_failed) {
829                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
830
831                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
832                         ns.disk = D_INCONSISTENT;
833                         ns.pdsk = D_UP_TO_DATE;
834                 } else {
835                         ns.disk = D_UP_TO_DATE;
836                         ns.pdsk = D_INCONSISTENT;
837                 }
838         } else {
839                 ns.disk = D_UP_TO_DATE;
840                 ns.pdsk = D_UP_TO_DATE;
841
842                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
843                         if (mdev->p_uuid) {
844                                 int i;
845                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
846                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
847                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
848                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
849                         } else {
850                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
851                         }
852                 }
853
854                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
855                         /* for verify runs, we don't update uuids here,
856                          * so there would be nothing to report. */
857                         drbd_uuid_set_bm(mdev, 0UL);
858                         drbd_print_uuids(mdev, "updated UUIDs");
859                         if (mdev->p_uuid) {
860                                 /* Now the two UUID sets are equal, update what we
861                                  * know of the peer. */
862                                 int i;
863                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
864                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
865                         }
866                 }
867         }
868
869         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
870 out_unlock:
871         spin_unlock_irq(&mdev->tconn->req_lock);
872         put_ldev(mdev);
873 out:
874         mdev->rs_total  = 0;
875         mdev->rs_failed = 0;
876         mdev->rs_paused = 0;
877         if (verify_done)
878                 mdev->ov_start_sector = 0;
879
880         drbd_md_sync(mdev);
881
882         if (khelper_cmd)
883                 drbd_khelper(mdev, khelper_cmd);
884
885         return 1;
886 }
887
888 /* helper */
889 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
890 {
891         if (drbd_ee_has_active_page(peer_req)) {
892                 /* This might happen if sendpage() has not finished */
893                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
894                 atomic_add(i, &mdev->pp_in_use_by_net);
895                 atomic_sub(i, &mdev->pp_in_use);
896                 spin_lock_irq(&mdev->tconn->req_lock);
897                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
898                 spin_unlock_irq(&mdev->tconn->req_lock);
899                 wake_up(&drbd_pp_wait);
900         } else
901                 drbd_free_ee(mdev, peer_req);
902 }
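
/* For example, a 32 KiB peer request whose pages are still referenced by
 * sendpage() moves 8 pages (with 4 KiB pages) from pp_in_use to
 * pp_in_use_by_net and parks on net_ee; those entries are reaped later,
 * once the network layer no longer references the pages. */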
903
904 /**
905  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
906  * @mdev:       DRBD device.
907  * @w:          work object.
908  * @cancel:     The connection will be closed anyway
909  */
910 int w_e_end_data_req(struct drbd_work *w, int cancel)
911 {
912         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
913         struct drbd_conf *mdev = w->mdev;
914         int ok;
915
916         if (unlikely(cancel)) {
917                 drbd_free_ee(mdev, peer_req);
918                 dec_unacked(mdev);
919                 return 1;
920         }
921
922         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
923                 ok = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
924         } else {
925                 if (__ratelimit(&drbd_ratelimit_state))
926                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
927                             (unsigned long long)peer_req->i.sector);
928
929                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
930         }
931
932         dec_unacked(mdev);
933
934         move_to_net_ee_or_free(mdev, peer_req);
935
936         if (unlikely(!ok))
937                 dev_err(DEV, "drbd_send_block() failed\n");
938         return ok;
939 }
940
941 /**
942  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
943  * @mdev:       DRBD device.
944  * @w:          work object.
945  * @cancel:     The connection will be closed anyway
946  */
947 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
948 {
949         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
950         struct drbd_conf *mdev = w->mdev;
951         int ok;
952
953         if (unlikely(cancel)) {
954                 drbd_free_ee(mdev, peer_req);
955                 dec_unacked(mdev);
956                 return 1;
957         }
958
959         if (get_ldev_if_state(mdev, D_FAILED)) {
960                 drbd_rs_complete_io(mdev, peer_req->i.sector);
961                 put_ldev(mdev);
962         }
963
964         if (mdev->state.conn == C_AHEAD) {
965                 ok = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
966         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
967                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
968                         inc_rs_pending(mdev);
969                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
970                 } else {
971                         if (__ratelimit(&drbd_ratelimit_state))
972                                 dev_err(DEV, "Not sending RSDataReply, "
973                                     "partner DISKLESS!\n");
974                         ok = 1;
975                 }
976         } else {
977                 if (__ratelimit(&drbd_ratelimit_state))
978                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
979                             (unsigned long long)peer_req->i.sector);
980
981                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
982
983                 /* update resync data with failure */
984                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
985         }
986
987         dec_unacked(mdev);
988
989         move_to_net_ee_or_free(mdev, peer_req);
990
991         if (unlikely(!ok))
992                 dev_err(DEV, "drbd_send_block() failed\n");
993         return ok;
994 }
995
996 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
997 {
998         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
999         struct drbd_conf *mdev = w->mdev;
1000         struct digest_info *di;
1001         int digest_size;
1002         void *digest = NULL;
1003         int ok, eq = 0;
1004
1005         if (unlikely(cancel)) {
1006                 drbd_free_ee(mdev, peer_req);
1007                 dec_unacked(mdev);
1008                 return 1;
1009         }
1010
1011         if (get_ldev(mdev)) {
1012                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1013                 put_ldev(mdev);
1014         }
1015
1016         di = peer_req->digest;
1017
1018         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1019                 /* quick hack to try to avoid a race against reconfiguration.
1020                  * a real fix would be much more involved,
1021                  * introducing more locking mechanisms */
1022                 if (mdev->tconn->csums_tfm) {
1023                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1024                         D_ASSERT(digest_size == di->digest_size);
1025                         digest = kmalloc(digest_size, GFP_NOIO);
1026                 }
1027                 if (digest) {
1028                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1029                         eq = !memcmp(digest, di->digest, digest_size);
1030                         kfree(digest);
1031                 }
1032
1033                 if (eq) {
1034                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1035                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1036                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1037                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1038                 } else {
1039                         inc_rs_pending(mdev);
1040                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1041                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1042                         kfree(di);
1043                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1044                 }
1045         } else {
1046                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1047                 if (__ratelimit(&drbd_ratelimit_state))
1048                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1049         }
1050
1051         dec_unacked(mdev);
1052         move_to_net_ee_or_free(mdev, peer_req);
1053
1054         if (unlikely(!ok))
1055                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1056         return ok;
1057 }
1058
1059 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1060 {
1061         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1062         struct drbd_conf *mdev = w->mdev;
1063         sector_t sector = peer_req->i.sector;
1064         unsigned int size = peer_req->i.size;
1065         int digest_size;
1066         void *digest;
1067         int ok = 1;
1068
1069         if (unlikely(cancel))
1070                 goto out;
1071
1072         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1073         digest = kmalloc(digest_size, GFP_NOIO);
1074         if (!digest) {
1075                 ok = 0; /* terminate the connection in case the allocation failed */
1076                 goto out;
1077         }
1078
1079         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1080                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1081         else
1082                 memset(digest, 0, digest_size);
1083
1084         /* Free peer_req and pages before send.
1085          * In case we block on congestion, we could otherwise run into
1086          * some distributed deadlock, if the other side blocks on
1087          * congestion as well, because our receiver blocks in
1088          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1089         drbd_free_ee(mdev, peer_req);
1090         peer_req = NULL;
1091         inc_rs_pending(mdev);
1092         ok = drbd_send_drequest_csum(mdev, sector, size,
1093                                      digest, digest_size,
1094                                      P_OV_REPLY);
1095         if (!ok)
1096                 dec_rs_pending(mdev);
1097         kfree(digest);
1098
1099 out:
1100         if (peer_req)
1101                 drbd_free_ee(mdev, peer_req);
1102         dec_unacked(mdev);
1103         return ok;
1104 }
1105
1106 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1107 {
1108         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1109                 mdev->ov_last_oos_size += size>>9;
1110         } else {
1111                 mdev->ov_last_oos_start = sector;
1112                 mdev->ov_last_oos_size = size>>9;
1113         }
1114         drbd_set_out_of_sync(mdev, sector, size);
1115 }
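
/* Example: with 4 KiB verify blocks (8 sectors each), out-of-sync reports
 * at sectors 0, 8 and 16 are merged into a single 24-sector run starting
 * at sector 0; a later report at sector 64 starts a new run (the completed
 * run is reported by ov_oos_print(), which runs when an in-sync block or
 * the end of the verify run is reached). */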
1116
1117 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1118 {
1119         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1120         struct drbd_conf *mdev = w->mdev;
1121         struct digest_info *di;
1122         void *digest;
1123         sector_t sector = peer_req->i.sector;
1124         unsigned int size = peer_req->i.size;
1125         int digest_size;
1126         int ok, eq = 0;
1127
1128         if (unlikely(cancel)) {
1129                 drbd_free_ee(mdev, peer_req);
1130                 dec_unacked(mdev);
1131                 return 1;
1132         }
1133
1134         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1135          * the resync lru has been cleaned up already */
1136         if (get_ldev(mdev)) {
1137                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1138                 put_ldev(mdev);
1139         }
1140
1141         di = peer_req->digest;
1142
1143         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1144                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1145                 digest = kmalloc(digest_size, GFP_NOIO);
1146                 if (digest) {
1147                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1148
1149                         D_ASSERT(digest_size == di->digest_size);
1150                         eq = !memcmp(digest, di->digest, digest_size);
1151                         kfree(digest);
1152                 }
1153         }
1154
1155         /* Free peer_req and pages before send.
1156          * In case we block on congestion, we could otherwise run into
1157          * some distributed deadlock, if the other side blocks on
1158          * congestion as well, because our receiver blocks in
1159          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1160         drbd_free_ee(mdev, peer_req);
1161         if (!eq)
1162                 drbd_ov_oos_found(mdev, sector, size);
1163         else
1164                 ov_oos_print(mdev);
1165
1166         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1167                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1168
1169         dec_unacked(mdev);
1170
1171         --mdev->ov_left;
1172
1173         /* let's advance progress step marks only for every other megabyte */
1174         if ((mdev->ov_left & 0x200) == 0x200)
1175                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1176
1177         if (mdev->ov_left == 0) {
1178                 ov_oos_print(mdev);
1179                 drbd_resync_finished(mdev);
1180         }
1181
1182         return ok;
1183 }
1184
1185 int w_prev_work_done(struct drbd_work *w, int cancel)
1186 {
1187         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1188
1189         complete(&b->done);
1190         return 1;
1191 }
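
/* Rough usage sketch for the barrier above (compare drbd_flush_workqueue()
 * in drbd_receiver.c): queue a drbd_wq_barrier and wait for its completion,
 * which guarantees that all work queued before it has been processed:
 *
 *	struct drbd_wq_barrier barr;
 *
 *	barr.w.cb = w_prev_work_done;
 *	barr.w.mdev = mdev;
 *	init_completion(&barr.done);
 *	drbd_queue_work(&mdev->tconn->data.work, &barr.w);
 *	wait_for_completion(&barr.done);
 */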
1192
1193 int w_send_barrier(struct drbd_work *w, int cancel)
1194 {
1195         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1196         struct drbd_conf *mdev = w->mdev;
1197         struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
1198         int ok = 1;
1199
1200         /* really avoid racing with tl_clear.  w.cb may have been referenced
1201          * just before it was reassigned and re-queued, so double check that.
1202          * actually, this race was harmless, since we only try to send the
1203          * barrier packet here, and otherwise do nothing with the object.
1204          * but compare with the head of w_clear_epoch */
1205         spin_lock_irq(&mdev->tconn->req_lock);
1206         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1207                 cancel = 1;
1208         spin_unlock_irq(&mdev->tconn->req_lock);
1209         if (cancel)
1210                 return 1;
1211
1212         if (drbd_get_data_sock(mdev->tconn))
1213                 return 0;
1214         p->barrier = b->br_number;
1215         /* inc_ap_pending was done where this was queued.
1216          * dec_ap_pending will be done in got_BarrierAck
1217          * or (on connection loss) in w_clear_epoch.  */
1218         ok = !_drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
1219                              &p->head, sizeof(*p), 0);
1220         drbd_put_data_sock(mdev->tconn);
1221
1222         return ok;
1223 }
1224
1225 int w_send_write_hint(struct drbd_work *w, int cancel)
1226 {
1227         struct drbd_conf *mdev = w->mdev;
1228         if (cancel)
1229                 return 1;
1230         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1231 }
1232
1233 int w_send_oos(struct drbd_work *w, int cancel)
1234 {
1235         struct drbd_request *req = container_of(w, struct drbd_request, w);
1236         struct drbd_conf *mdev = w->mdev;
1237         int ok;
1238
1239         if (unlikely(cancel)) {
1240                 req_mod(req, SEND_CANCELED);
1241                 return 1;
1242         }
1243
1244         ok = drbd_send_oos(mdev, req);
1245         req_mod(req, OOS_HANDED_TO_NETWORK);
1246
1247         return ok;
1248 }
1249
1250 /**
1251  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1252  * @mdev:       DRBD device.
1253  * @w:          work object.
1254  * @cancel:     The connection will be closed anyway
1255  */
1256 int w_send_dblock(struct drbd_work *w, int cancel)
1257 {
1258         struct drbd_request *req = container_of(w, struct drbd_request, w);
1259         struct drbd_conf *mdev = w->mdev;
1260         int ok;
1261
1262         if (unlikely(cancel)) {
1263                 req_mod(req, SEND_CANCELED);
1264                 return 1;
1265         }
1266
1267         ok = drbd_send_dblock(mdev, req);
1268         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1269
1270         return ok;
1271 }
1272
1273 /**
1274  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1275  * @mdev:       DRBD device.
1276  * @w:          work object.
1277  * @cancel:     The connection will be closed anyway
1278  */
1279 int w_send_read_req(struct drbd_work *w, int cancel)
1280 {
1281         struct drbd_request *req = container_of(w, struct drbd_request, w);
1282         struct drbd_conf *mdev = w->mdev;
1283         int ok;
1284
1285         if (unlikely(cancel)) {
1286                 req_mod(req, SEND_CANCELED);
1287                 return 1;
1288         }
1289
1290         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1291                                 (unsigned long)req);
1292
1293         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1294
1295         return ok;
1296 }
1297
1298 int w_restart_disk_io(struct drbd_work *w, int cancel)
1299 {
1300         struct drbd_request *req = container_of(w, struct drbd_request, w);
1301         struct drbd_conf *mdev = w->mdev;
1302
1303         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1304                 drbd_al_begin_io(mdev, req->i.sector);
1305         /* Calling drbd_al_begin_io() out of the worker might deadlock
1306            theoretically. In practice it cannot deadlock, since this is
1307            only used when unfreezing IOs. All the extents of the requests
1308            that made it into the TL are already active */
1309
1310         drbd_req_make_private_bio(req, req->master_bio);
1311         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1312         generic_make_request(req->private_bio);
1313
1314         return 1;
1315 }
1316
1317 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1318 {
1319         struct drbd_conf *odev = mdev;
1320
1321         while (1) {
1322                 if (odev->ldev->dc.resync_after == -1)
1323                         return 1;
1324                 odev = minor_to_mdev(odev->ldev->dc.resync_after);
1325                 if (!expect(odev))
1326                         return 1;
1327                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1328                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1329                     odev->state.aftr_isp || odev->state.peer_isp ||
1330                     odev->state.user_isp)
1331                         return 0;
1332         }
1333 }
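
/* Example: with resync-after configured as minor 2 -> minor 1 -> minor 0
 * (minor 0 has resync_after == -1), _drbd_may_sync_now() for minor 2 walks
 * its dependencies (minor 1, then minor 0) and returns 0 as soon as one of
 * them is currently resyncing (conn between C_SYNC_SOURCE and
 * C_PAUSED_SYNC_T) or has one of the *_isp flags set; otherwise it
 * returns 1. */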
1334
1335 /**
1336  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1337  * @mdev:       DRBD device.
1338  *
1339  * Called from process context only (admin command and after_state_ch).
1340  */
1341 static int _drbd_pause_after(struct drbd_conf *mdev)
1342 {
1343         struct drbd_conf *odev;
1344         int i, rv = 0;
1345
1346         idr_for_each_entry(&minors, odev, i) {
1347                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1348                         continue;
1349                 if (!_drbd_may_sync_now(odev))
1350                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1351                                != SS_NOTHING_TO_DO);
1352         }
1353
1354         return rv;
1355 }
1356
1357 /**
1358  * _drbd_resume_next() - Resume resync on all devices that may resync now
1359  * @mdev:       DRBD device.
1360  *
1361  * Called from process context only (admin command and worker).
1362  */
1363 static int _drbd_resume_next(struct drbd_conf *mdev)
1364 {
1365         struct drbd_conf *odev;
1366         int i, rv = 0;
1367
1368         idr_for_each_entry(&minors, odev, i) {
1369                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1370                         continue;
1371                 if (odev->state.aftr_isp) {
1372                         if (_drbd_may_sync_now(odev))
1373                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1374                                                         CS_HARD, NULL)
1375                                        != SS_NOTHING_TO_DO) ;
1376                 }
1377         }
1378         return rv;
1379 }
1380
1381 void resume_next_sg(struct drbd_conf *mdev)
1382 {
1383         write_lock_irq(&global_state_lock);
1384         _drbd_resume_next(mdev);
1385         write_unlock_irq(&global_state_lock);
1386 }
1387
1388 void suspend_other_sg(struct drbd_conf *mdev)
1389 {
1390         write_lock_irq(&global_state_lock);
1391         _drbd_pause_after(mdev);
1392         write_unlock_irq(&global_state_lock);
1393 }
1394
1395 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1396 {
1397         struct drbd_conf *odev;
1398
1399         if (o_minor == -1)
1400                 return NO_ERROR;
1401         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1402                 return ERR_SYNC_AFTER;
1403
1404         /* check for loops */
1405         odev = minor_to_mdev(o_minor);
1406         while (1) {
1407                 if (odev == mdev)
1408                         return ERR_SYNC_AFTER_CYCLE;
1409
1410                 /* dependency chain ends here, no cycles. */
1411                 if (odev->ldev->dc.resync_after == -1)
1412                         return NO_ERROR;
1413
1414                 /* follow the dependency chain */
1415                 odev = minor_to_mdev(odev->ldev->dc.resync_after);
1416         }
1417 }
1418
1419 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1420 {
1421         int changes;
1422         int retcode;
1423
1424         write_lock_irq(&global_state_lock);
1425         retcode = sync_after_error(mdev, na);
1426         if (retcode == NO_ERROR) {
1427                 mdev->ldev->dc.resync_after = na;
1428                 do {
1429                         changes  = _drbd_pause_after(mdev);
1430                         changes |= _drbd_resume_next(mdev);
1431                 } while (changes);
1432         }
1433         write_unlock_irq(&global_state_lock);
1434         return retcode;
1435 }
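
/* Note on the loop above: pausing one resync may allow another device to
 * resume and vice versa, so _drbd_pause_after() and _drbd_resume_next()
 * are repeated until neither of them flips any aftr_isp bit any more,
 * i.e. until the resync-after dependency graph has reached a fixed point
 * (all while global_state_lock is held for writing). */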
1436
1437 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1438 {
1439         atomic_set(&mdev->rs_sect_in, 0);
1440         atomic_set(&mdev->rs_sect_ev, 0);
1441         mdev->rs_in_flight = 0;
1442         mdev->rs_planed = 0;
1443         spin_lock(&mdev->peer_seq_lock);
1444         fifo_set(&mdev->rs_plan_s, 0);
1445         spin_unlock(&mdev->peer_seq_lock);
1446 }
1447
1448 void start_resync_timer_fn(unsigned long data)
1449 {
1450         struct drbd_conf *mdev = (struct drbd_conf *) data;
1451
1452         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1453 }
1454
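     /* Worker callback that actually kicks off a resync as sync source.  As
      * long as unacknowledged or pending resync requests are still
      * outstanding, it re-arms start_resync_timer and retries a bit later
      * instead of starting right away. */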
1455 int w_start_resync(struct drbd_work *w, int cancel)
1456 {
1457         struct drbd_conf *mdev = w->mdev;
1458
1459         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1460                 dev_warn(DEV, "w_start_resync later...\n");
1461                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1462                 add_timer(&mdev->start_resync_timer);
1463                 return 1;
1464         }
1465
1466         drbd_start_resync(mdev, C_SYNC_SOURCE);
1467         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1468         return 1;
1469 }
1470
1471 /**
1472  * drbd_start_resync() - Start the resync process
1473  * @mdev:       DRBD device.
1474  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1475  *
1476  * This function might bring you directly into one of the
1477  * C_PAUSED_SYNC_* states.
1478  */
1479 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1480 {
1481         union drbd_state ns;
1482         int r;
1483
1484         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1485                 dev_err(DEV, "Resync already running!\n");
1486                 return;
1487         }
1488
1489         if (mdev->state.conn < C_AHEAD) {
1490                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1491                 drbd_rs_cancel_all(mdev);
1492                 /* This should be done when we abort the resync. We definitely do not
1493                    want to have this for connections going back and forth between
1494                    Ahead/Behind and SyncSource/SyncTarget */
1495         }
1496
1497         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1498                 if (side == C_SYNC_TARGET) {
1499                         /* Since application IO was locked out during C_WF_BITMAP_T and
1500                            C_WF_SYNC_UUID, our data is still unmodified.  Run the handler
1501                            before going to C_SYNC_TARGET, which will make the data inconsistent. */
1502                         r = drbd_khelper(mdev, "before-resync-target");
1503                         r = (r >> 8) & 0xff;
1504                         if (r > 0) {
1505                                 dev_info(DEV, "before-resync-target handler returned %d, "
1506                                          "dropping connection.\n", r);
1507                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1508                                 return;
1509                         }
1510                 } else /* C_SYNC_SOURCE */ {
1511                         r = drbd_khelper(mdev, "before-resync-source");
1512                         r = (r >> 8) & 0xff;
1513                         if (r > 0) {
1514                                 if (r == 3) {
1515                                         dev_info(DEV, "before-resync-source handler returned %d, "
1516                                                  "ignoring. Old userland tools?\n", r);
1517                                 } else {
1518                                         dev_info(DEV, "before-resync-source handler returned %d, "
1519                                                  "dropping connection.\n", r);
1520                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1521                                         return;
1522                                 }
1523                         }
1524                 }
1525         }
1526
1527         if (current == mdev->tconn->worker.task) {
1528                 /* The worker should not sleep waiting for state_mutex;
1529                    that can take a long time. */
1530                 if (!mutex_trylock(mdev->state_mutex)) {
1531                         set_bit(B_RS_H_DONE, &mdev->flags);
1532                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1533                         add_timer(&mdev->start_resync_timer);
1534                         return;
1535                 }
1536         } else {
1537                 mutex_lock(mdev->state_mutex);
1538         }
1539         clear_bit(B_RS_H_DONE, &mdev->flags);
1540
1541         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1542                 mutex_unlock(mdev->state_mutex);
1543                 return;
1544         }
1545
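        /* _drbd_may_sync_now() and _drbd_pause_after() walk the states of the
         * other devices, so do the state evaluation and transition below while
         * holding the global_state_lock for writing. */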
1546         write_lock_irq(&global_state_lock);
1547         ns = mdev->state;
1548
1549         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1550
1551         ns.conn = side;
1552
1553         if (side == C_SYNC_TARGET)
1554                 ns.disk = D_INCONSISTENT;
1555         else /* side == C_SYNC_SOURCE */
1556                 ns.pdsk = D_INCONSISTENT;
1557
1558         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1559         ns = mdev->state;
1560
1561         if (ns.conn < C_CONNECTED)
1562                 r = SS_UNKNOWN_ERROR;
1563
1564         if (r == SS_SUCCESS) {
1565                 unsigned long tw = drbd_bm_total_weight(mdev);
1566                 unsigned long now = jiffies;
1567                 int i;
1568
1569                 mdev->rs_failed    = 0;
1570                 mdev->rs_paused    = 0;
1571                 mdev->rs_same_csum = 0;
1572                 mdev->rs_last_events = 0;
1573                 mdev->rs_last_sect_ev = 0;
1574                 mdev->rs_total     = tw;
1575                 mdev->rs_start     = now;
1576                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1577                         mdev->rs_mark_left[i] = tw;
1578                         mdev->rs_mark_time[i] = now;
1579                 }
1580                 _drbd_pause_after(mdev);
1581         }
1582         write_unlock_irq(&global_state_lock);
1583
1584         if (r == SS_SUCCESS) {
1585                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1586                      drbd_conn_str(ns.conn),
1587                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1588                      (unsigned long) mdev->rs_total);
1589                 if (side == C_SYNC_TARGET)
1590                         mdev->bm_resync_fo = 0;
1591
1592                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1593                  * with w_send_oos, or the sync target will get confused as to
1594                  * how many bits to resync.  We cannot always do that, because for an
1595                  * empty resync and protocol < 95 we need to do it here, as we call
1596                  * drbd_resync_finished from here in that case.
1597                  * So we call drbd_gen_and_send_sync_uuid here for protocol < 96,
1598                  * and from after_state_ch otherwise. */
1599                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1600                         drbd_gen_and_send_sync_uuid(mdev);
1601
1602                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1603                         /* This still has a race (about when exactly the peers
1604                          * detect connection loss) that can lead to a full sync
1605                          * on next handshake. In 8.3.9 we fixed this with explicit
1606                          * resync-finished notifications, but the fix
1607                          * introduces a protocol change.  Sleeping for some
1608                          * time longer than the ping interval + timeout on the
1609                          * SyncSource, to give the SyncTarget the chance to
1610                          * detect connection loss, then waiting for a ping
1611                          * response (implicit in drbd_resync_finished) reduces
1612                          * the race considerably, but does not solve it. */
1613                         if (side == C_SYNC_SOURCE)
1614                                 schedule_timeout_interruptible(
1615                                         mdev->tconn->net_conf->ping_int * HZ +
1616                                         mdev->tconn->net_conf->ping_timeo*HZ/9);
1617                         drbd_resync_finished(mdev);
1618                 }
1619
1620                 drbd_rs_controller_reset(mdev);
1621                 /* ns.conn may already differ from mdev->state.conn:
1622                  * we may have been paused in between, or become paused until
1623                  * the timer triggers.
1624                  * Either way, that is handled in resync_timer_fn(). */
1625                 if (ns.conn == C_SYNC_TARGET)
1626                         mod_timer(&mdev->resync_timer, jiffies);
1627
1628                 drbd_md_sync(mdev);
1629         }
1630         put_ldev(mdev);
1631         mutex_unlock(mdev->state_mutex);
1632 }
1633
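     /* Main loop of the per-connection worker thread: block on the work
      * semaphore (uncorking the data socket while there is nothing to do and
      * corking it again afterwards), dequeue one work item at a time and run
      * its callback, and on a callback failure while connected request a
      * transition to C_NETWORK_FAILURE.  On exit, the remaining work list is
      * drained with the cancel argument set. */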
1634 int drbd_worker(struct drbd_thread *thi)
1635 {
1636         struct drbd_tconn *tconn = thi->tconn;
1637         struct drbd_work *w = NULL;
1638         struct drbd_conf *mdev;
1639         LIST_HEAD(work_list);
1640         int vnr, intr = 0;
1641
1642         while (get_t_state(thi) == RUNNING) {
1643                 drbd_thread_current_set_cpu(thi);
1644
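                /* If no work is immediately available, uncork the data socket
                 * so anything still queued in the TCP buffers goes out while
                 * we sleep on the work semaphore; re-cork once we wake up. */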
1645                 if (down_trylock(&tconn->data.work.s)) {
1646                         mutex_lock(&tconn->data.mutex);
1647                         if (tconn->data.socket && !tconn->net_conf->no_cork)
1648                                 drbd_tcp_uncork(tconn->data.socket);
1649                         mutex_unlock(&tconn->data.mutex);
1650
1651                         intr = down_interruptible(&tconn->data.work.s);
1652
1653                         mutex_lock(&tconn->data.mutex);
1654                         if (tconn->data.socket && !tconn->net_conf->no_cork)
1655                                 drbd_tcp_cork(tconn->data.socket);
1656                         mutex_unlock(&tconn->data.mutex);
1657                 }
1658
1659                 if (intr) {
1660                         flush_signals(current);
1661                         if (get_t_state(thi) == RUNNING) {
1662                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1663                                 continue;
1664                         }
1665                         break;
1666                 }
1667
1668                 if (get_t_state(thi) != RUNNING)
1669                         break;
1670                 /* With this break, we have done a down() but not consumed
1671                    the entry from the list. The cleanup code takes care of
1672                    this...   */
1673
1674                 w = NULL;
1675                 spin_lock_irq(&tconn->data.work.q_lock);
1676                 if (list_empty(&tconn->data.work.q)) {
1677                         /* Something is terribly wrong in our logic:
1678                          * we were able to down() the semaphore,
1679                          * but the work list is empty.
1680                          *
1681                          * What is the best thing to do now?
1682                          * Start over from scratch, restarting the receiver,
1683                          * the asender, and so on?  That could break things even
1684                          * worse, e.g. when we are Primary but have no good local data.
1685                          *
1686                          * Try to get away with just starting over this loop.
1687                          */
1688                         conn_warn(tconn, "Work list unexpectedly empty\n");
1689                         spin_unlock_irq(&tconn->data.work.q_lock);
1690                         continue;
1691                 }
1692                 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1693                 list_del_init(&w->list);
1694                 spin_unlock_irq(&tconn->data.work.q_lock);
1695
1696                 if (!w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1697                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1698                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1699                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1700                 }
1701         }
1702
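        /* Drain whatever is left on the work queue: splice it onto a private
         * list and run each callback with its cancel argument set to 1. */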
1703         spin_lock_irq(&tconn->data.work.q_lock);
1704         while (!list_empty(&tconn->data.work.q)) {
1705                 list_splice_init(&tconn->data.work.q, &work_list);
1706                 spin_unlock_irq(&tconn->data.work.q_lock);
1707
1708                 while (!list_empty(&work_list)) {
1709                         w = list_entry(work_list.next, struct drbd_work, list);
1710                         list_del_init(&w->list);
1711                         w->cb(w, 1);
1712                 }
1713
1714                 spin_lock_irq(&tconn->data.work.q_lock);
1715         }
1716         sema_init(&tconn->data.work.s, 0);
1717         /* DANGEROUS race: if someone queued their work while holding the spinlock,
1718          * but called up() outside of it, we could get an up() on the
1719          * semaphore without a corresponding list entry.
1720          * So don't do that.
1721          */
1722         spin_unlock_irq(&tconn->data.work.q_lock);
1723
1724         drbd_thread_stop(&tconn->receiver);
1725         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1726                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1727                 /* _drbd_set_state() only uses stop_nowait;
1728                  * wait here for the exiting receiver. */
1729                 drbd_mdev_cleanup(mdev);
1730         }
1731         clear_bit(OBJECT_DYING, &tconn->flags);
1732         clear_bit(CONFIG_PENDING, &tconn->flags);
1733         wake_up(&tconn->ping_wait);
1734
1735         return 0;
1736 }