]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/drbd/drbd_worker.c
drbd: Introduced drbd_read_state()
[karo-tx-linux.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56
57
58 /* About the global_state_lock
59    Each state transition on an device holds a read lock. In case we have
60    to evaluate the sync after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
63
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69         struct drbd_md_io *md_io;
70
71         md_io = (struct drbd_md_io *)bio->bi_private;
72         md_io->error = error;
73
74         complete(&md_io->event);
75 }
76
77 /* reads on behalf of the partner,
78  * "submitted" by the receiver
79  */
80 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
81 {
82         unsigned long flags = 0;
83         struct drbd_conf *mdev = peer_req->w.mdev;
84
85         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
86         mdev->read_cnt += peer_req->i.size >> 9;
87         list_del(&peer_req->w.list);
88         if (list_empty(&mdev->read_ee))
89                 wake_up(&mdev->ee_wait);
90         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
91                 __drbd_chk_io_error(mdev, false);
92         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
93
94         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
95         put_ldev(mdev);
96 }
97
98 /* writes on behalf of the partner, or resync writes,
99  * "submitted" by the receiver, final stage.  */
100 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101 {
102         unsigned long flags = 0;
103         struct drbd_conf *mdev = peer_req->w.mdev;
104         struct drbd_interval i;
105         int do_wake;
106         u64 block_id;
107         int do_al_complete_io;
108
109         /* after we moved peer_req to done_ee,
110          * we may no longer access it,
111          * it may be freed/reused already!
112          * (as soon as we release the req_lock) */
113         i = peer_req->i;
114         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
115         block_id = peer_req->block_id;
116
117         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
118         mdev->writ_cnt += peer_req->i.size >> 9;
119         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
120         list_add_tail(&peer_req->w.list, &mdev->done_ee);
121
122         /*
123          * Do not remove from the write_requests tree here: we did not send the
124          * Ack yet and did not wake possibly waiting conflicting requests.
125          * Removed from the tree from "drbd_process_done_ee" within the
126          * appropriate w.cb (e_end_block/e_end_resync_block) or from
127          * _drbd_clear_done_ee.
128          */
129
130         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
131
132         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
133                 __drbd_chk_io_error(mdev, false);
134         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
135
136         if (block_id == ID_SYNCER)
137                 drbd_rs_complete_io(mdev, i.sector);
138
139         if (do_wake)
140                 wake_up(&mdev->ee_wait);
141
142         if (do_al_complete_io)
143                 drbd_al_complete_io(mdev, &i);
144
145         wake_asender(mdev->tconn);
146         put_ldev(mdev);
147 }
148
149 /* writes on behalf of the partner, or resync writes,
150  * "submitted" by the receiver.
151  */
152 void drbd_peer_request_endio(struct bio *bio, int error)
153 {
154         struct drbd_peer_request *peer_req = bio->bi_private;
155         struct drbd_conf *mdev = peer_req->w.mdev;
156         int uptodate = bio_flagged(bio, BIO_UPTODATE);
157         int is_write = bio_data_dir(bio) == WRITE;
158
159         if (error && __ratelimit(&drbd_ratelimit_state))
160                 dev_warn(DEV, "%s: error=%d s=%llus\n",
161                                 is_write ? "write" : "read", error,
162                                 (unsigned long long)peer_req->i.sector);
163         if (!error && !uptodate) {
164                 if (__ratelimit(&drbd_ratelimit_state))
165                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
166                                         is_write ? "write" : "read",
167                                         (unsigned long long)peer_req->i.sector);
168                 /* strange behavior of some lower level drivers...
169                  * fail the request by clearing the uptodate flag,
170                  * but do not return any error?! */
171                 error = -EIO;
172         }
173
174         if (error)
175                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
176
177         bio_put(bio); /* no need for the bio anymore */
178         if (atomic_dec_and_test(&peer_req->pending_bios)) {
179                 if (is_write)
180                         drbd_endio_write_sec_final(peer_req);
181                 else
182                         drbd_endio_read_sec_final(peer_req);
183         }
184 }
185
186 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
187  */
188 void drbd_request_endio(struct bio *bio, int error)
189 {
190         unsigned long flags;
191         struct drbd_request *req = bio->bi_private;
192         struct drbd_conf *mdev = req->w.mdev;
193         struct bio_and_error m;
194         enum drbd_req_event what;
195         int uptodate = bio_flagged(bio, BIO_UPTODATE);
196
197         if (!error && !uptodate) {
198                 dev_warn(DEV, "p %s: setting error to -EIO\n",
199                          bio_data_dir(bio) == WRITE ? "write" : "read");
200                 /* strange behavior of some lower level drivers...
201                  * fail the request by clearing the uptodate flag,
202                  * but do not return any error?! */
203                 error = -EIO;
204         }
205
206         /* to avoid recursion in __req_mod */
207         if (unlikely(error)) {
208                 what = (bio_data_dir(bio) == WRITE)
209                         ? WRITE_COMPLETED_WITH_ERROR
210                         : (bio_rw(bio) == READ)
211                           ? READ_COMPLETED_WITH_ERROR
212                           : READ_AHEAD_COMPLETED_WITH_ERROR;
213         } else
214                 what = COMPLETED_OK;
215
216         bio_put(req->private_bio);
217         req->private_bio = ERR_PTR(error);
218
219         /* not req_mod(), we need irqsave here! */
220         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
221         __req_mod(req, what, &m);
222         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
223
224         if (m.bio)
225                 complete_master_bio(mdev, &m);
226 }
227
228 int w_read_retry_remote(struct drbd_work *w, int cancel)
229 {
230         struct drbd_request *req = container_of(w, struct drbd_request, w);
231         struct drbd_conf *mdev = w->mdev;
232
233         /* We should not detach for read io-error,
234          * but try to WRITE the P_DATA_REPLY to the failed location,
235          * to give the disk the chance to relocate that block */
236
237         spin_lock_irq(&mdev->tconn->req_lock);
238         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
239                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
240                 spin_unlock_irq(&mdev->tconn->req_lock);
241                 return 0;
242         }
243         spin_unlock_irq(&mdev->tconn->req_lock);
244
245         return w_send_read_req(w, 0);
246 }
247
248 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
249                   struct drbd_peer_request *peer_req, void *digest)
250 {
251         struct hash_desc desc;
252         struct scatterlist sg;
253         struct page *page = peer_req->pages;
254         struct page *tmp;
255         unsigned len;
256
257         desc.tfm = tfm;
258         desc.flags = 0;
259
260         sg_init_table(&sg, 1);
261         crypto_hash_init(&desc);
262
263         while ((tmp = page_chain_next(page))) {
264                 /* all but the last page will be fully used */
265                 sg_set_page(&sg, page, PAGE_SIZE, 0);
266                 crypto_hash_update(&desc, &sg, sg.length);
267                 page = tmp;
268         }
269         /* and now the last, possibly only partially used page */
270         len = peer_req->i.size & (PAGE_SIZE - 1);
271         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
272         crypto_hash_update(&desc, &sg, sg.length);
273         crypto_hash_final(&desc, digest);
274 }
275
276 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
277 {
278         struct hash_desc desc;
279         struct scatterlist sg;
280         struct bio_vec *bvec;
281         int i;
282
283         desc.tfm = tfm;
284         desc.flags = 0;
285
286         sg_init_table(&sg, 1);
287         crypto_hash_init(&desc);
288
289         __bio_for_each_segment(bvec, bio, i, 0) {
290                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
291                 crypto_hash_update(&desc, &sg, sg.length);
292         }
293         crypto_hash_final(&desc, digest);
294 }
295
296 /* MAYBE merge common code with w_e_end_ov_req */
297 static int w_e_send_csum(struct drbd_work *w, int cancel)
298 {
299         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
300         struct drbd_conf *mdev = w->mdev;
301         int digest_size;
302         void *digest;
303         int err = 0;
304
305         if (unlikely(cancel))
306                 goto out;
307
308         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
309                 goto out;
310
311         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
312         digest = kmalloc(digest_size, GFP_NOIO);
313         if (digest) {
314                 sector_t sector = peer_req->i.sector;
315                 unsigned int size = peer_req->i.size;
316                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
317                 /* Free peer_req and pages before send.
318                  * In case we block on congestion, we could otherwise run into
319                  * some distributed deadlock, if the other side blocks on
320                  * congestion as well, because our receiver blocks in
321                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
322                 drbd_free_ee(mdev, peer_req);
323                 peer_req = NULL;
324                 inc_rs_pending(mdev);
325                 err = drbd_send_drequest_csum(mdev, sector, size,
326                                               digest, digest_size,
327                                               P_CSUM_RS_REQUEST);
328                 kfree(digest);
329         } else {
330                 dev_err(DEV, "kmalloc() of digest failed.\n");
331                 err = -ENOMEM;
332         }
333
334 out:
335         if (peer_req)
336                 drbd_free_ee(mdev, peer_req);
337
338         if (unlikely(err))
339                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
340         return err;
341 }
342
343 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
344
345 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
346 {
347         struct drbd_peer_request *peer_req;
348
349         if (!get_ldev(mdev))
350                 return -EIO;
351
352         if (drbd_rs_should_slow_down(mdev, sector))
353                 goto defer;
354
355         /* GFP_TRY, because if there is no memory available right now, this may
356          * be rescheduled for later. It is "only" background resync, after all. */
357         peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
358         if (!peer_req)
359                 goto defer;
360
361         peer_req->w.cb = w_e_send_csum;
362         spin_lock_irq(&mdev->tconn->req_lock);
363         list_add(&peer_req->w.list, &mdev->read_ee);
364         spin_unlock_irq(&mdev->tconn->req_lock);
365
366         atomic_add(size >> 9, &mdev->rs_sect_ev);
367         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
368                 return 0;
369
370         /* If it failed because of ENOMEM, retry should help.  If it failed
371          * because bio_add_page failed (probably broken lower level driver),
372          * retry may or may not help.
373          * If it does not, you may need to force disconnect. */
374         spin_lock_irq(&mdev->tconn->req_lock);
375         list_del(&peer_req->w.list);
376         spin_unlock_irq(&mdev->tconn->req_lock);
377
378         drbd_free_ee(mdev, peer_req);
379 defer:
380         put_ldev(mdev);
381         return -EAGAIN;
382 }
383
384 int w_resync_timer(struct drbd_work *w, int cancel)
385 {
386         struct drbd_conf *mdev = w->mdev;
387         switch (mdev->state.conn) {
388         case C_VERIFY_S:
389                 w_make_ov_request(w, cancel);
390                 break;
391         case C_SYNC_TARGET:
392                 w_make_resync_request(w, cancel);
393                 break;
394         }
395
396         return 0;
397 }
398
399 void resync_timer_fn(unsigned long data)
400 {
401         struct drbd_conf *mdev = (struct drbd_conf *) data;
402
403         if (list_empty(&mdev->resync_work.list))
404                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
405 }
406
407 static void fifo_set(struct fifo_buffer *fb, int value)
408 {
409         int i;
410
411         for (i = 0; i < fb->size; i++)
412                 fb->values[i] = value;
413 }
414
415 static int fifo_push(struct fifo_buffer *fb, int value)
416 {
417         int ov;
418
419         ov = fb->values[fb->head_index];
420         fb->values[fb->head_index++] = value;
421
422         if (fb->head_index >= fb->size)
423                 fb->head_index = 0;
424
425         return ov;
426 }
427
428 static void fifo_add_val(struct fifo_buffer *fb, int value)
429 {
430         int i;
431
432         for (i = 0; i < fb->size; i++)
433                 fb->values[i] += value;
434 }
435
436 static int drbd_rs_controller(struct drbd_conf *mdev)
437 {
438         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
439         unsigned int want;     /* The number of sectors we want in the proxy */
440         int req_sect; /* Number of sectors to request in this turn */
441         int correction; /* Number of sectors more we need in the proxy*/
442         int cps; /* correction per invocation of drbd_rs_controller() */
443         int steps; /* Number of time steps to plan ahead */
444         int curr_corr;
445         int max_sect;
446
447         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
448         mdev->rs_in_flight -= sect_in;
449
450         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
451
452         steps = mdev->rs_plan_s.size; /* (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
453
454         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
455                 want = ((mdev->ldev->dc.resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
456         } else { /* normal path */
457                 want = mdev->ldev->dc.c_fill_target ? mdev->ldev->dc.c_fill_target :
458                         sect_in * mdev->ldev->dc.c_delay_target * HZ / (SLEEP_TIME * 10);
459         }
460
461         correction = want - mdev->rs_in_flight - mdev->rs_planed;
462
463         /* Plan ahead */
464         cps = correction / steps;
465         fifo_add_val(&mdev->rs_plan_s, cps);
466         mdev->rs_planed += cps * steps;
467
468         /* What we do in this step */
469         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
470         spin_unlock(&mdev->peer_seq_lock);
471         mdev->rs_planed -= curr_corr;
472
473         req_sect = sect_in + curr_corr;
474         if (req_sect < 0)
475                 req_sect = 0;
476
477         max_sect = (mdev->ldev->dc.c_max_rate * 2 * SLEEP_TIME) / HZ;
478         if (req_sect > max_sect)
479                 req_sect = max_sect;
480
481         /*
482         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
483                  sect_in, mdev->rs_in_flight, want, correction,
484                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
485         */
486
487         return req_sect;
488 }
489
490 static int drbd_rs_number_requests(struct drbd_conf *mdev)
491 {
492         int number;
493         if (mdev->rs_plan_s.size) { /* mdev->ldev->dc.c_plan_ahead */
494                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
495                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
496         } else {
497                 mdev->c_sync_rate = mdev->ldev->dc.resync_rate;
498                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
499         }
500
501         /* ignore the amount of pending requests, the resync controller should
502          * throttle down to incoming reply rate soon enough anyways. */
503         return number;
504 }
505
506 int w_make_resync_request(struct drbd_work *w, int cancel)
507 {
508         struct drbd_conf *mdev = w->mdev;
509         unsigned long bit;
510         sector_t sector;
511         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
512         int max_bio_size;
513         int number, rollback_i, size;
514         int align, queued, sndbuf;
515         int i = 0;
516
517         if (unlikely(cancel))
518                 return 0;
519
520         if (mdev->rs_total == 0) {
521                 /* empty resync? */
522                 drbd_resync_finished(mdev);
523                 return 0;
524         }
525
526         if (!get_ldev(mdev)) {
527                 /* Since we only need to access mdev->rsync a
528                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
529                    to continue resync with a broken disk makes no sense at
530                    all */
531                 dev_err(DEV, "Disk broke down during resync!\n");
532                 return 0;
533         }
534
535         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
536         number = drbd_rs_number_requests(mdev);
537         if (number == 0)
538                 goto requeue;
539
540         for (i = 0; i < number; i++) {
541                 /* Stop generating RS requests, when half of the send buffer is filled */
542                 mutex_lock(&mdev->tconn->data.mutex);
543                 if (mdev->tconn->data.socket) {
544                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
545                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
546                 } else {
547                         queued = 1;
548                         sndbuf = 0;
549                 }
550                 mutex_unlock(&mdev->tconn->data.mutex);
551                 if (queued > sndbuf / 2)
552                         goto requeue;
553
554 next_sector:
555                 size = BM_BLOCK_SIZE;
556                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
557
558                 if (bit == DRBD_END_OF_BITMAP) {
559                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
560                         put_ldev(mdev);
561                         return 0;
562                 }
563
564                 sector = BM_BIT_TO_SECT(bit);
565
566                 if (drbd_rs_should_slow_down(mdev, sector) ||
567                     drbd_try_rs_begin_io(mdev, sector)) {
568                         mdev->bm_resync_fo = bit;
569                         goto requeue;
570                 }
571                 mdev->bm_resync_fo = bit + 1;
572
573                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
574                         drbd_rs_complete_io(mdev, sector);
575                         goto next_sector;
576                 }
577
578 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
579                 /* try to find some adjacent bits.
580                  * we stop if we have already the maximum req size.
581                  *
582                  * Additionally always align bigger requests, in order to
583                  * be prepared for all stripe sizes of software RAIDs.
584                  */
585                 align = 1;
586                 rollback_i = i;
587                 for (;;) {
588                         if (size + BM_BLOCK_SIZE > max_bio_size)
589                                 break;
590
591                         /* Be always aligned */
592                         if (sector & ((1<<(align+3))-1))
593                                 break;
594
595                         /* do not cross extent boundaries */
596                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
597                                 break;
598                         /* now, is it actually dirty, after all?
599                          * caution, drbd_bm_test_bit is tri-state for some
600                          * obscure reason; ( b == 0 ) would get the out-of-band
601                          * only accidentally right because of the "oddly sized"
602                          * adjustment below */
603                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
604                                 break;
605                         bit++;
606                         size += BM_BLOCK_SIZE;
607                         if ((BM_BLOCK_SIZE << align) <= size)
608                                 align++;
609                         i++;
610                 }
611                 /* if we merged some,
612                  * reset the offset to start the next drbd_bm_find_next from */
613                 if (size > BM_BLOCK_SIZE)
614                         mdev->bm_resync_fo = bit + 1;
615 #endif
616
617                 /* adjust very last sectors, in case we are oddly sized */
618                 if (sector + (size>>9) > capacity)
619                         size = (capacity-sector)<<9;
620                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
621                         switch (read_for_csum(mdev, sector, size)) {
622                         case -EIO: /* Disk failure */
623                                 put_ldev(mdev);
624                                 return -EIO;
625                         case -EAGAIN: /* allocation failed, or ldev busy */
626                                 drbd_rs_complete_io(mdev, sector);
627                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
628                                 i = rollback_i;
629                                 goto requeue;
630                         case 0:
631                                 /* everything ok */
632                                 break;
633                         default:
634                                 BUG();
635                         }
636                 } else {
637                         int err;
638
639                         inc_rs_pending(mdev);
640                         err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
641                                                  sector, size, ID_SYNCER);
642                         if (err) {
643                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
644                                 dec_rs_pending(mdev);
645                                 put_ldev(mdev);
646                                 return err;
647                         }
648                 }
649         }
650
651         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
652                 /* last syncer _request_ was sent,
653                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
654                  * next sync group will resume), as soon as we receive the last
655                  * resync data block, and the last bit is cleared.
656                  * until then resync "work" is "inactive" ...
657                  */
658                 put_ldev(mdev);
659                 return 0;
660         }
661
662  requeue:
663         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
664         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
665         put_ldev(mdev);
666         return 0;
667 }
668
669 static int w_make_ov_request(struct drbd_work *w, int cancel)
670 {
671         struct drbd_conf *mdev = w->mdev;
672         int number, i, size;
673         sector_t sector;
674         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
675
676         if (unlikely(cancel))
677                 return 1;
678
679         number = drbd_rs_number_requests(mdev);
680
681         sector = mdev->ov_position;
682         for (i = 0; i < number; i++) {
683                 if (sector >= capacity) {
684                         return 1;
685                 }
686
687                 size = BM_BLOCK_SIZE;
688
689                 if (drbd_rs_should_slow_down(mdev, sector) ||
690                     drbd_try_rs_begin_io(mdev, sector)) {
691                         mdev->ov_position = sector;
692                         goto requeue;
693                 }
694
695                 if (sector + (size>>9) > capacity)
696                         size = (capacity-sector)<<9;
697
698                 inc_rs_pending(mdev);
699                 if (drbd_send_ov_request(mdev, sector, size)) {
700                         dec_rs_pending(mdev);
701                         return 0;
702                 }
703                 sector += BM_SECT_PER_BIT;
704         }
705         mdev->ov_position = sector;
706
707  requeue:
708         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
709         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
710         return 1;
711 }
712
713 int w_ov_finished(struct drbd_work *w, int cancel)
714 {
715         struct drbd_conf *mdev = w->mdev;
716         kfree(w);
717         ov_out_of_sync_print(mdev);
718         drbd_resync_finished(mdev);
719
720         return 0;
721 }
722
723 static int w_resync_finished(struct drbd_work *w, int cancel)
724 {
725         struct drbd_conf *mdev = w->mdev;
726         kfree(w);
727
728         drbd_resync_finished(mdev);
729
730         return 0;
731 }
732
733 static void ping_peer(struct drbd_conf *mdev)
734 {
735         struct drbd_tconn *tconn = mdev->tconn;
736
737         clear_bit(GOT_PING_ACK, &tconn->flags);
738         request_ping(tconn);
739         wait_event(tconn->ping_wait,
740                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
741 }
742
743 int drbd_resync_finished(struct drbd_conf *mdev)
744 {
745         unsigned long db, dt, dbdt;
746         unsigned long n_oos;
747         union drbd_state os, ns;
748         struct drbd_work *w;
749         char *khelper_cmd = NULL;
750         int verify_done = 0;
751
752         /* Remove all elements from the resync LRU. Since future actions
753          * might set bits in the (main) bitmap, then the entries in the
754          * resync LRU would be wrong. */
755         if (drbd_rs_del_all(mdev)) {
756                 /* In case this is not possible now, most probably because
757                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
758                  * queue (or even the read operations for those packets
759                  * is not finished by now).   Retry in 100ms. */
760
761                 schedule_timeout_interruptible(HZ / 10);
762                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
763                 if (w) {
764                         w->cb = w_resync_finished;
765                         drbd_queue_work(&mdev->tconn->data.work, w);
766                         return 1;
767                 }
768                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
769         }
770
771         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
772         if (dt <= 0)
773                 dt = 1;
774         db = mdev->rs_total;
775         dbdt = Bit2KB(db/dt);
776         mdev->rs_paused /= HZ;
777
778         if (!get_ldev(mdev))
779                 goto out;
780
781         ping_peer(mdev);
782
783         spin_lock_irq(&mdev->tconn->req_lock);
784         os = drbd_read_state(mdev);
785
786         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
787
788         /* This protects us against multiple calls (that can happen in the presence
789            of application IO), and against connectivity loss just before we arrive here. */
790         if (os.conn <= C_CONNECTED)
791                 goto out_unlock;
792
793         ns = os;
794         ns.conn = C_CONNECTED;
795
796         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
797              verify_done ? "Online verify " : "Resync",
798              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
799
800         n_oos = drbd_bm_total_weight(mdev);
801
802         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
803                 if (n_oos) {
804                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
805                               n_oos, Bit2KB(1));
806                         khelper_cmd = "out-of-sync";
807                 }
808         } else {
809                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
810
811                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
812                         khelper_cmd = "after-resync-target";
813
814                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
815                         const unsigned long s = mdev->rs_same_csum;
816                         const unsigned long t = mdev->rs_total;
817                         const int ratio =
818                                 (t == 0)     ? 0 :
819                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
820                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
821                              "transferred %luK total %luK\n",
822                              ratio,
823                              Bit2KB(mdev->rs_same_csum),
824                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
825                              Bit2KB(mdev->rs_total));
826                 }
827         }
828
829         if (mdev->rs_failed) {
830                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
831
832                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
833                         ns.disk = D_INCONSISTENT;
834                         ns.pdsk = D_UP_TO_DATE;
835                 } else {
836                         ns.disk = D_UP_TO_DATE;
837                         ns.pdsk = D_INCONSISTENT;
838                 }
839         } else {
840                 ns.disk = D_UP_TO_DATE;
841                 ns.pdsk = D_UP_TO_DATE;
842
843                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
844                         if (mdev->p_uuid) {
845                                 int i;
846                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
847                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
848                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
849                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
850                         } else {
851                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
852                         }
853                 }
854
855                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
856                         /* for verify runs, we don't update uuids here,
857                          * so there would be nothing to report. */
858                         drbd_uuid_set_bm(mdev, 0UL);
859                         drbd_print_uuids(mdev, "updated UUIDs");
860                         if (mdev->p_uuid) {
861                                 /* Now the two UUID sets are equal, update what we
862                                  * know of the peer. */
863                                 int i;
864                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
865                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
866                         }
867                 }
868         }
869
870         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
871 out_unlock:
872         spin_unlock_irq(&mdev->tconn->req_lock);
873         put_ldev(mdev);
874 out:
875         mdev->rs_total  = 0;
876         mdev->rs_failed = 0;
877         mdev->rs_paused = 0;
878         if (verify_done)
879                 mdev->ov_start_sector = 0;
880
881         drbd_md_sync(mdev);
882
883         if (khelper_cmd)
884                 drbd_khelper(mdev, khelper_cmd);
885
886         return 1;
887 }
888
889 /* helper */
890 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
891 {
892         if (drbd_ee_has_active_page(peer_req)) {
893                 /* This might happen if sendpage() has not finished */
894                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
895                 atomic_add(i, &mdev->pp_in_use_by_net);
896                 atomic_sub(i, &mdev->pp_in_use);
897                 spin_lock_irq(&mdev->tconn->req_lock);
898                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
899                 spin_unlock_irq(&mdev->tconn->req_lock);
900                 wake_up(&drbd_pp_wait);
901         } else
902                 drbd_free_ee(mdev, peer_req);
903 }
904
905 /**
906  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
907  * @mdev:       DRBD device.
908  * @w:          work object.
909  * @cancel:     The connection will be closed anyways
910  */
911 int w_e_end_data_req(struct drbd_work *w, int cancel)
912 {
913         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
914         struct drbd_conf *mdev = w->mdev;
915         int err;
916
917         if (unlikely(cancel)) {
918                 drbd_free_ee(mdev, peer_req);
919                 dec_unacked(mdev);
920                 return 0;
921         }
922
923         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
924                 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
925         } else {
926                 if (__ratelimit(&drbd_ratelimit_state))
927                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
928                             (unsigned long long)peer_req->i.sector);
929
930                 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
931         }
932
933         dec_unacked(mdev);
934
935         move_to_net_ee_or_free(mdev, peer_req);
936
937         if (unlikely(err))
938                 dev_err(DEV, "drbd_send_block() failed\n");
939         return err;
940 }
941
942 /**
943  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
944  * @mdev:       DRBD device.
945  * @w:          work object.
946  * @cancel:     The connection will be closed anyways
947  */
948 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
949 {
950         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
951         struct drbd_conf *mdev = w->mdev;
952         int err;
953
954         if (unlikely(cancel)) {
955                 drbd_free_ee(mdev, peer_req);
956                 dec_unacked(mdev);
957                 return 0;
958         }
959
960         if (get_ldev_if_state(mdev, D_FAILED)) {
961                 drbd_rs_complete_io(mdev, peer_req->i.sector);
962                 put_ldev(mdev);
963         }
964
965         if (mdev->state.conn == C_AHEAD) {
966                 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
967         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
968                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
969                         inc_rs_pending(mdev);
970                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
971                 } else {
972                         if (__ratelimit(&drbd_ratelimit_state))
973                                 dev_err(DEV, "Not sending RSDataReply, "
974                                     "partner DISKLESS!\n");
975                         err = 0;
976                 }
977         } else {
978                 if (__ratelimit(&drbd_ratelimit_state))
979                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
980                             (unsigned long long)peer_req->i.sector);
981
982                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
983
984                 /* update resync data with failure */
985                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
986         }
987
988         dec_unacked(mdev);
989
990         move_to_net_ee_or_free(mdev, peer_req);
991
992         if (unlikely(err))
993                 dev_err(DEV, "drbd_send_block() failed\n");
994         return err;
995 }
996
997 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
998 {
999         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1000         struct drbd_conf *mdev = w->mdev;
1001         struct digest_info *di;
1002         int digest_size;
1003         void *digest = NULL;
1004         int err, eq = 0;
1005
1006         if (unlikely(cancel)) {
1007                 drbd_free_ee(mdev, peer_req);
1008                 dec_unacked(mdev);
1009                 return 0;
1010         }
1011
1012         if (get_ldev(mdev)) {
1013                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1014                 put_ldev(mdev);
1015         }
1016
1017         di = peer_req->digest;
1018
1019         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1020                 /* quick hack to try to avoid a race against reconfiguration.
1021                  * a real fix would be much more involved,
1022                  * introducing more locking mechanisms */
1023                 if (mdev->tconn->csums_tfm) {
1024                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1025                         D_ASSERT(digest_size == di->digest_size);
1026                         digest = kmalloc(digest_size, GFP_NOIO);
1027                 }
1028                 if (digest) {
1029                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1030                         eq = !memcmp(digest, di->digest, digest_size);
1031                         kfree(digest);
1032                 }
1033
1034                 if (eq) {
1035                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1036                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1037                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1038                         err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1039                 } else {
1040                         inc_rs_pending(mdev);
1041                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1042                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1043                         kfree(di);
1044                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1045                 }
1046         } else {
1047                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1048                 if (__ratelimit(&drbd_ratelimit_state))
1049                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1050         }
1051
1052         dec_unacked(mdev);
1053         move_to_net_ee_or_free(mdev, peer_req);
1054
1055         if (unlikely(err))
1056                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1057         return err;
1058 }
1059
1060 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1061 {
1062         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1063         struct drbd_conf *mdev = w->mdev;
1064         sector_t sector = peer_req->i.sector;
1065         unsigned int size = peer_req->i.size;
1066         int digest_size;
1067         void *digest;
1068         int err = 0;
1069
1070         if (unlikely(cancel))
1071                 goto out;
1072
1073         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1074         digest = kmalloc(digest_size, GFP_NOIO);
1075         if (!digest) {
1076                 err = 1;        /* terminate the connection in case the allocation failed */
1077                 goto out;
1078         }
1079
1080         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1081                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1082         else
1083                 memset(digest, 0, digest_size);
1084
1085         /* Free e and pages before send.
1086          * In case we block on congestion, we could otherwise run into
1087          * some distributed deadlock, if the other side blocks on
1088          * congestion as well, because our receiver blocks in
1089          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1090         drbd_free_ee(mdev, peer_req);
1091         peer_req = NULL;
1092         inc_rs_pending(mdev);
1093         err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1094         if (err)
1095                 dec_rs_pending(mdev);
1096         kfree(digest);
1097
1098 out:
1099         if (peer_req)
1100                 drbd_free_ee(mdev, peer_req);
1101         dec_unacked(mdev);
1102         return err;
1103 }
1104
1105 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1106 {
1107         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1108                 mdev->ov_last_oos_size += size>>9;
1109         } else {
1110                 mdev->ov_last_oos_start = sector;
1111                 mdev->ov_last_oos_size = size>>9;
1112         }
1113         drbd_set_out_of_sync(mdev, sector, size);
1114 }
1115
1116 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1117 {
1118         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1119         struct drbd_conf *mdev = w->mdev;
1120         struct digest_info *di;
1121         void *digest;
1122         sector_t sector = peer_req->i.sector;
1123         unsigned int size = peer_req->i.size;
1124         int digest_size;
1125         int err, eq = 0;
1126
1127         if (unlikely(cancel)) {
1128                 drbd_free_ee(mdev, peer_req);
1129                 dec_unacked(mdev);
1130                 return 0;
1131         }
1132
1133         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1134          * the resync lru has been cleaned up already */
1135         if (get_ldev(mdev)) {
1136                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1137                 put_ldev(mdev);
1138         }
1139
1140         di = peer_req->digest;
1141
1142         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1143                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1144                 digest = kmalloc(digest_size, GFP_NOIO);
1145                 if (digest) {
1146                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1147
1148                         D_ASSERT(digest_size == di->digest_size);
1149                         eq = !memcmp(digest, di->digest, digest_size);
1150                         kfree(digest);
1151                 }
1152         }
1153
1154         /* Free peer_req and pages before send.
1155          * In case we block on congestion, we could otherwise run into
1156          * some distributed deadlock, if the other side blocks on
1157          * congestion as well, because our receiver blocks in
1158          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1159         drbd_free_ee(mdev, peer_req);
1160         if (!eq)
1161                 drbd_ov_out_of_sync_found(mdev, sector, size);
1162         else
1163                 ov_out_of_sync_print(mdev);
1164
1165         err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1166                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1167
1168         dec_unacked(mdev);
1169
1170         --mdev->ov_left;
1171
1172         /* let's advance progress step marks only for every other megabyte */
1173         if ((mdev->ov_left & 0x200) == 0x200)
1174                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1175
1176         if (mdev->ov_left == 0) {
1177                 ov_out_of_sync_print(mdev);
1178                 drbd_resync_finished(mdev);
1179         }
1180
1181         return err;
1182 }
1183
1184 int w_prev_work_done(struct drbd_work *w, int cancel)
1185 {
1186         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1187
1188         complete(&b->done);
1189         return 0;
1190 }
1191
1192 int w_send_barrier(struct drbd_work *w, int cancel)
1193 {
1194         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1195         struct drbd_conf *mdev = w->mdev;
1196         struct p_barrier *p = mdev->tconn->data.sbuf;
1197         int err = 0;
1198
1199         /* really avoid racing with tl_clear.  w.cb may have been referenced
1200          * just before it was reassigned and re-queued, so double check that.
1201          * actually, this race was harmless, since we only try to send the
1202          * barrier packet here, and otherwise do nothing with the object.
1203          * but compare with the head of w_clear_epoch */
1204         spin_lock_irq(&mdev->tconn->req_lock);
1205         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1206                 cancel = 1;
1207         spin_unlock_irq(&mdev->tconn->req_lock);
1208         if (cancel)
1209                 return 0;
1210
1211         err = drbd_get_data_sock(mdev->tconn);
1212         if (err)
1213                 return err;
1214         p->barrier = b->br_number;
1215         /* inc_ap_pending was done where this was queued.
1216          * dec_ap_pending will be done in got_BarrierAck
1217          * or (on connection loss) in w_clear_epoch.  */
1218         err = _drbd_send_cmd(mdev, &mdev->tconn->data, P_BARRIER,
1219                              &p->head, sizeof(*p), 0);
1220         drbd_put_data_sock(mdev->tconn);
1221
1222         return err;
1223 }
1224
1225 int w_send_write_hint(struct drbd_work *w, int cancel)
1226 {
1227         struct drbd_conf *mdev = w->mdev;
1228         if (cancel)
1229                 return 0;
1230         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1231 }
1232
1233 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1234 {
1235         struct drbd_request *req = container_of(w, struct drbd_request, w);
1236         struct drbd_conf *mdev = w->mdev;
1237         int err;
1238
1239         if (unlikely(cancel)) {
1240                 req_mod(req, SEND_CANCELED);
1241                 return 0;
1242         }
1243
1244         err = drbd_send_out_of_sync(mdev, req);
1245         req_mod(req, OOS_HANDED_TO_NETWORK);
1246
1247         return err;
1248 }
1249
1250 /**
1251  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1252  * @mdev:       DRBD device.
1253  * @w:          work object.
1254  * @cancel:     The connection will be closed anyways
1255  */
1256 int w_send_dblock(struct drbd_work *w, int cancel)
1257 {
1258         struct drbd_request *req = container_of(w, struct drbd_request, w);
1259         struct drbd_conf *mdev = w->mdev;
1260         int err;
1261
1262         if (unlikely(cancel)) {
1263                 req_mod(req, SEND_CANCELED);
1264                 return 0;
1265         }
1266
1267         err = drbd_send_dblock(mdev, req);
1268         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1269
1270         return err;
1271 }
1272
1273 /**
1274  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1275  * @mdev:       DRBD device.
1276  * @w:          work object.
1277  * @cancel:     The connection will be closed anyways
1278  */
1279 int w_send_read_req(struct drbd_work *w, int cancel)
1280 {
1281         struct drbd_request *req = container_of(w, struct drbd_request, w);
1282         struct drbd_conf *mdev = w->mdev;
1283         int err;
1284
1285         if (unlikely(cancel)) {
1286                 req_mod(req, SEND_CANCELED);
1287                 return 0;
1288         }
1289
1290         err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1291                                  (unsigned long)req);
1292
1293         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1294
1295         return err;
1296 }
1297
1298 int w_restart_disk_io(struct drbd_work *w, int cancel)
1299 {
1300         struct drbd_request *req = container_of(w, struct drbd_request, w);
1301         struct drbd_conf *mdev = w->mdev;
1302
1303         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1304                 drbd_al_begin_io(mdev, &req->i);
1305         /* Calling drbd_al_begin_io() out of the worker might deadlocks
1306            theoretically. Practically it can not deadlock, since this is
1307            only used when unfreezing IOs. All the extents of the requests
1308            that made it into the TL are already active */
1309
1310         drbd_req_make_private_bio(req, req->master_bio);
1311         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1312         generic_make_request(req->private_bio);
1313
1314         return 0;
1315 }
1316
1317 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1318 {
1319         struct drbd_conf *odev = mdev;
1320
1321         while (1) {
1322                 if (!odev->ldev)
1323                         return 1;
1324                 if (odev->ldev->dc.resync_after == -1)
1325                         return 1;
1326                 odev = minor_to_mdev(odev->ldev->dc.resync_after);
1327                 if (!expect(odev))
1328                         return 1;
1329                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1330                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1331                     odev->state.aftr_isp || odev->state.peer_isp ||
1332                     odev->state.user_isp)
1333                         return 0;
1334         }
1335 }
1336
1337 /**
1338  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1339  * @mdev:       DRBD device.
1340  *
1341  * Called from process context only (admin command and after_state_ch).
1342  */
1343 static int _drbd_pause_after(struct drbd_conf *mdev)
1344 {
1345         struct drbd_conf *odev;
1346         int i, rv = 0;
1347
1348         idr_for_each_entry(&minors, odev, i) {
1349                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1350                         continue;
1351                 if (!_drbd_may_sync_now(odev))
1352                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1353                                != SS_NOTHING_TO_DO);
1354         }
1355
1356         return rv;
1357 }
1358
1359 /**
1360  * _drbd_resume_next() - Resume resync on all devices that may resync now
1361  * @mdev:       DRBD device.
1362  *
1363  * Called from process context only (admin command and worker).
1364  */
1365 static int _drbd_resume_next(struct drbd_conf *mdev)
1366 {
1367         struct drbd_conf *odev;
1368         int i, rv = 0;
1369
1370         idr_for_each_entry(&minors, odev, i) {
1371                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1372                         continue;
1373                 if (odev->state.aftr_isp) {
1374                         if (_drbd_may_sync_now(odev))
1375                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1376                                                         CS_HARD, NULL)
1377                                        != SS_NOTHING_TO_DO) ;
1378                 }
1379         }
1380         return rv;
1381 }
1382
1383 void resume_next_sg(struct drbd_conf *mdev)
1384 {
1385         write_lock_irq(&global_state_lock);
1386         _drbd_resume_next(mdev);
1387         write_unlock_irq(&global_state_lock);
1388 }
1389
1390 void suspend_other_sg(struct drbd_conf *mdev)
1391 {
1392         write_lock_irq(&global_state_lock);
1393         _drbd_pause_after(mdev);
1394         write_unlock_irq(&global_state_lock);
1395 }
1396
1397 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1398 {
1399         struct drbd_conf *odev;
1400
1401         if (o_minor == -1)
1402                 return NO_ERROR;
1403         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1404                 return ERR_SYNC_AFTER;
1405
1406         /* check for loops */
1407         odev = minor_to_mdev(o_minor);
1408         while (1) {
1409                 if (odev == mdev)
1410                         return ERR_SYNC_AFTER_CYCLE;
1411
1412                 /* dependency chain ends here, no cycles. */
1413                 if (odev->ldev->dc.resync_after == -1)
1414                         return NO_ERROR;
1415
1416                 /* follow the dependency chain */
1417                 odev = minor_to_mdev(odev->ldev->dc.resync_after);
1418         }
1419 }
1420
1421 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1422 {
1423         int changes;
1424         int retcode;
1425
1426         write_lock_irq(&global_state_lock);
1427         retcode = sync_after_error(mdev, na);
1428         if (retcode == NO_ERROR) {
1429                 mdev->ldev->dc.resync_after = na;
1430                 do {
1431                         changes  = _drbd_pause_after(mdev);
1432                         changes |= _drbd_resume_next(mdev);
1433                 } while (changes);
1434         }
1435         write_unlock_irq(&global_state_lock);
1436         return retcode;
1437 }
1438
1439 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1440 {
1441         atomic_set(&mdev->rs_sect_in, 0);
1442         atomic_set(&mdev->rs_sect_ev, 0);
1443         mdev->rs_in_flight = 0;
1444         mdev->rs_planed = 0;
1445         spin_lock(&mdev->peer_seq_lock);
1446         fifo_set(&mdev->rs_plan_s, 0);
1447         spin_unlock(&mdev->peer_seq_lock);
1448 }
1449
1450 void start_resync_timer_fn(unsigned long data)
1451 {
1452         struct drbd_conf *mdev = (struct drbd_conf *) data;
1453
1454         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1455 }
1456
1457 int w_start_resync(struct drbd_work *w, int cancel)
1458 {
1459         struct drbd_conf *mdev = w->mdev;
1460
1461         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1462                 dev_warn(DEV, "w_start_resync later...\n");
1463                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1464                 add_timer(&mdev->start_resync_timer);
1465                 return 0;
1466         }
1467
1468         drbd_start_resync(mdev, C_SYNC_SOURCE);
1469         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1470         return 0;
1471 }
1472
1473 /**
1474  * drbd_start_resync() - Start the resync process
1475  * @mdev:       DRBD device.
1476  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1477  *
1478  * This function might bring you directly into one of the
1479  * C_PAUSED_SYNC_* states.
1480  */
1481 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1482 {
1483         union drbd_state ns;
1484         int r;
1485
1486         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1487                 dev_err(DEV, "Resync already running!\n");
1488                 return;
1489         }
1490
1491         if (mdev->state.conn < C_AHEAD) {
1492                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1493                 drbd_rs_cancel_all(mdev);
1494                 /* This should be done when we abort the resync. We definitely do not
1495                    want to have this for connections going back and forth between
1496                    Ahead/Behind and SyncSource/SyncTarget */
1497         }
1498
1499         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1500                 if (side == C_SYNC_TARGET) {
1501                         /* Since application IO was locked out during C_WF_BITMAP_T and
1502                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1503                            we check that we might make the data inconsistent. */
1504                         r = drbd_khelper(mdev, "before-resync-target");
1505                         r = (r >> 8) & 0xff;
1506                         if (r > 0) {
1507                                 dev_info(DEV, "before-resync-target handler returned %d, "
1508                                          "dropping connection.\n", r);
1509                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1510                                 return;
1511                         }
1512                 } else /* C_SYNC_SOURCE */ {
1513                         r = drbd_khelper(mdev, "before-resync-source");
1514                         r = (r >> 8) & 0xff;
1515                         if (r > 0) {
1516                                 if (r == 3) {
1517                                         dev_info(DEV, "before-resync-source handler returned %d, "
1518                                                  "ignoring. Old userland tools?", r);
1519                                 } else {
1520                                         dev_info(DEV, "before-resync-source handler returned %d, "
1521                                                  "dropping connection.\n", r);
1522                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1523                                         return;
1524                                 }
1525                         }
1526                 }
1527         }
1528
1529         if (current == mdev->tconn->worker.task) {
1530                 /* The worker should not sleep waiting for state_mutex,
1531                    that can take long */
1532                 if (!mutex_trylock(mdev->state_mutex)) {
1533                         set_bit(B_RS_H_DONE, &mdev->flags);
1534                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1535                         add_timer(&mdev->start_resync_timer);
1536                         return;
1537                 }
1538         } else {
1539                 mutex_lock(mdev->state_mutex);
1540         }
1541         clear_bit(B_RS_H_DONE, &mdev->flags);
1542
1543         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1544                 mutex_unlock(mdev->state_mutex);
1545                 return;
1546         }
1547
1548         write_lock_irq(&global_state_lock);
1549         ns = drbd_read_state(mdev);
1550
1551         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1552
1553         ns.conn = side;
1554
1555         if (side == C_SYNC_TARGET)
1556                 ns.disk = D_INCONSISTENT;
1557         else /* side == C_SYNC_SOURCE */
1558                 ns.pdsk = D_INCONSISTENT;
1559
1560         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1561         ns = drbd_read_state(mdev);
1562
1563         if (ns.conn < C_CONNECTED)
1564                 r = SS_UNKNOWN_ERROR;
1565
1566         if (r == SS_SUCCESS) {
1567                 unsigned long tw = drbd_bm_total_weight(mdev);
1568                 unsigned long now = jiffies;
1569                 int i;
1570
1571                 mdev->rs_failed    = 0;
1572                 mdev->rs_paused    = 0;
1573                 mdev->rs_same_csum = 0;
1574                 mdev->rs_last_events = 0;
1575                 mdev->rs_last_sect_ev = 0;
1576                 mdev->rs_total     = tw;
1577                 mdev->rs_start     = now;
1578                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1579                         mdev->rs_mark_left[i] = tw;
1580                         mdev->rs_mark_time[i] = now;
1581                 }
1582                 _drbd_pause_after(mdev);
1583         }
1584         write_unlock_irq(&global_state_lock);
1585
1586         if (r == SS_SUCCESS) {
1587                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1588                      drbd_conn_str(ns.conn),
1589                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1590                      (unsigned long) mdev->rs_total);
1591                 if (side == C_SYNC_TARGET)
1592                         mdev->bm_resync_fo = 0;
1593
1594                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1595                  * with w_send_oos, or the sync target will get confused as to
1596                  * how much bits to resync.  We cannot do that always, because for an
1597                  * empty resync and protocol < 95, we need to do it here, as we call
1598                  * drbd_resync_finished from here in that case.
1599                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1600                  * and from after_state_ch otherwise. */
1601                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1602                         drbd_gen_and_send_sync_uuid(mdev);
1603
1604                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1605                         /* This still has a race (about when exactly the peers
1606                          * detect connection loss) that can lead to a full sync
1607                          * on next handshake. In 8.3.9 we fixed this with explicit
1608                          * resync-finished notifications, but the fix
1609                          * introduces a protocol change.  Sleeping for some
1610                          * time longer than the ping interval + timeout on the
1611                          * SyncSource, to give the SyncTarget the chance to
1612                          * detect connection loss, then waiting for a ping
1613                          * response (implicit in drbd_resync_finished) reduces
1614                          * the race considerably, but does not solve it. */
1615                         if (side == C_SYNC_SOURCE)
1616                                 schedule_timeout_interruptible(
1617                                         mdev->tconn->net_conf->ping_int * HZ +
1618                                         mdev->tconn->net_conf->ping_timeo*HZ/9);
1619                         drbd_resync_finished(mdev);
1620                 }
1621
1622                 drbd_rs_controller_reset(mdev);
1623                 /* ns.conn may already be != mdev->state.conn,
1624                  * we may have been paused in between, or become paused until
1625                  * the timer triggers.
1626                  * No matter, that is handled in resync_timer_fn() */
1627                 if (ns.conn == C_SYNC_TARGET)
1628                         mod_timer(&mdev->resync_timer, jiffies);
1629
1630                 drbd_md_sync(mdev);
1631         }
1632         put_ldev(mdev);
1633         mutex_unlock(mdev->state_mutex);
1634 }
1635
1636 int drbd_worker(struct drbd_thread *thi)
1637 {
1638         struct drbd_tconn *tconn = thi->tconn;
1639         struct drbd_work *w = NULL;
1640         struct drbd_conf *mdev;
1641         LIST_HEAD(work_list);
1642         int vnr, intr = 0;
1643
1644         while (get_t_state(thi) == RUNNING) {
1645                 drbd_thread_current_set_cpu(thi);
1646
1647                 if (down_trylock(&tconn->data.work.s)) {
1648                         mutex_lock(&tconn->data.mutex);
1649                         if (tconn->data.socket && !tconn->net_conf->no_cork)
1650                                 drbd_tcp_uncork(tconn->data.socket);
1651                         mutex_unlock(&tconn->data.mutex);
1652
1653                         intr = down_interruptible(&tconn->data.work.s);
1654
1655                         mutex_lock(&tconn->data.mutex);
1656                         if (tconn->data.socket  && !tconn->net_conf->no_cork)
1657                                 drbd_tcp_cork(tconn->data.socket);
1658                         mutex_unlock(&tconn->data.mutex);
1659                 }
1660
1661                 if (intr) {
1662                         flush_signals(current);
1663                         if (get_t_state(thi) == RUNNING) {
1664                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1665                                 continue;
1666                         }
1667                         break;
1668                 }
1669
1670                 if (get_t_state(thi) != RUNNING)
1671                         break;
1672                 /* With this break, we have done a down() but not consumed
1673                    the entry from the list. The cleanup code takes care of
1674                    this...   */
1675
1676                 w = NULL;
1677                 spin_lock_irq(&tconn->data.work.q_lock);
1678                 if (list_empty(&tconn->data.work.q)) {
1679                         /* something terribly wrong in our logic.
1680                          * we were able to down() the semaphore,
1681                          * but the list is empty... doh.
1682                          *
1683                          * what is the best thing to do now?
1684                          * try again from scratch, restarting the receiver,
1685                          * asender, whatnot? could break even more ugly,
1686                          * e.g. when we are primary, but no good local data.
1687                          *
1688                          * I'll try to get away just starting over this loop.
1689                          */
1690                         conn_warn(tconn, "Work list unexpectedly empty\n");
1691                         spin_unlock_irq(&tconn->data.work.q_lock);
1692                         continue;
1693                 }
1694                 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1695                 list_del_init(&w->list);
1696                 spin_unlock_irq(&tconn->data.work.q_lock);
1697
1698                 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1699                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1700                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1701                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1702                 }
1703         }
1704
1705         spin_lock_irq(&tconn->data.work.q_lock);
1706         while (!list_empty(&tconn->data.work.q)) {
1707                 list_splice_init(&tconn->data.work.q, &work_list);
1708                 spin_unlock_irq(&tconn->data.work.q_lock);
1709
1710                 while (!list_empty(&work_list)) {
1711                         w = list_entry(work_list.next, struct drbd_work, list);
1712                         list_del_init(&w->list);
1713                         w->cb(w, 1);
1714                 }
1715
1716                 spin_lock_irq(&tconn->data.work.q_lock);
1717         }
1718         sema_init(&tconn->data.work.s, 0);
1719         /* DANGEROUS race: if someone did queue his work within the spinlock,
1720          * but up() ed outside the spinlock, we could get an up() on the
1721          * semaphore without corresponding list entry.
1722          * So don't do that.
1723          */
1724         spin_unlock_irq(&tconn->data.work.q_lock);
1725
1726         drbd_thread_stop(&tconn->receiver);
1727         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1728                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1729                 /* _drbd_set_state only uses stop_nowait.
1730                  * wait here for the exiting receiver. */
1731                 drbd_mdev_cleanup(mdev);
1732         }
1733         clear_bit(OBJECT_DYING, &tconn->flags);
1734         clear_bit(CONFIG_PENDING, &tconn->flags);
1735         wake_up(&tconn->ping_wait);
1736
1737         return 0;
1738 }