drivers/block/drbd/drbd_worker.c
drbd: Temporarily change the return type of all worker callbacks
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56
57
58 /* About the global_state_lock
59    Each state transition on a device holds a read lock. In case we have
60    to evaluate the sync-after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
63
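A minimal sketch (not part of the original file) of the locking pattern described above: a state change confined to one device takes global_state_lock for reading, while walking the sync-after dependencies across all devices takes it for writing, as resume_next_sg()/suspend_other_sg() further down do. The example_* helpers are hypothetical and not wired into anything.

static void example_single_device_state_change(void)
{
        read_lock(&global_state_lock);
        /* evaluate and apply a state change on this one device */
        read_unlock(&global_state_lock);
}

static void example_reevaluate_sync_after_dependencies(void)
{
        write_lock_irq(&global_state_lock);
        /* needs a stable view of the states of all devices */
        write_unlock_irq(&global_state_lock);
}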
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69         struct drbd_md_io *md_io;
70
71         md_io = (struct drbd_md_io *)bio->bi_private;
72         md_io->error = error;
73
74         complete(&md_io->event);
75 }
76
77 /* reads on behalf of the partner,
78  * "submitted" by the receiver
79  */
80 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
81 {
82         unsigned long flags = 0;
83         struct drbd_conf *mdev = peer_req->w.mdev;
84
85         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
86         mdev->read_cnt += peer_req->i.size >> 9;
87         list_del(&peer_req->w.list);
88         if (list_empty(&mdev->read_ee))
89                 wake_up(&mdev->ee_wait);
90         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
91                 __drbd_chk_io_error(mdev, false);
92         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
93
94         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
95         put_ldev(mdev);
96 }
97
98 /* writes on behalf of the partner, or resync writes,
99  * "submitted" by the receiver, final stage.  */
100 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101 {
102         unsigned long flags = 0;
103         struct drbd_conf *mdev = peer_req->w.mdev;
104         sector_t e_sector;
105         int do_wake;
106         u64 block_id;
107         int do_al_complete_io;
108
109         /* after we moved peer_req to done_ee,
110          * we may no longer access it,
111          * it may be freed/reused already!
112          * (as soon as we release the req_lock) */
113         e_sector = peer_req->i.sector;
114         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
115         block_id = peer_req->block_id;
116
117         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
118         mdev->writ_cnt += peer_req->i.size >> 9;
119         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
120         list_add_tail(&peer_req->w.list, &mdev->done_ee);
121
122         /*
123          * Do not remove from the write_requests tree here: we did not send the
124          * Ack yet and did not wake possibly waiting conflicting requests.
125          * The entry is removed from the tree in "drbd_process_done_ee", within the
126          * appropriate w.cb (e_end_block/e_end_resync_block), or in
127          * _drbd_clear_done_ee.
128          */
129
130         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
131
132         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
133                 __drbd_chk_io_error(mdev, false);
134         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
135
136         if (block_id == ID_SYNCER)
137                 drbd_rs_complete_io(mdev, e_sector);
138
139         if (do_wake)
140                 wake_up(&mdev->ee_wait);
141
142         if (do_al_complete_io)
143                 drbd_al_complete_io(mdev, e_sector);
144
145         wake_asender(mdev->tconn);
146         put_ldev(mdev);
147 }
148
149 /* writes on behalf of the partner, or resync writes,
150  * "submitted" by the receiver.
151  */
152 void drbd_peer_request_endio(struct bio *bio, int error)
153 {
154         struct drbd_peer_request *peer_req = bio->bi_private;
155         struct drbd_conf *mdev = peer_req->w.mdev;
156         int uptodate = bio_flagged(bio, BIO_UPTODATE);
157         int is_write = bio_data_dir(bio) == WRITE;
158
159         if (error && __ratelimit(&drbd_ratelimit_state))
160                 dev_warn(DEV, "%s: error=%d s=%llus\n",
161                                 is_write ? "write" : "read", error,
162                                 (unsigned long long)peer_req->i.sector);
163         if (!error && !uptodate) {
164                 if (__ratelimit(&drbd_ratelimit_state))
165                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
166                                         is_write ? "write" : "read",
167                                         (unsigned long long)peer_req->i.sector);
168                 /* strange behavior of some lower level drivers...
169                  * fail the request by clearing the uptodate flag,
170                  * but do not return any error?! */
171                 error = -EIO;
172         }
173
174         if (error)
175                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
176
177         bio_put(bio); /* no need for the bio anymore */
178         if (atomic_dec_and_test(&peer_req->pending_bios)) {
179                 if (is_write)
180                         drbd_endio_write_sec_final(peer_req);
181                 else
182                         drbd_endio_read_sec_final(peer_req);
183         }
184 }
185
186 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
187  */
188 void drbd_request_endio(struct bio *bio, int error)
189 {
190         unsigned long flags;
191         struct drbd_request *req = bio->bi_private;
192         struct drbd_conf *mdev = req->w.mdev;
193         struct bio_and_error m;
194         enum drbd_req_event what;
195         int uptodate = bio_flagged(bio, BIO_UPTODATE);
196
197         if (!error && !uptodate) {
198                 dev_warn(DEV, "p %s: setting error to -EIO\n",
199                          bio_data_dir(bio) == WRITE ? "write" : "read");
200                 /* strange behavior of some lower level drivers...
201                  * fail the request by clearing the uptodate flag,
202                  * but do not return any error?! */
203                 error = -EIO;
204         }
205
206         /* to avoid recursion in __req_mod */
207         if (unlikely(error)) {
208                 what = (bio_data_dir(bio) == WRITE)
209                         ? WRITE_COMPLETED_WITH_ERROR
210                         : (bio_rw(bio) == READ)
211                           ? READ_COMPLETED_WITH_ERROR
212                           : READ_AHEAD_COMPLETED_WITH_ERROR;
213         } else
214                 what = COMPLETED_OK;
215
216         bio_put(req->private_bio);
217         req->private_bio = ERR_PTR(error);
218
219         /* not req_mod(), we need irqsave here! */
220         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
221         __req_mod(req, what, &m);
222         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
223
224         if (m.bio)
225                 complete_master_bio(mdev, &m);
226 }
227
228 long w_read_retry_remote(struct drbd_work *w, int cancel)
229 {
230         struct drbd_request *req = container_of(w, struct drbd_request, w);
231         struct drbd_conf *mdev = w->mdev;
232
233         /* We should not detach for read io-error,
234          * but try to WRITE the P_DATA_REPLY to the failed location,
235          * to give the disk the chance to relocate that block */
236
237         spin_lock_irq(&mdev->tconn->req_lock);
238         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
239                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
240                 spin_unlock_irq(&mdev->tconn->req_lock);
241                 return 1;
242         }
243         spin_unlock_irq(&mdev->tconn->req_lock);
244
245         return w_send_read_req(w, 0);
246 }
247
248 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
249                   struct drbd_peer_request *peer_req, void *digest)
250 {
251         struct hash_desc desc;
252         struct scatterlist sg;
253         struct page *page = peer_req->pages;
254         struct page *tmp;
255         unsigned len;
256
257         desc.tfm = tfm;
258         desc.flags = 0;
259
260         sg_init_table(&sg, 1);
261         crypto_hash_init(&desc);
262
263         while ((tmp = page_chain_next(page))) {
264                 /* all but the last page will be fully used */
265                 sg_set_page(&sg, page, PAGE_SIZE, 0);
266                 crypto_hash_update(&desc, &sg, sg.length);
267                 page = tmp;
268         }
269         /* and now the last, possibly only partially used page */
270         len = peer_req->i.size & (PAGE_SIZE - 1);
271         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
272         crypto_hash_update(&desc, &sg, sg.length);
273         crypto_hash_final(&desc, digest);
274 }
275
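The "len ?: PAGE_SIZE" expression in drbd_csum_ee() above handles the final page of the chain: the low bits of i.size give the partial length, and a result of 0 means the last page is completely used. A small stand-alone illustration of that arithmetic, assuming a 4096-byte PAGE_SIZE (EX_PAGE_SIZE is a stand-in, not the kernel macro):

#include <stdio.h>

#define EX_PAGE_SIZE 4096

int main(void)
{
        unsigned int sizes[] = { 1024, 4096, 9216, 12288 };
        unsigned int i;

        for (i = 0; i < sizeof(sizes) / sizeof(sizes[0]); i++) {
                unsigned int len = sizes[i] & (EX_PAGE_SIZE - 1);

                /* 0 means "exact multiple of a page": use the full page */
                printf("i.size=%5u -> last page uses %4u bytes\n",
                       sizes[i], len ? len : EX_PAGE_SIZE);
        }
        return 0;
}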
276 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
277 {
278         struct hash_desc desc;
279         struct scatterlist sg;
280         struct bio_vec *bvec;
281         int i;
282
283         desc.tfm = tfm;
284         desc.flags = 0;
285
286         sg_init_table(&sg, 1);
287         crypto_hash_init(&desc);
288
289         __bio_for_each_segment(bvec, bio, i, 0) {
290                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
291                 crypto_hash_update(&desc, &sg, sg.length);
292         }
293         crypto_hash_final(&desc, digest);
294 }
295
296 /* MAYBE merge common code with w_e_end_ov_req */
297 static long w_e_send_csum(struct drbd_work *w, int cancel)
298 {
299         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
300         struct drbd_conf *mdev = w->mdev;
301         int digest_size;
302         void *digest;
303         int ok = 1;
304
305         if (unlikely(cancel))
306                 goto out;
307
308         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
309                 goto out;
310
311         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
312         digest = kmalloc(digest_size, GFP_NOIO);
313         if (digest) {
314                 sector_t sector = peer_req->i.sector;
315                 unsigned int size = peer_req->i.size;
316                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
317                 /* Free peer_req and pages before send.
318                  * In case we block on congestion, we could otherwise run into
319                  * some distributed deadlock, if the other side blocks on
320                  * congestion as well, because our receiver blocks in
321                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
322                 drbd_free_ee(mdev, peer_req);
323                 peer_req = NULL;
324                 inc_rs_pending(mdev);
325                 ok = !drbd_send_drequest_csum(mdev, sector, size,
326                                               digest, digest_size,
327                                               P_CSUM_RS_REQUEST);
328                 kfree(digest);
329         } else {
330                 dev_err(DEV, "kmalloc() of digest failed.\n");
331                 ok = 0;
332         }
333
334 out:
335         if (peer_req)
336                 drbd_free_ee(mdev, peer_req);
337
338         if (unlikely(!ok))
339                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
340         return ok;
341 }
342
343 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
344
345 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
346 {
347         struct drbd_peer_request *peer_req;
348
349         if (!get_ldev(mdev))
350                 return -EIO;
351
352         if (drbd_rs_should_slow_down(mdev, sector))
353                 goto defer;
354
355         /* GFP_TRY, because if there is no memory available right now, this may
356          * be rescheduled for later. It is "only" background resync, after all. */
357         peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
358         if (!peer_req)
359                 goto defer;
360
361         peer_req->w.cb = w_e_send_csum;
362         spin_lock_irq(&mdev->tconn->req_lock);
363         list_add(&peer_req->w.list, &mdev->read_ee);
364         spin_unlock_irq(&mdev->tconn->req_lock);
365
366         atomic_add(size >> 9, &mdev->rs_sect_ev);
367         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
368                 return 0;
369
370         /* If it failed because of ENOMEM, retry should help.  If it failed
371          * because bio_add_page failed (probably broken lower level driver),
372          * retry may or may not help.
373          * If it does not, you may need to force disconnect. */
374         spin_lock_irq(&mdev->tconn->req_lock);
375         list_del(&peer_req->w.list);
376         spin_unlock_irq(&mdev->tconn->req_lock);
377
378         drbd_free_ee(mdev, peer_req);
379 defer:
380         put_ldev(mdev);
381         return -EAGAIN;
382 }
383
384 long w_resync_timer(struct drbd_work *w, int cancel)
385 {
386         struct drbd_conf *mdev = w->mdev;
387         switch (mdev->state.conn) {
388         case C_VERIFY_S:
389                 w_make_ov_request(w, cancel);
390                 break;
391         case C_SYNC_TARGET:
392                 w_make_resync_request(w, cancel);
393                 break;
394         }
395
396         return 1;
397 }
398
399 void resync_timer_fn(unsigned long data)
400 {
401         struct drbd_conf *mdev = (struct drbd_conf *) data;
402
403         if (list_empty(&mdev->resync_work.list))
404                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
405 }
406
407 static void fifo_set(struct fifo_buffer *fb, int value)
408 {
409         int i;
410
411         for (i = 0; i < fb->size; i++)
412                 fb->values[i] = value;
413 }
414
415 static int fifo_push(struct fifo_buffer *fb, int value)
416 {
417         int ov;
418
419         ov = fb->values[fb->head_index];
420         fb->values[fb->head_index++] = value;
421
422         if (fb->head_index >= fb->size)
423                 fb->head_index = 0;
424
425         return ov;
426 }
427
428 static void fifo_add_val(struct fifo_buffer *fb, int value)
429 {
430         int i;
431
432         for (i = 0; i < fb->size; i++)
433                 fb->values[i] += value;
434 }
435
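A stand-alone sketch (not from the original file) of how the three fifo_* helpers above cooperate as the resync planning window: fifo_add_val() spreads a correction over every slot, and each turn fifo_push(..., 0) consumes whatever has accumulated in the head slot. The ex_fifo type is a hypothetical user-space stand-in for struct fifo_buffer.

#include <stdio.h>

struct ex_fifo {
        unsigned int head_index;
        unsigned int size;
        int values[3];
};

static int ex_push(struct ex_fifo *fb, int value)
{
        int ov = fb->values[fb->head_index];

        fb->values[fb->head_index++] = value;
        if (fb->head_index >= fb->size)
                fb->head_index = 0;
        return ov;
}

static void ex_add_val(struct ex_fifo *fb, int value)
{
        unsigned int i;

        for (i = 0; i < fb->size; i++)
                fb->values[i] += value;
}

int main(void)
{
        struct ex_fifo plan = { 0, 3, { 0, 0, 0 } };
        int tick;

        for (tick = 0; tick < 4; tick++) {
                ex_add_val(&plan, 100);          /* plan 100 extra sectors per slot */
                printf("tick %d: executing %d planned sectors now\n",
                       tick, ex_push(&plan, 0)); /* consume head, leave 0 behind */
        }
        return 0;
}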
436 static int drbd_rs_controller(struct drbd_conf *mdev)
437 {
438         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
439         unsigned int want;     /* The number of sectors we want in the proxy */
440         int req_sect; /* Number of sectors to request in this turn */
441         int correction; /* Number of additional sectors we need in the proxy */
442         int cps; /* correction per invocation of drbd_rs_controller() */
443         int steps; /* Number of time steps to plan ahead */
444         int curr_corr;
445         int max_sect;
446
447         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
448         mdev->rs_in_flight -= sect_in;
449
450         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
451
452         steps = mdev->rs_plan_s.size; /* (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
453
454         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
455                 want = ((mdev->ldev->dc.resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
456         } else { /* normal path */
457                 want = mdev->ldev->dc.c_fill_target ? mdev->ldev->dc.c_fill_target :
458                         sect_in * mdev->ldev->dc.c_delay_target * HZ / (SLEEP_TIME * 10);
459         }
460
461         correction = want - mdev->rs_in_flight - mdev->rs_planed;
462
463         /* Plan ahead */
464         cps = correction / steps;
465         fifo_add_val(&mdev->rs_plan_s, cps);
466         mdev->rs_planed += cps * steps;
467
468         /* What we do in this step */
469         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
470         spin_unlock(&mdev->peer_seq_lock);
471         mdev->rs_planed -= curr_corr;
472
473         req_sect = sect_in + curr_corr;
474         if (req_sect < 0)
475                 req_sect = 0;
476
477         max_sect = (mdev->ldev->dc.c_max_rate * 2 * SLEEP_TIME) / HZ;
478         if (req_sect > max_sect)
479                 req_sect = max_sect;
480
481         /*
482         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
483                  sect_in, mdev->rs_in_flight, want, correction,
484                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
485         */
486
487         return req_sect;
488 }
489
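A worked example of the fill-target branch of drbd_rs_controller() above, as a stand-alone sketch with made-up numbers, and assuming the plan fifo was all zero before this turn (so the value popped by fifo_push() equals exactly one cps share):

#include <stdio.h>

int main(void)
{
        int sect_in   = 2048;   /* sectors that came in since the last turn   */
        int in_flight = 4096;   /* sectors requested but not yet acknowledged */
        int planed    = 1024;   /* sectors already planned in the fifo        */
        int want      = 8192;   /* c_fill_target: desired sectors in flight   */
        int steps     = 10;     /* plan-ahead slots                           */

        int correction = want - in_flight - planed;   /* 3072 sectors missing  */
        int cps        = correction / steps;          /* 307 sectors per slot  */
        int curr_corr  = cps;                         /* head slot, see above  */
        int req_sect   = sect_in + curr_corr;         /* 2355 sectors          */

        printf("correction=%d cps=%d -> request %d sectors (%d KiB) this turn\n",
               correction, cps, req_sect, req_sect / 2);
        return 0;
}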
490 static int drbd_rs_number_requests(struct drbd_conf *mdev)
491 {
492         int number;
493         if (mdev->rs_plan_s.size) { /* mdev->ldev->dc.c_plan_ahead */
494                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
495                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
496         } else {
497                 mdev->c_sync_rate = mdev->ldev->dc.resync_rate;
498                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
499         }
500
501         /* ignore the amount of pending requests; the resync controller should
502          * throttle down to the incoming reply rate soon enough anyway. */
503         return number;
504 }
505
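Continuing the numbers from the previous sketch: assuming 4 KiB bitmap blocks (BM_BLOCK_SHIFT == 12) and a 100 ms SLEEP_TIME, drbd_rs_number_requests() turns the sector budget into whole resync requests and an effective KiB/s rate roughly like this (stand-alone illustration, not original code):

#include <stdio.h>

int main(void)
{
        int req_sect = 2355;                    /* from the controller sketch     */
        int number   = req_sect >> (12 - 9);    /* 512-byte sectors -> 4 KiB reqs */
        int rate_kb  = number * 4 * 10;         /* 4 KiB per request, 10 turns/s  */

        printf("%d requests this turn, ~%d KiB/s effective sync rate\n",
               number, rate_kb);
        return 0;
}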
506 long w_make_resync_request(struct drbd_work *w, int cancel)
507 {
508         struct drbd_conf *mdev = w->mdev;
509         unsigned long bit;
510         sector_t sector;
511         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
512         int max_bio_size;
513         int number, rollback_i, size;
514         int align, queued, sndbuf;
515         int i = 0;
516
517         if (unlikely(cancel))
518                 return 1;
519
520         if (mdev->rs_total == 0) {
521                 /* empty resync? */
522                 drbd_resync_finished(mdev);
523                 return 1;
524         }
525
526         if (!get_ldev(mdev)) {
527                 /* Since we only need to access mdev->rsync, a
528                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
529                    continuing the resync with a broken disk makes no sense at
530                    all. */
531                 dev_err(DEV, "Disk broke down during resync!\n");
532                 return 1;
533         }
534
535         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
536         number = drbd_rs_number_requests(mdev);
537         if (number == 0)
538                 goto requeue;
539
540         for (i = 0; i < number; i++) {
541                 /* Stop generating RS requests when half of the send buffer is filled */
542                 mutex_lock(&mdev->tconn->data.mutex);
543                 if (mdev->tconn->data.socket) {
544                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
545                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
546                 } else {
547                         queued = 1;
548                         sndbuf = 0;
549                 }
550                 mutex_unlock(&mdev->tconn->data.mutex);
551                 if (queued > sndbuf / 2)
552                         goto requeue;
553
554 next_sector:
555                 size = BM_BLOCK_SIZE;
556                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
557
558                 if (bit == DRBD_END_OF_BITMAP) {
559                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
560                         put_ldev(mdev);
561                         return 1;
562                 }
563
564                 sector = BM_BIT_TO_SECT(bit);
565
566                 if (drbd_rs_should_slow_down(mdev, sector) ||
567                     drbd_try_rs_begin_io(mdev, sector)) {
568                         mdev->bm_resync_fo = bit;
569                         goto requeue;
570                 }
571                 mdev->bm_resync_fo = bit + 1;
572
573                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
574                         drbd_rs_complete_io(mdev, sector);
575                         goto next_sector;
576                 }
577
578 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
579                 /* try to find some adjacent bits.
580                  * we stop if we have already reached the maximum request size.
581                  *
582                  * Additionally always align bigger requests, in order to
583                  * be prepared for all stripe sizes of software RAIDs.
584                  */
585                 align = 1;
586                 rollback_i = i;
587                 for (;;) {
588                         if (size + BM_BLOCK_SIZE > max_bio_size)
589                                 break;
590
591                         /* Be always aligned */
592                         if (sector & ((1<<(align+3))-1))
593                                 break;
594
595                         /* do not cross extent boundaries */
596                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
597                                 break;
598                         /* now, is it actually dirty, after all?
599                          * caution, drbd_bm_test_bit is tri-state for some
600                          * obscure reason; ( b == 0 ) would get the out-of-band
601                          * only accidentally right because of the "oddly sized"
602                          * adjustment below */
603                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
604                                 break;
605                         bit++;
606                         size += BM_BLOCK_SIZE;
607                         if ((BM_BLOCK_SIZE << align) <= size)
608                                 align++;
609                         i++;
610                 }
611                 /* if we merged some,
612                  * reset the offset to start the next drbd_bm_find_next from */
613                 if (size > BM_BLOCK_SIZE)
614                         mdev->bm_resync_fo = bit + 1;
615 #endif
616
617                 /* adjust very last sectors, in case we are oddly sized */
618                 if (sector + (size>>9) > capacity)
619                         size = (capacity-sector)<<9;
620                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
621                         switch (read_for_csum(mdev, sector, size)) {
622                         case -EIO: /* Disk failure */
623                                 put_ldev(mdev);
624                                 return 0;
625                         case -EAGAIN: /* allocation failed, or ldev busy */
626                                 drbd_rs_complete_io(mdev, sector);
627                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
628                                 i = rollback_i;
629                                 goto requeue;
630                         case 0:
631                                 /* everything ok */
632                                 break;
633                         default:
634                                 BUG();
635                         }
636                 } else {
637                         inc_rs_pending(mdev);
638                         if (drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
639                                                sector, size, ID_SYNCER)) {
640                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
641                                 dec_rs_pending(mdev);
642                                 put_ldev(mdev);
643                                 return 0;
644                         }
645                 }
646         }
647
648         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
649                 /* last syncer _request_ was sent,
650                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
651                  * next sync group will resume), as soon as we receive the last
652                  * resync data block, and the last bit is cleared.
653                  * until then resync "work" is "inactive" ...
654                  */
655                 put_ldev(mdev);
656                 return 1;
657         }
658
659  requeue:
660         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
661         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
662         put_ldev(mdev);
663         return 1;
664 }
665
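The request-merging loop above (inside the DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE block) is easiest to follow with numbers. A stand-alone sketch, assuming 4 KiB bitmap blocks, a hypothetical 32 KiB lower-level limit, a well-aligned start sector and all bits dirty; the extent-boundary and dirty-bit checks of the real loop are omitted:

#include <stdio.h>

#define EX_BM_BLOCK_SIZE  4096
#define EX_MAX_BIO_SIZE   (32 * 1024)

int main(void)
{
        unsigned long sector = 2048;    /* 1 MiB boundary, in 512-byte sectors */
        int size  = EX_BM_BLOCK_SIZE;
        int align = 1;

        for (;;) {
                if (size + EX_BM_BLOCK_SIZE > EX_MAX_BIO_SIZE)
                        break;                          /* reached max req size      */
                if (sector & ((1 << (align + 3)) - 1))
                        break;                          /* start not aligned enough  */
                size += EX_BM_BLOCK_SIZE;
                if ((EX_BM_BLOCK_SIZE << align) <= size)
                        align++;                        /* demand stricter alignment */
        }
        printf("merged request: %d KiB starting at sector %lu\n",
               size / 1024, sector);
        return 0;
}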
666 static int w_make_ov_request(struct drbd_work *w, int cancel)
667 {
668         struct drbd_conf *mdev = w->mdev;
669         int number, i, size;
670         sector_t sector;
671         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
672
673         if (unlikely(cancel))
674                 return 1;
675
676         number = drbd_rs_number_requests(mdev);
677
678         sector = mdev->ov_position;
679         for (i = 0; i < number; i++) {
680                 if (sector >= capacity) {
681                         return 1;
682                 }
683
684                 size = BM_BLOCK_SIZE;
685
686                 if (drbd_rs_should_slow_down(mdev, sector) ||
687                     drbd_try_rs_begin_io(mdev, sector)) {
688                         mdev->ov_position = sector;
689                         goto requeue;
690                 }
691
692                 if (sector + (size>>9) > capacity)
693                         size = (capacity-sector)<<9;
694
695                 inc_rs_pending(mdev);
696                 if (drbd_send_ov_request(mdev, sector, size)) {
697                         dec_rs_pending(mdev);
698                         return 0;
699                 }
700                 sector += BM_SECT_PER_BIT;
701         }
702         mdev->ov_position = sector;
703
704  requeue:
705         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
706         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
707         return 1;
708 }
709
710 long w_ov_finished(struct drbd_work *w, int cancel)
711 {
712         struct drbd_conf *mdev = w->mdev;
713         kfree(w);
714         ov_oos_print(mdev);
715         drbd_resync_finished(mdev);
716
717         return 1;
718 }
719
720 static long w_resync_finished(struct drbd_work *w, int cancel)
721 {
722         struct drbd_conf *mdev = w->mdev;
723         kfree(w);
724
725         drbd_resync_finished(mdev);
726
727         return 1;
728 }
729
730 static void ping_peer(struct drbd_conf *mdev)
731 {
732         struct drbd_tconn *tconn = mdev->tconn;
733
734         clear_bit(GOT_PING_ACK, &tconn->flags);
735         request_ping(tconn);
736         wait_event(tconn->ping_wait,
737                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
738 }
739
740 int drbd_resync_finished(struct drbd_conf *mdev)
741 {
742         unsigned long db, dt, dbdt;
743         unsigned long n_oos;
744         union drbd_state os, ns;
745         struct drbd_work *w;
746         char *khelper_cmd = NULL;
747         int verify_done = 0;
748
749         /* Remove all elements from the resync LRU. Since future actions
750          * might set bits in the (main) bitmap, the entries in the
751          * resync LRU would otherwise be wrong. */
752         if (drbd_rs_del_all(mdev)) {
753                 /* In case this is not possible now, most probably because
754          * there are P_RS_DATA_REPLY packets lingering on the worker's
755          * queue (or the read operations for those packets have
756          * not finished by now).   Retry in 100ms. */
757
758                 schedule_timeout_interruptible(HZ / 10);
759                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
760                 if (w) {
761                         w->cb = w_resync_finished;
762                         drbd_queue_work(&mdev->tconn->data.work, w);
763                         return 1;
764                 }
765                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
766         }
767
768         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
769         if (dt <= 0)
770                 dt = 1;
771         db = mdev->rs_total;
772         dbdt = Bit2KB(db/dt);
773         mdev->rs_paused /= HZ;
774
775         if (!get_ldev(mdev))
776                 goto out;
777
778         ping_peer(mdev);
779
780         spin_lock_irq(&mdev->tconn->req_lock);
781         os = mdev->state;
782
783         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
784
785         /* This protects us against multiple calls (that can happen in the presence
786            of application IO), and against connectivity loss just before we arrive here. */
787         if (os.conn <= C_CONNECTED)
788                 goto out_unlock;
789
790         ns = os;
791         ns.conn = C_CONNECTED;
792
793         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
794              verify_done ? "Online verify " : "Resync",
795              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
796
797         n_oos = drbd_bm_total_weight(mdev);
798
799         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
800                 if (n_oos) {
801                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
802                               n_oos, Bit2KB(1));
803                         khelper_cmd = "out-of-sync";
804                 }
805         } else {
806                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
807
808                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
809                         khelper_cmd = "after-resync-target";
810
811                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
812                         const unsigned long s = mdev->rs_same_csum;
813                         const unsigned long t = mdev->rs_total;
814                         const int ratio =
815                                 (t == 0)     ? 0 :
816                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
817                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
818                              "transferred %luK total %luK\n",
819                              ratio,
820                              Bit2KB(mdev->rs_same_csum),
821                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
822                              Bit2KB(mdev->rs_total));
823                 }
824         }
825
826         if (mdev->rs_failed) {
827                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
828
829                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
830                         ns.disk = D_INCONSISTENT;
831                         ns.pdsk = D_UP_TO_DATE;
832                 } else {
833                         ns.disk = D_UP_TO_DATE;
834                         ns.pdsk = D_INCONSISTENT;
835                 }
836         } else {
837                 ns.disk = D_UP_TO_DATE;
838                 ns.pdsk = D_UP_TO_DATE;
839
840                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
841                         if (mdev->p_uuid) {
842                                 int i;
843                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
844                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
845                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
846                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
847                         } else {
848                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
849                         }
850                 }
851
852                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
853                         /* for verify runs, we don't update uuids here,
854                          * so there would be nothing to report. */
855                         drbd_uuid_set_bm(mdev, 0UL);
856                         drbd_print_uuids(mdev, "updated UUIDs");
857                         if (mdev->p_uuid) {
858                                 /* Now the two UUID sets are equal, update what we
859                                  * know of the peer. */
860                                 int i;
861                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
862                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
863                         }
864                 }
865         }
866
867         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
868 out_unlock:
869         spin_unlock_irq(&mdev->tconn->req_lock);
870         put_ldev(mdev);
871 out:
872         mdev->rs_total  = 0;
873         mdev->rs_failed = 0;
874         mdev->rs_paused = 0;
875         if (verify_done)
876                 mdev->ov_start_sector = 0;
877
878         drbd_md_sync(mdev);
879
880         if (khelper_cmd)
881                 drbd_khelper(mdev, khelper_cmd);
882
883         return 1;
884 }
885
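The checksum ratio in drbd_resync_finished() above is computed in two branches so that s*100 cannot overflow an unsigned long for very large resyncs; both branches approximate the same percentage. A small stand-alone illustration with made-up block counts:

#include <stdio.h>

static unsigned long ex_ratio(unsigned long s, unsigned long t)
{
        if (t == 0)
                return 0;
        /* small totals: exact s*100/t; large totals: divide t first */
        return (t < 100000) ? (s * 100) / t : s / (t / 100);
}

int main(void)
{
        /* s = blocks with equal checksums, t = total resync blocks */
        printf("small resync: %lu%%\n", ex_ratio(7500UL, 10000UL));        /* 75% */
        printf("large resync: %lu%%\n", ex_ratio(60000000UL, 80000000UL)); /* 75% */
        return 0;
}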
886 /* helper */
887 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
888 {
889         if (drbd_ee_has_active_page(peer_req)) {
890                 /* This might happen if sendpage() has not finished */
891                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
892                 atomic_add(i, &mdev->pp_in_use_by_net);
893                 atomic_sub(i, &mdev->pp_in_use);
894                 spin_lock_irq(&mdev->tconn->req_lock);
895                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
896                 spin_unlock_irq(&mdev->tconn->req_lock);
897                 wake_up(&drbd_pp_wait);
898         } else
899                 drbd_free_ee(mdev, peer_req);
900 }
901
902 /**
903  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
904  * @mdev:       DRBD device.
905  * @w:          work object.
906  * @cancel:     The connection will be closed anyways
907  */
908 long w_e_end_data_req(struct drbd_work *w, int cancel)
909 {
910         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
911         struct drbd_conf *mdev = w->mdev;
912         int ok;
913
914         if (unlikely(cancel)) {
915                 drbd_free_ee(mdev, peer_req);
916                 dec_unacked(mdev);
917                 return 1;
918         }
919
920         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
921                 ok = !drbd_send_block(mdev, P_DATA_REPLY, peer_req);
922         } else {
923                 if (__ratelimit(&drbd_ratelimit_state))
924                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
925                             (unsigned long long)peer_req->i.sector);
926
927                 ok = !drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
928         }
929
930         dec_unacked(mdev);
931
932         move_to_net_ee_or_free(mdev, peer_req);
933
934         if (unlikely(!ok))
935                 dev_err(DEV, "drbd_send_block() failed\n");
936         return ok;
937 }
938
939 /**
940  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
941  * @mdev:       DRBD device.
942  * @w:          work object.
943  * @cancel:     The connection will be closed anyways
944  */
945 long w_e_end_rsdata_req(struct drbd_work *w, int cancel)
946 {
947         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
948         struct drbd_conf *mdev = w->mdev;
949         int ok;
950
951         if (unlikely(cancel)) {
952                 drbd_free_ee(mdev, peer_req);
953                 dec_unacked(mdev);
954                 return 1;
955         }
956
957         if (get_ldev_if_state(mdev, D_FAILED)) {
958                 drbd_rs_complete_io(mdev, peer_req->i.sector);
959                 put_ldev(mdev);
960         }
961
962         if (mdev->state.conn == C_AHEAD) {
963                 ok = !drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
964         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
965                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
966                         inc_rs_pending(mdev);
967                         ok = !drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
968                 } else {
969                         if (__ratelimit(&drbd_ratelimit_state))
970                                 dev_err(DEV, "Not sending RSDataReply, "
971                                     "partner DISKLESS!\n");
972                         ok = 1;
973                 }
974         } else {
975                 if (__ratelimit(&drbd_ratelimit_state))
976                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
977                             (unsigned long long)peer_req->i.sector);
978
979                 ok = !drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
980
981                 /* update resync data with failure */
982                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
983         }
984
985         dec_unacked(mdev);
986
987         move_to_net_ee_or_free(mdev, peer_req);
988
989         if (unlikely(!ok))
990                 dev_err(DEV, "drbd_send_block() failed\n");
991         return ok;
992 }
993
994 long w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
995 {
996         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
997         struct drbd_conf *mdev = w->mdev;
998         struct digest_info *di;
999         int digest_size;
1000         void *digest = NULL;
1001         int ok, eq = 0;
1002
1003         if (unlikely(cancel)) {
1004                 drbd_free_ee(mdev, peer_req);
1005                 dec_unacked(mdev);
1006                 return 1;
1007         }
1008
1009         if (get_ldev(mdev)) {
1010                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1011                 put_ldev(mdev);
1012         }
1013
1014         di = peer_req->digest;
1015
1016         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1017                 /* quick hack to try to avoid a race against reconfiguration.
1018                  * a real fix would be much more involved,
1019                  * introducing more locking mechanisms */
1020                 if (mdev->tconn->csums_tfm) {
1021                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1022                         D_ASSERT(digest_size == di->digest_size);
1023                         digest = kmalloc(digest_size, GFP_NOIO);
1024                 }
1025                 if (digest) {
1026                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1027                         eq = !memcmp(digest, di->digest, digest_size);
1028                         kfree(digest);
1029                 }
1030
1031                 if (eq) {
1032                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1033                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1034                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1035                         ok = !drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1036                 } else {
1037                         inc_rs_pending(mdev);
1038                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1039                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1040                         kfree(di);
1041                         ok = !drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1042                 }
1043         } else {
1044                 ok = !drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1045                 if (__ratelimit(&drbd_ratelimit_state))
1046                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1047         }
1048
1049         dec_unacked(mdev);
1050         move_to_net_ee_or_free(mdev, peer_req);
1051
1052         if (unlikely(!ok))
1053                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1054         return ok;
1055 }
1056
1057 long w_e_end_ov_req(struct drbd_work *w, int cancel)
1058 {
1059         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1060         struct drbd_conf *mdev = w->mdev;
1061         sector_t sector = peer_req->i.sector;
1062         unsigned int size = peer_req->i.size;
1063         int digest_size;
1064         void *digest;
1065         int ok = 1;
1066
1067         if (unlikely(cancel))
1068                 goto out;
1069
1070         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1071         digest = kmalloc(digest_size, GFP_NOIO);
1072         if (!digest) {
1073                 ok = 0; /* terminate the connection in case the allocation failed */
1074                 goto out;
1075         }
1076
1077         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1078                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1079         else
1080                 memset(digest, 0, digest_size);
1081
1082         /* Free e and pages before send.
1083          * In case we block on congestion, we could otherwise run into
1084          * some distributed deadlock, if the other side blocks on
1085          * congestion as well, because our receiver blocks in
1086          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1087         drbd_free_ee(mdev, peer_req);
1088         peer_req = NULL;
1089         inc_rs_pending(mdev);
1090         ok = !drbd_send_drequest_csum(mdev, sector, size,
1091                                       digest, digest_size,
1092                                       P_OV_REPLY);
1093         if (!ok)
1094                 dec_rs_pending(mdev);
1095         kfree(digest);
1096
1097 out:
1098         if (peer_req)
1099                 drbd_free_ee(mdev, peer_req);
1100         dec_unacked(mdev);
1101         return ok;
1102 }
1103
1104 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1105 {
1106         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1107                 mdev->ov_last_oos_size += size>>9;
1108         } else {
1109                 mdev->ov_last_oos_start = sector;
1110                 mdev->ov_last_oos_size = size>>9;
1111         }
1112         drbd_set_out_of_sync(mdev, sector, size);
1113 }
1114
1115 long w_e_end_ov_reply(struct drbd_work *w, int cancel)
1116 {
1117         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1118         struct drbd_conf *mdev = w->mdev;
1119         struct digest_info *di;
1120         void *digest;
1121         sector_t sector = peer_req->i.sector;
1122         unsigned int size = peer_req->i.size;
1123         int digest_size;
1124         int ok, eq = 0;
1125
1126         if (unlikely(cancel)) {
1127                 drbd_free_ee(mdev, peer_req);
1128                 dec_unacked(mdev);
1129                 return 1;
1130         }
1131
1132         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1133          * the resync lru has been cleaned up already */
1134         if (get_ldev(mdev)) {
1135                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1136                 put_ldev(mdev);
1137         }
1138
1139         di = peer_req->digest;
1140
1141         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1142                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1143                 digest = kmalloc(digest_size, GFP_NOIO);
1144                 if (digest) {
1145                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1146
1147                         D_ASSERT(digest_size == di->digest_size);
1148                         eq = !memcmp(digest, di->digest, digest_size);
1149                         kfree(digest);
1150                 }
1151         }
1152
1153         /* Free peer_req and pages before send.
1154          * In case we block on congestion, we could otherwise run into
1155          * some distributed deadlock, if the other side blocks on
1156          * congestion as well, because our receiver blocks in
1157          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1158         drbd_free_ee(mdev, peer_req);
1159         if (!eq)
1160                 drbd_ov_oos_found(mdev, sector, size);
1161         else
1162                 ov_oos_print(mdev);
1163
1164         ok = !drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1165                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1166
1167         dec_unacked(mdev);
1168
1169         --mdev->ov_left;
1170
1171         /* let's advance progress step marks only for every other megabyte */
1172         if ((mdev->ov_left & 0x200) == 0x200)
1173                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1174
1175         if (mdev->ov_left == 0) {
1176                 ov_oos_print(mdev);
1177                 drbd_resync_finished(mdev);
1178         }
1179
1180         return ok;
1181 }
1182
1183 long w_prev_work_done(struct drbd_work *w, int cancel)
1184 {
1185         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1186
1187         complete(&b->done);
1188         return 1;
1189 }
1190
1191 long w_send_barrier(struct drbd_work *w, int cancel)
1192 {
1193         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1194         struct drbd_conf *mdev = w->mdev;
1195         struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
1196         int ok = 1;
1197
1198         /* really avoid racing with tl_clear.  w.cb may have been referenced
1199          * just before it was reassigned and re-queued, so double check that.
1200          * actually, this race was harmless, since we only try to send the
1201          * barrier packet here, and otherwise do nothing with the object.
1202          * but compare with the head of w_clear_epoch */
1203         spin_lock_irq(&mdev->tconn->req_lock);
1204         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1205                 cancel = 1;
1206         spin_unlock_irq(&mdev->tconn->req_lock);
1207         if (cancel)
1208                 return 1;
1209
1210         if (drbd_get_data_sock(mdev->tconn))
1211                 return 0;
1212         p->barrier = b->br_number;
1213         /* inc_ap_pending was done where this was queued.
1214          * dec_ap_pending will be done in got_BarrierAck
1215          * or (on connection loss) in w_clear_epoch.  */
1216         ok = !_drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
1217                              &p->head, sizeof(*p), 0);
1218         drbd_put_data_sock(mdev->tconn);
1219
1220         return ok;
1221 }
1222
1223 long w_send_write_hint(struct drbd_work *w, int cancel)
1224 {
1225         struct drbd_conf *mdev = w->mdev;
1226         if (cancel)
1227                 return 1;
1228         return !drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1229 }
1230
1231 long w_send_oos(struct drbd_work *w, int cancel)
1232 {
1233         struct drbd_request *req = container_of(w, struct drbd_request, w);
1234         struct drbd_conf *mdev = w->mdev;
1235         int ok;
1236
1237         if (unlikely(cancel)) {
1238                 req_mod(req, SEND_CANCELED);
1239                 return 1;
1240         }
1241
1242         ok = !drbd_send_oos(mdev, req);
1243         req_mod(req, OOS_HANDED_TO_NETWORK);
1244
1245         return ok;
1246 }
1247
1248 /**
1249  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1250  * @mdev:       DRBD device.
1251  * @w:          work object.
1252  * @cancel:     The connection will be closed anyways
1253  */
1254 long w_send_dblock(struct drbd_work *w, int cancel)
1255 {
1256         struct drbd_request *req = container_of(w, struct drbd_request, w);
1257         struct drbd_conf *mdev = w->mdev;
1258         int ok;
1259
1260         if (unlikely(cancel)) {
1261                 req_mod(req, SEND_CANCELED);
1262                 return 1;
1263         }
1264
1265         ok = !drbd_send_dblock(mdev, req);
1266         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1267
1268         return ok;
1269 }
1270
1271 /**
1272  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1273  * @mdev:       DRBD device.
1274  * @w:          work object.
1275  * @cancel:     The connection will be closed anyways
1276  */
1277 long w_send_read_req(struct drbd_work *w, int cancel)
1278 {
1279         struct drbd_request *req = container_of(w, struct drbd_request, w);
1280         struct drbd_conf *mdev = w->mdev;
1281         int ok;
1282
1283         if (unlikely(cancel)) {
1284                 req_mod(req, SEND_CANCELED);
1285                 return 1;
1286         }
1287
1288         ok = !drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1289                                  (unsigned long)req);
1290
1291         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1292
1293         return ok;
1294 }
1295
1296 long w_restart_disk_io(struct drbd_work *w, int cancel)
1297 {
1298         struct drbd_request *req = container_of(w, struct drbd_request, w);
1299         struct drbd_conf *mdev = w->mdev;
1300
1301         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1302                 drbd_al_begin_io(mdev, req->i.sector);
1303         /* Calling drbd_al_begin_io() out of the worker might deadlock
1304            theoretically. In practice it cannot deadlock, since it is
1305            only used when unfreezing IOs. All the extents of the requests
1306            that made it into the TL are already active. */
1307
1308         drbd_req_make_private_bio(req, req->master_bio);
1309         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1310         generic_make_request(req->private_bio);
1311
1312         return 1;
1313 }
1314
1315 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1316 {
1317         struct drbd_conf *odev = mdev;
1318
1319         while (1) {
1320                 if (odev->ldev->dc.resync_after == -1)
1321                         return 1;
1322                 odev = minor_to_mdev(odev->ldev->dc.resync_after);
1323                 if (!expect(odev))
1324                         return 1;
1325                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1326                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1327                     odev->state.aftr_isp || odev->state.peer_isp ||
1328                     odev->state.user_isp)
1329                         return 0;
1330         }
1331 }
1332
1333 /**
1334  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1335  * @mdev:       DRBD device.
1336  *
1337  * Called from process context only (admin command and after_state_ch).
1338  */
1339 static int _drbd_pause_after(struct drbd_conf *mdev)
1340 {
1341         struct drbd_conf *odev;
1342         int i, rv = 0;
1343
1344         idr_for_each_entry(&minors, odev, i) {
1345                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1346                         continue;
1347                 if (!_drbd_may_sync_now(odev))
1348                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1349                                != SS_NOTHING_TO_DO);
1350         }
1351
1352         return rv;
1353 }
1354
1355 /**
1356  * _drbd_resume_next() - Resume resync on all devices that may resync now
1357  * @mdev:       DRBD device.
1358  *
1359  * Called from process context only (admin command and worker).
1360  */
1361 static int _drbd_resume_next(struct drbd_conf *mdev)
1362 {
1363         struct drbd_conf *odev;
1364         int i, rv = 0;
1365
1366         idr_for_each_entry(&minors, odev, i) {
1367                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1368                         continue;
1369                 if (odev->state.aftr_isp) {
1370                         if (_drbd_may_sync_now(odev))
1371                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1372                                                         CS_HARD, NULL)
1373                                        != SS_NOTHING_TO_DO) ;
1374                 }
1375         }
1376         return rv;
1377 }
1378
1379 void resume_next_sg(struct drbd_conf *mdev)
1380 {
1381         write_lock_irq(&global_state_lock);
1382         _drbd_resume_next(mdev);
1383         write_unlock_irq(&global_state_lock);
1384 }
1385
1386 void suspend_other_sg(struct drbd_conf *mdev)
1387 {
1388         write_lock_irq(&global_state_lock);
1389         _drbd_pause_after(mdev);
1390         write_unlock_irq(&global_state_lock);
1391 }
1392
1393 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1394 {
1395         struct drbd_conf *odev;
1396
1397         if (o_minor == -1)
1398                 return NO_ERROR;
1399         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1400                 return ERR_SYNC_AFTER;
1401
1402         /* check for loops */
1403         odev = minor_to_mdev(o_minor);
1404         while (1) {
1405                 if (odev == mdev)
1406                         return ERR_SYNC_AFTER_CYCLE;
1407
1408                 /* dependency chain ends here, no cycles. */
1409                 if (odev->ldev->dc.resync_after == -1)
1410                         return NO_ERROR;
1411
1412                 /* follow the dependency chain */
1413                 odev = minor_to_mdev(odev->ldev->dc.resync_after);
1414         }
1415 }
1416
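A stand-alone sketch of the dependency walk that _drbd_may_sync_now() and sync_after_error() perform: each device may name one other minor as its resync-after parent, and the chain is followed until it ends at -1 or loops back to the starting device. The ex_device array and helpers are hypothetical.

#include <stdio.h>

struct ex_device {
        int resync_after;       /* minor we sync after, or -1 for "none" */
};

/* minor 2 syncs after 1, minor 1 after 0, minor 0 after nobody */
static struct ex_device ex_devices[] = {
        { -1 }, { 0 }, { 1 },
};

static int ex_creates_cycle(int self, int new_after)
{
        int cur = new_after;

        while (cur != -1) {
                if (cur == self)
                        return 1;               /* would close a loop   */
                cur = ex_devices[cur].resync_after;
        }
        return 0;                               /* chain ends, all good */
}

int main(void)
{
        printf("minor 0 after 2: %s\n",
               ex_creates_cycle(0, 2) ? "cycle" : "ok");  /* cycle: 2 -> 1 -> 0 */
        printf("minor 2 after 0: %s\n",
               ex_creates_cycle(2, 0) ? "cycle" : "ok");  /* ok:    0 -> -1     */
        return 0;
}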
1417 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1418 {
1419         int changes;
1420         int retcode;
1421
1422         write_lock_irq(&global_state_lock);
1423         retcode = sync_after_error(mdev, na);
1424         if (retcode == NO_ERROR) {
1425                 mdev->ldev->dc.resync_after = na;
1426                 do {
1427                         changes  = _drbd_pause_after(mdev);
1428                         changes |= _drbd_resume_next(mdev);
1429                 } while (changes);
1430         }
1431         write_unlock_irq(&global_state_lock);
1432         return retcode;
1433 }
1434
1435 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1436 {
1437         atomic_set(&mdev->rs_sect_in, 0);
1438         atomic_set(&mdev->rs_sect_ev, 0);
1439         mdev->rs_in_flight = 0;
1440         mdev->rs_planed = 0;
1441         spin_lock(&mdev->peer_seq_lock);
1442         fifo_set(&mdev->rs_plan_s, 0);
1443         spin_unlock(&mdev->peer_seq_lock);
1444 }
1445
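/* Timer callback: queue start_resync_work on the worker instead of
 * starting the resync directly from timer context. */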
1446 void start_resync_timer_fn(unsigned long data)
1447 {
1448         struct drbd_conf *mdev = (struct drbd_conf *) data;
1449
1450         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1451 }
1452
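/* Worker callback: start the resync as sync source once all unacked and
 * pending resync replies have drained; otherwise re-arm the timer and
 * retry in 100ms. */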
1453 long w_start_resync(struct drbd_work *w, int cancel)
1454 {
1455         struct drbd_conf *mdev = w->mdev;
1456
1457         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1458                 dev_warn(DEV, "w_start_resync later...\n");
1459                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1460                 add_timer(&mdev->start_resync_timer);
1461                 return 1;
1462         }
1463
1464         drbd_start_resync(mdev, C_SYNC_SOURCE);
1465         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1466         return 1;
1467 }
1468
1469 /**
1470  * drbd_start_resync() - Start the resync process
1471  * @mdev:       DRBD device.
1472  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1473  *
1474  * This function might bring you directly into one of the
1475  * C_PAUSED_SYNC_* states.
1476  */
1477 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1478 {
1479         union drbd_state ns;
1480         int r;
1481
1482         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1483                 dev_err(DEV, "Resync already running!\n");
1484                 return;
1485         }
1486
1487         if (mdev->state.conn < C_AHEAD) {
1488                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1489                 drbd_rs_cancel_all(mdev);
1490         /* This should rather be done when we abort the resync. We definitely
1491            do not want this for connections going back and forth between
1492            Ahead/Behind and SyncSource/SyncTarget. */
1493         }
1494
1495         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1496                 if (side == C_SYNC_TARGET) {
1497                         /* Since application IO was locked out during C_WF_BITMAP_T and
1498                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1499                            which makes the data inconsistent, give the handler a chance to veto. */
1500                         r = drbd_khelper(mdev, "before-resync-target");
1501                         r = (r >> 8) & 0xff;
1502                         if (r > 0) {
1503                                 dev_info(DEV, "before-resync-target handler returned %d, "
1504                                          "dropping connection.\n", r);
1505                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1506                                 return;
1507                         }
1508                 } else /* C_SYNC_SOURCE */ {
1509                         r = drbd_khelper(mdev, "before-resync-source");
1510                         r = (r >> 8) & 0xff;
1511                         if (r > 0) {
1512                                 if (r == 3) {
1513                                         dev_info(DEV, "before-resync-source handler returned %d, "
1514                                                  "ignoring. Old userland tools?\n", r);
1515                                 } else {
1516                                         dev_info(DEV, "before-resync-source handler returned %d, "
1517                                                  "dropping connection.\n", r);
1518                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1519                                         return;
1520                                 }
1521                         }
1522                 }
1523         }
1524
1525         if (current == mdev->tconn->worker.task) {
1526                 /* The worker should not sleep waiting for state_mutex,
1527                    because that can take long. */
1528                 if (!mutex_trylock(mdev->state_mutex)) {
1529                         set_bit(B_RS_H_DONE, &mdev->flags);
1530                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1531                         add_timer(&mdev->start_resync_timer);
1532                         return;
1533                 }
1534         } else {
1535                 mutex_lock(mdev->state_mutex);
1536         }
1537         clear_bit(B_RS_H_DONE, &mdev->flags);
1538
1539         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1540                 mutex_unlock(mdev->state_mutex);
1541                 return;
1542         }
1543
1544         write_lock_irq(&global_state_lock);
1545         ns = mdev->state;
1546
1547         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1548
1549         ns.conn = side;
1550
1551         if (side == C_SYNC_TARGET)
1552                 ns.disk = D_INCONSISTENT;
1553         else /* side == C_SYNC_SOURCE */
1554                 ns.pdsk = D_INCONSISTENT;
1555
1556         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1557         ns = mdev->state;
1558
1559         if (ns.conn < C_CONNECTED)
1560                 r = SS_UNKNOWN_ERROR;
1561
1562         if (r == SS_SUCCESS) {
1563                 unsigned long tw = drbd_bm_total_weight(mdev);
1564                 unsigned long now = jiffies;
1565                 int i;
1566
1567                 mdev->rs_failed    = 0;
1568                 mdev->rs_paused    = 0;
1569                 mdev->rs_same_csum = 0;
1570                 mdev->rs_last_events = 0;
1571                 mdev->rs_last_sect_ev = 0;
1572                 mdev->rs_total     = tw;
1573                 mdev->rs_start     = now;
1574                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1575                         mdev->rs_mark_left[i] = tw;
1576                         mdev->rs_mark_time[i] = now;
1577                 }
1578                 _drbd_pause_after(mdev);
1579         }
1580         write_unlock_irq(&global_state_lock);
1581
1582         if (r == SS_SUCCESS) {
1583                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1584                      drbd_conn_str(ns.conn),
1585                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1586                      (unsigned long) mdev->rs_total);
1587                 if (side == C_SYNC_TARGET)
1588                         mdev->bm_resync_fo = 0;
1589
1590                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1591                  * with w_send_oos, or the sync target will get confused as to
1592                  * how many bits to resync.  We cannot always do that, because for an
1593                  * empty resync and protocol < 95, we need to do it here, as we call
1594                  * drbd_resync_finished from here in that case.
1595                  * We call drbd_gen_and_send_sync_uuid here for protocol < 96,
1596                  * and from after_state_ch otherwise. */
1597                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1598                         drbd_gen_and_send_sync_uuid(mdev);
1599
1600                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1601                         /* This still has a race (about when exactly the peers
1602                          * detect connection loss) that can lead to a full sync
1603                          * on next handshake. In 8.3.9 we fixed this with explicit
1604                          * resync-finished notifications, but the fix
1605                          * introduces a protocol change.  Sleeping for some
1606                          * time longer than the ping interval + timeout on the
1607                          * SyncSource, to give the SyncTarget the chance to
1608                          * detect connection loss, then waiting for a ping
1609                          * response (implicit in drbd_resync_finished) reduces
1610                          * the race considerably, but does not solve it. */
1611                         if (side == C_SYNC_SOURCE)
1612                                 schedule_timeout_interruptible(
1613                                         mdev->tconn->net_conf->ping_int * HZ +
1614                                         mdev->tconn->net_conf->ping_timeo*HZ/9);
1615                         drbd_resync_finished(mdev);
1616                 }
1617
1618                 drbd_rs_controller_reset(mdev);
1619                 /* ns.conn may already be != mdev->state.conn,
1620                  * we may have been paused in between, or become paused until
1621                  * the timer triggers.
1622                  * No matter, that is handled in resync_timer_fn() */
1623                 if (ns.conn == C_SYNC_TARGET)
1624                         mod_timer(&mdev->resync_timer, jiffies);
1625
1626                 drbd_md_sync(mdev);
1627         }
1628         put_ldev(mdev);
1629         mutex_unlock(mdev->state_mutex);
1630 }
1631
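/* Main loop of the per-connection worker thread: uncork/cork the data
 * socket around waiting on the work semaphore, pop work items off
 * tconn->data.work and run their callbacks, and on exit drain the
 * remaining work list and clean up the per-volume devices. */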
1632 int drbd_worker(struct drbd_thread *thi)
1633 {
1634         struct drbd_tconn *tconn = thi->tconn;
1635         struct drbd_work *w = NULL;
1636         struct drbd_conf *mdev;
1637         LIST_HEAD(work_list);
1638         int vnr, intr = 0;
1639
1640         while (get_t_state(thi) == RUNNING) {
1641                 drbd_thread_current_set_cpu(thi);
1642
1643                 if (down_trylock(&tconn->data.work.s)) {
1644                         mutex_lock(&tconn->data.mutex);
1645                         if (tconn->data.socket && !tconn->net_conf->no_cork)
1646                                 drbd_tcp_uncork(tconn->data.socket);
1647                         mutex_unlock(&tconn->data.mutex);
1648
1649                         intr = down_interruptible(&tconn->data.work.s);
1650
1651                         mutex_lock(&tconn->data.mutex);
1652                         if (tconn->data.socket && !tconn->net_conf->no_cork)
1653                                 drbd_tcp_cork(tconn->data.socket);
1654                         mutex_unlock(&tconn->data.mutex);
1655                 }
1656
1657                 if (intr) {
1658                         flush_signals(current);
1659                         if (get_t_state(thi) == RUNNING) {
1660                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1661                                 continue;
1662                         }
1663                         break;
1664                 }
1665
1666                 if (get_t_state(thi) != RUNNING)
1667                         break;
1668                 /* With this break, we have done a down() but not consumed
1669                    the entry from the list. The cleanup code takes care of
1670                    this...   */
1671
1672                 w = NULL;
1673                 spin_lock_irq(&tconn->data.work.q_lock);
1674                 if (list_empty(&tconn->data.work.q)) {
1675                         /* something terribly wrong in our logic.
1676                          * we were able to down() the semaphore,
1677                          * but the list is empty... doh.
1678                          *
1679                          * what is the best thing to do now?
1680                          * try again from scratch, restarting the receiver,
1681                          * asender, whatnot? could break even more ugly,
1682                          * e.g. when we are primary, but no good local data.
1683                          *
1684                          * I'll try to get away just starting over this loop.
1685                          */
1686                         conn_warn(tconn, "Work list unexpectedly empty\n");
1687                         spin_unlock_irq(&tconn->data.work.q_lock);
1688                         continue;
1689                 }
1690                 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1691                 list_del_init(&w->list);
1692                 spin_unlock_irq(&tconn->data.work.q_lock);
1693
1694                 if (!w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1695                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1696                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1697                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1698                 }
1699         }
1700
1701         spin_lock_irq(&tconn->data.work.q_lock);
1702         while (!list_empty(&tconn->data.work.q)) {
1703                 list_splice_init(&tconn->data.work.q, &work_list);
1704                 spin_unlock_irq(&tconn->data.work.q_lock);
1705
1706                 while (!list_empty(&work_list)) {
1707                         w = list_entry(work_list.next, struct drbd_work, list);
1708                         list_del_init(&w->list);
1709                         w->cb(w, 1);
1710                 }
1711
1712                 spin_lock_irq(&tconn->data.work.q_lock);
1713         }
1714         sema_init(&tconn->data.work.s, 0);
1715         /* DANGEROUS race: if someone queued work while holding the spinlock,
1716          * but did the up() outside the spinlock, we could get an up() on the
1717          * semaphore without a corresponding list entry.
1718          * So don't do that.
1719          */
1720         spin_unlock_irq(&tconn->data.work.q_lock);
1721
1722         drbd_thread_stop(&tconn->receiver);
1723         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1724                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1725                 /* _drbd_set_state only uses stop_nowait.
1726                  * wait here for the exiting receiver. */
1727                 drbd_mdev_cleanup(mdev);
1728         }
1729         clear_bit(OBJECT_DYING, &tconn->flags);
1730         clear_bit(CONFIG_PENDING, &tconn->flags);
1731         wake_up(&tconn->ping_wait);
1732
1733         return 0;
1734 }