drivers/block/drbd/drbd_worker.c (karo-tx-linux.git, blob at commit "drbd: Rename struct drbd_epoch_entry to struct drbd_peer_request")
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43                                  struct drbd_work *w, int cancel);
44
45
46
47 /* endio handlers:
48  *   drbd_md_io_complete (defined here)
49  *   drbd_endio_pri (defined here)
50  *   drbd_endio_sec (defined here)
51  *   bm_async_io_complete (defined in drbd_bitmap.c)
52  *
53  * For all these callbacks, note the following:
54  * The callbacks will be called in irq context by the IDE drivers,
55  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56  * Try to get the locking right :)
57  *
58  */
59
60
61 /* About the global_state_lock
62    Each state transition on a device holds a read lock. In case we have
63    to evaluate the sync after dependencies, we grab a write lock, because
64    we need stable states on all devices for that.  */
65 rwlock_t global_state_lock;
66
67 /* used for synchronous meta data and bitmap IO
68  * submitted by drbd_md_sync_page_io()
69  */
70 void drbd_md_io_complete(struct bio *bio, int error)
71 {
72         struct drbd_md_io *md_io;
73
74         md_io = (struct drbd_md_io *)bio->bi_private;
75         md_io->error = error;
76
77         complete(&md_io->event);
78 }
79
80 /* reads on behalf of the partner,
81  * "submitted" by the receiver
82  */
83 void drbd_endio_read_sec_final(struct drbd_peer_request *e) __releases(local)
84 {
85         unsigned long flags = 0;
86         struct drbd_conf *mdev = e->mdev;
87
88         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
89         mdev->read_cnt += e->i.size >> 9;
90         list_del(&e->w.list);
91         if (list_empty(&mdev->read_ee))
92                 wake_up(&mdev->ee_wait);
93         if (test_bit(__EE_WAS_ERROR, &e->flags))
94                 __drbd_chk_io_error(mdev, false);
95         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
96
97         drbd_queue_work(&mdev->tconn->data.work, &e->w);
98         put_ldev(mdev);
99 }
100
101 /* writes on behalf of the partner, or resync writes,
102  * "submitted" by the receiver, final stage.  */
103 static void drbd_endio_write_sec_final(struct drbd_peer_request *e) __releases(local)
104 {
105         unsigned long flags = 0;
106         struct drbd_conf *mdev = e->mdev;
107         sector_t e_sector;
108         int do_wake;
109         u64 block_id;
110         int do_al_complete_io;
111
112         /* after we moved e to done_ee,
113          * we may no longer access it,
114          * it may be freed/reused already!
115          * (as soon as we release the req_lock) */
116         e_sector = e->i.sector;
117         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
118         block_id = e->block_id;
119
120         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
121         mdev->writ_cnt += e->i.size >> 9;
122         list_del(&e->w.list); /* has been on active_ee or sync_ee */
123         list_add_tail(&e->w.list, &mdev->done_ee);
124
125         /*
126          * Do not remove from the write_requests tree here: we did not send the
127          * Ack yet and did not wake possibly waiting conflicting requests.
128          * Removed from the tree from "drbd_process_done_ee" within the
129          * appropriate w.cb (e_end_block/e_end_resync_block) or from
130          * _drbd_clear_done_ee.
131          */
132
133         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
134
135         if (test_bit(__EE_WAS_ERROR, &e->flags))
136                 __drbd_chk_io_error(mdev, false);
137         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
138
139         if (block_id == ID_SYNCER)
140                 drbd_rs_complete_io(mdev, e_sector);
141
142         if (do_wake)
143                 wake_up(&mdev->ee_wait);
144
145         if (do_al_complete_io)
146                 drbd_al_complete_io(mdev, e_sector);
147
148         wake_asender(mdev);
149         put_ldev(mdev);
150 }
151
152 /* writes on behalf of the partner, or resync writes,
153  * "submitted" by the receiver.
154  */
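/* Note: a peer request may have been split into several bios by
 * drbd_submit_ee(); e->pending_bios counts them, and only the completion of
 * the last bio hands the request on to the *_sec_final() functions above. */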
155 void drbd_endio_sec(struct bio *bio, int error)
156 {
157         struct drbd_peer_request *e = bio->bi_private;
158         struct drbd_conf *mdev = e->mdev;
159         int uptodate = bio_flagged(bio, BIO_UPTODATE);
160         int is_write = bio_data_dir(bio) == WRITE;
161
162         if (error && __ratelimit(&drbd_ratelimit_state))
163                 dev_warn(DEV, "%s: error=%d s=%llus\n",
164                                 is_write ? "write" : "read", error,
165                                 (unsigned long long)e->i.sector);
166         if (!error && !uptodate) {
167                 if (__ratelimit(&drbd_ratelimit_state))
168                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
169                                         is_write ? "write" : "read",
170                                         (unsigned long long)e->i.sector);
171                 /* strange behavior of some lower level drivers...
172                  * fail the request by clearing the uptodate flag,
173                  * but do not return any error?! */
174                 error = -EIO;
175         }
176
177         if (error)
178                 set_bit(__EE_WAS_ERROR, &e->flags);
179
180         bio_put(bio); /* no need for the bio anymore */
181         if (atomic_dec_and_test(&e->pending_bios)) {
182                 if (is_write)
183                         drbd_endio_write_sec_final(e);
184                 else
185                         drbd_endio_read_sec_final(e);
186         }
187 }
188
189 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
190  */
191 void drbd_endio_pri(struct bio *bio, int error)
192 {
193         unsigned long flags;
194         struct drbd_request *req = bio->bi_private;
195         struct drbd_conf *mdev = req->mdev;
196         struct bio_and_error m;
197         enum drbd_req_event what;
198         int uptodate = bio_flagged(bio, BIO_UPTODATE);
199
200         if (!error && !uptodate) {
201                 dev_warn(DEV, "p %s: setting error to -EIO\n",
202                          bio_data_dir(bio) == WRITE ? "write" : "read");
203                 /* strange behavior of some lower level drivers...
204                  * fail the request by clearing the uptodate flag,
205                  * but do not return any error?! */
206                 error = -EIO;
207         }
208
209         /* to avoid recursion in __req_mod */
210         if (unlikely(error)) {
211                 what = (bio_data_dir(bio) == WRITE)
212                         ? WRITE_COMPLETED_WITH_ERROR
213                         : (bio_rw(bio) == READ)
214                           ? READ_COMPLETED_WITH_ERROR
215                           : READ_AHEAD_COMPLETED_WITH_ERROR;
216         } else
217                 what = COMPLETED_OK;
218
219         bio_put(req->private_bio);
220         req->private_bio = ERR_PTR(error);
221
222         /* not req_mod(), we need irqsave here! */
223         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
224         __req_mod(req, what, &m);
225         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
226
227         if (m.bio)
228                 complete_master_bio(mdev, &m);
229 }
230
231 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
232 {
233         struct drbd_request *req = container_of(w, struct drbd_request, w);
234
235         /* We should not detach for read io-error,
236          * but try to WRITE the P_DATA_REPLY to the failed location,
237          * to give the disk the chance to relocate that block */
238
239         spin_lock_irq(&mdev->tconn->req_lock);
240         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
241                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
242                 spin_unlock_irq(&mdev->tconn->req_lock);
243                 return 1;
244         }
245         spin_unlock_irq(&mdev->tconn->req_lock);
246
247         return w_send_read_req(mdev, w, 0);
248 }
249
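/*
 * drbd_csum_ee() hashes the page chain of a peer request with the given
 * transform (csums_tfm or verify_tfm); all pages but the last are fully
 * used, the last one may be only partially used (e->i.size % PAGE_SIZE).
 * drbd_csum_bio() below does the same for the segments of a bio.
 */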
250 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
251                   struct drbd_peer_request *e, void *digest)
252 {
253         struct hash_desc desc;
254         struct scatterlist sg;
255         struct page *page = e->pages;
256         struct page *tmp;
257         unsigned len;
258
259         desc.tfm = tfm;
260         desc.flags = 0;
261
262         sg_init_table(&sg, 1);
263         crypto_hash_init(&desc);
264
265         while ((tmp = page_chain_next(page))) {
266                 /* all but the last page will be fully used */
267                 sg_set_page(&sg, page, PAGE_SIZE, 0);
268                 crypto_hash_update(&desc, &sg, sg.length);
269                 page = tmp;
270         }
271         /* and now the last, possibly only partially used page */
272         len = e->i.size & (PAGE_SIZE - 1);
273         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
274         crypto_hash_update(&desc, &sg, sg.length);
275         crypto_hash_final(&desc, digest);
276 }
277
278 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
279 {
280         struct hash_desc desc;
281         struct scatterlist sg;
282         struct bio_vec *bvec;
283         int i;
284
285         desc.tfm = tfm;
286         desc.flags = 0;
287
288         sg_init_table(&sg, 1);
289         crypto_hash_init(&desc);
290
291         __bio_for_each_segment(bvec, bio, i, 0) {
292                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
293                 crypto_hash_update(&desc, &sg, sg.length);
294         }
295         crypto_hash_final(&desc, digest);
296 }
297
298 /* TODO merge common code with w_e_end_ov_req */
299 int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
300 {
301         struct drbd_peer_request *e = container_of(w, struct drbd_peer_request, w);
302         int digest_size;
303         void *digest;
304         int ok = 1;
305
306         if (unlikely(cancel))
307                 goto out;
308
309         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
310                 goto out;
311
312         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
313         digest = kmalloc(digest_size, GFP_NOIO);
314         if (digest) {
315                 sector_t sector = e->i.sector;
316                 unsigned int size = e->i.size;
317                 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
318                 /* Free e and pages before send.
319                  * In case we block on congestion, we could otherwise run into
320                  * some distributed deadlock, if the other side blocks on
321                  * congestion as well, because our receiver blocks in
322                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
323                 drbd_free_ee(mdev, e);
324                 e = NULL;
325                 inc_rs_pending(mdev);
326                 ok = drbd_send_drequest_csum(mdev, sector, size,
327                                              digest, digest_size,
328                                              P_CSUM_RS_REQUEST);
329                 kfree(digest);
330         } else {
331                 dev_err(DEV, "kmalloc() of digest failed.\n");
332                 ok = 0;
333         }
334
335 out:
336         if (e)
337                 drbd_free_ee(mdev, e);
338
339         if (unlikely(!ok))
340                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
341         return ok;
342 }
343
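/* "best effort" allocation flags: such allocations may fail quietly (no OOM
 * warning); used below for background resync reads, which are simply retried
 * later (see read_for_csum). */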
344 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
345
346 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
347 {
348         struct drbd_peer_request *e;
349
350         if (!get_ldev(mdev))
351                 return -EIO;
352
353         if (drbd_rs_should_slow_down(mdev, sector))
354                 goto defer;
355
356         /* GFP_TRY, because if there is no memory available right now, this may
357          * be rescheduled for later. It is "only" background resync, after all. */
358         e = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
359         if (!e)
360                 goto defer;
361
362         e->w.cb = w_e_send_csum;
363         spin_lock_irq(&mdev->tconn->req_lock);
364         list_add(&e->w.list, &mdev->read_ee);
365         spin_unlock_irq(&mdev->tconn->req_lock);
366
367         atomic_add(size >> 9, &mdev->rs_sect_ev);
368         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
369                 return 0;
370
371         /* If it failed because of ENOMEM, retry should help.  If it failed
372          * because bio_add_page failed (probably broken lower level driver),
373          * retry may or may not help.
374          * If it does not, you may need to force disconnect. */
375         spin_lock_irq(&mdev->tconn->req_lock);
376         list_del(&e->w.list);
377         spin_unlock_irq(&mdev->tconn->req_lock);
378
379         drbd_free_ee(mdev, e);
380 defer:
381         put_ldev(mdev);
382         return -EAGAIN;
383 }
384
385 int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
386 {
387         switch (mdev->state.conn) {
388         case C_VERIFY_S:
389                 w_make_ov_request(mdev, w, cancel);
390                 break;
391         case C_SYNC_TARGET:
392                 w_make_resync_request(mdev, w, cancel);
393                 break;
394         }
395
396         return 1;
397 }
398
399 void resync_timer_fn(unsigned long data)
400 {
401         struct drbd_conf *mdev = (struct drbd_conf *) data;
402
403         if (list_empty(&mdev->resync_work.list))
404                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
405 }
406
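/*
 * Small helpers for the fixed size ring buffer (struct fifo_buffer) that the
 * resync controller below uses to plan ahead: fifo_set() fills every slot
 * with one value, fifo_push() stores a new value at the head and returns the
 * value it overwrote, fifo_add_val() adds a value to every slot.
 */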
407 static void fifo_set(struct fifo_buffer *fb, int value)
408 {
409         int i;
410
411         for (i = 0; i < fb->size; i++)
412                 fb->values[i] = value;
413 }
414
415 static int fifo_push(struct fifo_buffer *fb, int value)
416 {
417         int ov;
418
419         ov = fb->values[fb->head_index];
420         fb->values[fb->head_index++] = value;
421
422         if (fb->head_index >= fb->size)
423                 fb->head_index = 0;
424
425         return ov;
426 }
427
428 static void fifo_add_val(struct fifo_buffer *fb, int value)
429 {
430         int i;
431
432         for (i = 0; i < fb->size; i++)
433                 fb->values[i] += value;
434 }
435
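/*
 * Dynamic resync rate controller, evaluated once per SLEEP_TIME (100ms) tick.
 * Roughly: aim to keep "want" sectors in flight towards the peer, where want
 * is either the configured c_fill_target or is derived from c_delay_target
 * and the observed reply rate (sect_in).  The difference between want and
 * what is already in flight or planned is spread over "steps" future ticks
 * through the rs_plan_s fifo; the amount popped for the current tick plus
 * sect_in, clamped to what c_max_rate allows per tick, is returned as the
 * number of sectors to request now.
 */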
436 static int drbd_rs_controller(struct drbd_conf *mdev)
437 {
438         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
439         unsigned int want;     /* The number of sectors we want in the proxy */
440         int req_sect; /* Number of sectors to request in this turn */
441         int correction; /* Number of sectors more we need in the proxy*/
442         int cps; /* correction per invocation of drbd_rs_controller() */
443         int steps; /* Number of time steps to plan ahead */
444         int curr_corr;
445         int max_sect;
446
447         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
448         mdev->rs_in_flight -= sect_in;
449
450         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
451
452         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
453
454         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
455                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
456         } else { /* normal path */
457                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
458                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
459         }
460
461         correction = want - mdev->rs_in_flight - mdev->rs_planed;
462
463         /* Plan ahead */
464         cps = correction / steps;
465         fifo_add_val(&mdev->rs_plan_s, cps);
466         mdev->rs_planed += cps * steps;
467
468         /* What we do in this step */
469         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
470         spin_unlock(&mdev->peer_seq_lock);
471         mdev->rs_planed -= curr_corr;
472
473         req_sect = sect_in + curr_corr;
474         if (req_sect < 0)
475                 req_sect = 0;
476
477         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
478         if (req_sect > max_sect)
479                 req_sect = max_sect;
480
481         /*
482         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
483                  sect_in, mdev->rs_in_flight, want, correction,
484                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
485         */
486
487         return req_sect;
488 }
489
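/*
 * Translate the controller's per-tick sector budget (or, without plan-ahead,
 * the static sync_conf.rate in KiB/s) into the number of BM_BLOCK_SIZE sized
 * requests to issue during the next SLEEP_TIME interval, and remember the
 * effective rate in c_sync_rate.
 */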
490 static int drbd_rs_number_requests(struct drbd_conf *mdev)
491 {
492         int number;
493         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
494                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
495                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
496         } else {
497                 mdev->c_sync_rate = mdev->sync_conf.rate;
498                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
499         }
500
501         /* ignore the amount of pending requests, the resync controller should
502          * throttle down to incoming reply rate soon enough anyways. */
503         return number;
504 }
505
506 static int w_make_resync_request(struct drbd_conf *mdev,
507                                  struct drbd_work *w, int cancel)
508 {
509         unsigned long bit;
510         sector_t sector;
511         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
512         int max_bio_size;
513         int number, rollback_i, size;
514         int align, queued, sndbuf;
515         int i = 0;
516
517         if (unlikely(cancel))
518                 return 1;
519
520         if (mdev->rs_total == 0) {
521                 /* empty resync? */
522                 drbd_resync_finished(mdev);
523                 return 1;
524         }
525
526         if (!get_ldev(mdev)) {
527                 /* Since we only need to access mdev->rsync, a
528                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
529                    continuing resync with a broken disk makes no sense at
530                    all. */
531                 dev_err(DEV, "Disk broke down during resync!\n");
532                 return 1;
533         }
534
535         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
536         number = drbd_rs_number_requests(mdev);
537         if (number == 0)
538                 goto requeue;
539
540         for (i = 0; i < number; i++) {
541                 /* Stop generating RS requests, when half of the send buffer is filled */
542                 mutex_lock(&mdev->tconn->data.mutex);
543                 if (mdev->tconn->data.socket) {
544                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
545                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
546                 } else {
547                         queued = 1;
548                         sndbuf = 0;
549                 }
550                 mutex_unlock(&mdev->tconn->data.mutex);
551                 if (queued > sndbuf / 2)
552                         goto requeue;
553
554 next_sector:
555                 size = BM_BLOCK_SIZE;
556                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
557
558                 if (bit == DRBD_END_OF_BITMAP) {
559                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
560                         put_ldev(mdev);
561                         return 1;
562                 }
563
564                 sector = BM_BIT_TO_SECT(bit);
565
566                 if (drbd_rs_should_slow_down(mdev, sector) ||
567                     drbd_try_rs_begin_io(mdev, sector)) {
568                         mdev->bm_resync_fo = bit;
569                         goto requeue;
570                 }
571                 mdev->bm_resync_fo = bit + 1;
572
573                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
574                         drbd_rs_complete_io(mdev, sector);
575                         goto next_sector;
576                 }
577
578 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
579                 /* try to find some adjacent bits.
580                  * we stop if we have already the maximum req size.
581                  *
582                  * Additionally always align bigger requests, in order to
583                  * be prepared for all stripe sizes of software RAIDs.
584                  */
585                 align = 1;
586                 rollback_i = i;
587                 for (;;) {
588                         if (size + BM_BLOCK_SIZE > max_bio_size)
589                                 break;
590
591                         /* Be always aligned */
592                         if (sector & ((1<<(align+3))-1))
593                                 break;
594
595                         /* do not cross extent boundaries */
596                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
597                                 break;
598                         /* now, is it actually dirty, after all?
599                          * caution, drbd_bm_test_bit is tri-state for some
600                          * obscure reason; ( b == 0 ) would get the out-of-band
601                          * only accidentally right because of the "oddly sized"
602                          * adjustment below */
603                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
604                                 break;
605                         bit++;
606                         size += BM_BLOCK_SIZE;
607                         if ((BM_BLOCK_SIZE << align) <= size)
608                                 align++;
609                         i++;
610                 }
611                 /* if we merged some,
612                  * reset the offset to start the next drbd_bm_find_next from */
613                 if (size > BM_BLOCK_SIZE)
614                         mdev->bm_resync_fo = bit + 1;
615 #endif
616
617                 /* adjust very last sectors, in case we are oddly sized */
618                 if (sector + (size>>9) > capacity)
619                         size = (capacity-sector)<<9;
620                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->csums_tfm) {
621                         switch (read_for_csum(mdev, sector, size)) {
622                         case -EIO: /* Disk failure */
623                                 put_ldev(mdev);
624                                 return 0;
625                         case -EAGAIN: /* allocation failed, or ldev busy */
626                                 drbd_rs_complete_io(mdev, sector);
627                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
628                                 i = rollback_i;
629                                 goto requeue;
630                         case 0:
631                                 /* everything ok */
632                                 break;
633                         default:
634                                 BUG();
635                         }
636                 } else {
637                         inc_rs_pending(mdev);
638                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
639                                                sector, size, ID_SYNCER)) {
640                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
641                                 dec_rs_pending(mdev);
642                                 put_ldev(mdev);
643                                 return 0;
644                         }
645                 }
646         }
647
648         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
649                 /* last syncer _request_ was sent,
650                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
651                  * next sync group will resume), as soon as we receive the last
652                  * resync data block, and the last bit is cleared.
653                  * until then resync "work" is "inactive" ...
654                  */
655                 put_ldev(mdev);
656                 return 1;
657         }
658
659  requeue:
660         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
661         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
662         put_ldev(mdev);
663         return 1;
664 }
665
666 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
667 {
668         int number, i, size;
669         sector_t sector;
670         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
671
672         if (unlikely(cancel))
673                 return 1;
674
675         number = drbd_rs_number_requests(mdev);
676
677         sector = mdev->ov_position;
678         for (i = 0; i < number; i++) {
679                 if (sector >= capacity) {
680                         return 1;
681                 }
682
683                 size = BM_BLOCK_SIZE;
684
685                 if (drbd_rs_should_slow_down(mdev, sector) ||
686                     drbd_try_rs_begin_io(mdev, sector)) {
687                         mdev->ov_position = sector;
688                         goto requeue;
689                 }
690
691                 if (sector + (size>>9) > capacity)
692                         size = (capacity-sector)<<9;
693
694                 inc_rs_pending(mdev);
695                 if (!drbd_send_ov_request(mdev, sector, size)) {
696                         dec_rs_pending(mdev);
697                         return 0;
698                 }
699                 sector += BM_SECT_PER_BIT;
700         }
701         mdev->ov_position = sector;
702
703  requeue:
704         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
705         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
706         return 1;
707 }
708
709
710 void start_resync_timer_fn(unsigned long data)
711 {
712         struct drbd_conf *mdev = (struct drbd_conf *) data;
713
714         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
715 }
716
717 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
718 {
719         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
720                 dev_warn(DEV, "w_start_resync later...\n");
721                 mdev->start_resync_timer.expires = jiffies + HZ/10;
722                 add_timer(&mdev->start_resync_timer);
723                 return 1;
724         }
725
726         drbd_start_resync(mdev, C_SYNC_SOURCE);
727         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
728         return 1;
729 }
730
731 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
732 {
733         kfree(w);
734         ov_oos_print(mdev);
735         drbd_resync_finished(mdev);
736
737         return 1;
738 }
739
740 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
741 {
742         kfree(w);
743
744         drbd_resync_finished(mdev);
745
746         return 1;
747 }
748
749 static void ping_peer(struct drbd_conf *mdev)
750 {
751         clear_bit(GOT_PING_ACK, &mdev->flags);
752         request_ping(mdev);
753         wait_event(mdev->misc_wait,
754                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
755 }
756
757 int drbd_resync_finished(struct drbd_conf *mdev)
758 {
759         unsigned long db, dt, dbdt;
760         unsigned long n_oos;
761         union drbd_state os, ns;
762         struct drbd_work *w;
763         char *khelper_cmd = NULL;
764         int verify_done = 0;
765
766         /* Remove all elements from the resync LRU. Since future actions
767          * might set bits in the (main) bitmap, then the entries in the
768          * resync LRU would be wrong. */
769         if (drbd_rs_del_all(mdev)) {
770                 /* In case this is not possible now, most probably because
771                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
772                  * queue (or even the read operations for those packets
773                  * is not finished by now).   Retry in 100ms. */
774
775                 schedule_timeout_interruptible(HZ / 10);
776                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
777                 if (w) {
778                         w->cb = w_resync_finished;
779                         drbd_queue_work(&mdev->tconn->data.work, w);
780                         return 1;
781                 }
782                 dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
783         }
784
785         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
786         if (dt <= 0)
787                 dt = 1;
788         db = mdev->rs_total;
789         dbdt = Bit2KB(db/dt);
790         mdev->rs_paused /= HZ;
791
792         if (!get_ldev(mdev))
793                 goto out;
794
795         ping_peer(mdev);
796
797         spin_lock_irq(&mdev->tconn->req_lock);
798         os = mdev->state;
799
800         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
801
802         /* This protects us against multiple calls (that can happen in the presence
803            of application IO), and against connectivity loss just before we arrive here. */
804         if (os.conn <= C_CONNECTED)
805                 goto out_unlock;
806
807         ns = os;
808         ns.conn = C_CONNECTED;
809
810         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
811              verify_done ? "Online verify " : "Resync",
812              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
813
814         n_oos = drbd_bm_total_weight(mdev);
815
816         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
817                 if (n_oos) {
818                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
819                               n_oos, Bit2KB(1));
820                         khelper_cmd = "out-of-sync";
821                 }
822         } else {
823                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
824
825                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
826                         khelper_cmd = "after-resync-target";
827
828                 if (mdev->csums_tfm && mdev->rs_total) {
829                         const unsigned long s = mdev->rs_same_csum;
830                         const unsigned long t = mdev->rs_total;
831                         const int ratio =
832                                 (t == 0)     ? 0 :
833                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
834                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
835                              "transferred %luK total %luK\n",
836                              ratio,
837                              Bit2KB(mdev->rs_same_csum),
838                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
839                              Bit2KB(mdev->rs_total));
840                 }
841         }
842
843         if (mdev->rs_failed) {
844                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
845
846                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
847                         ns.disk = D_INCONSISTENT;
848                         ns.pdsk = D_UP_TO_DATE;
849                 } else {
850                         ns.disk = D_UP_TO_DATE;
851                         ns.pdsk = D_INCONSISTENT;
852                 }
853         } else {
854                 ns.disk = D_UP_TO_DATE;
855                 ns.pdsk = D_UP_TO_DATE;
856
857                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
858                         if (mdev->p_uuid) {
859                                 int i;
860                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
861                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
862                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
863                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
864                         } else {
865                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
866                         }
867                 }
868
869                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
870                         /* for verify runs, we don't update uuids here,
871                          * so there would be nothing to report. */
872                         drbd_uuid_set_bm(mdev, 0UL);
873                         drbd_print_uuids(mdev, "updated UUIDs");
874                         if (mdev->p_uuid) {
875                                 /* Now the two UUID sets are equal, update what we
876                                  * know of the peer. */
877                                 int i;
878                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
879                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
880                         }
881                 }
882         }
883
884         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
885 out_unlock:
886         spin_unlock_irq(&mdev->tconn->req_lock);
887         put_ldev(mdev);
888 out:
889         mdev->rs_total  = 0;
890         mdev->rs_failed = 0;
891         mdev->rs_paused = 0;
892         if (verify_done)
893                 mdev->ov_start_sector = 0;
894
895         drbd_md_sync(mdev);
896
897         if (khelper_cmd)
898                 drbd_khelper(mdev, khelper_cmd);
899
900         return 1;
901 }
902
903 /* helper: park e on net_ee while its pages are still in flight through sendpage(), otherwise free it right away */
904 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *e)
905 {
906         if (drbd_ee_has_active_page(e)) {
907                 /* This might happen if sendpage() has not finished */
908                 int i = (e->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
909                 atomic_add(i, &mdev->pp_in_use_by_net);
910                 atomic_sub(i, &mdev->pp_in_use);
911                 spin_lock_irq(&mdev->tconn->req_lock);
912                 list_add_tail(&e->w.list, &mdev->net_ee);
913                 spin_unlock_irq(&mdev->tconn->req_lock);
914                 wake_up(&drbd_pp_wait);
915         } else
916                 drbd_free_ee(mdev, e);
917 }
918
919 /**
920  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
921  * @mdev:       DRBD device.
922  * @w:          work object.
923  * @cancel:     The connection will be closed anyway
924  */
925 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
926 {
927         struct drbd_peer_request *e = container_of(w, struct drbd_peer_request, w);
928         int ok;
929
930         if (unlikely(cancel)) {
931                 drbd_free_ee(mdev, e);
932                 dec_unacked(mdev);
933                 return 1;
934         }
935
936         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
937                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
938         } else {
939                 if (__ratelimit(&drbd_ratelimit_state))
940                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
941                             (unsigned long long)e->i.sector);
942
943                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
944         }
945
946         dec_unacked(mdev);
947
948         move_to_net_ee_or_free(mdev, e);
949
950         if (unlikely(!ok))
951                 dev_err(DEV, "drbd_send_block() failed\n");
952         return ok;
953 }
954
955 /**
956  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
957  * @mdev:       DRBD device.
958  * @w:          work object.
959  * @cancel:     The connection will be closed anyway
960  */
961 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
962 {
963         struct drbd_peer_request *e = container_of(w, struct drbd_peer_request, w);
964         int ok;
965
966         if (unlikely(cancel)) {
967                 drbd_free_ee(mdev, e);
968                 dec_unacked(mdev);
969                 return 1;
970         }
971
972         if (get_ldev_if_state(mdev, D_FAILED)) {
973                 drbd_rs_complete_io(mdev, e->i.sector);
974                 put_ldev(mdev);
975         }
976
977         if (mdev->state.conn == C_AHEAD) {
978                 ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
979         } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
980                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
981                         inc_rs_pending(mdev);
982                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
983                 } else {
984                         if (__ratelimit(&drbd_ratelimit_state))
985                                 dev_err(DEV, "Not sending RSDataReply, "
986                                     "partner DISKLESS!\n");
987                         ok = 1;
988                 }
989         } else {
990                 if (__ratelimit(&drbd_ratelimit_state))
991                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
992                             (unsigned long long)e->i.sector);
993
994                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
995
996                 /* update resync data with failure */
997                 drbd_rs_failed_io(mdev, e->i.sector, e->i.size);
998         }
999
1000         dec_unacked(mdev);
1001
1002         move_to_net_ee_or_free(mdev, e);
1003
1004         if (unlikely(!ok))
1005                 dev_err(DEV, "drbd_send_block() failed\n");
1006         return ok;
1007 }
1008
1009 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1010 {
1011         struct drbd_peer_request *e = container_of(w, struct drbd_peer_request, w);
1012         struct digest_info *di;
1013         int digest_size;
1014         void *digest = NULL;
1015         int ok, eq = 0;
1016
1017         if (unlikely(cancel)) {
1018                 drbd_free_ee(mdev, e);
1019                 dec_unacked(mdev);
1020                 return 1;
1021         }
1022
1023         if (get_ldev(mdev)) {
1024                 drbd_rs_complete_io(mdev, e->i.sector);
1025                 put_ldev(mdev);
1026         }
1027
1028         di = e->digest;
1029
1030         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1031                 /* quick hack to try to avoid a race against reconfiguration.
1032                  * a real fix would be much more involved,
1033                  * introducing more locking mechanisms */
1034                 if (mdev->csums_tfm) {
1035                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1036                         D_ASSERT(digest_size == di->digest_size);
1037                         digest = kmalloc(digest_size, GFP_NOIO);
1038                 }
1039                 if (digest) {
1040                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1041                         eq = !memcmp(digest, di->digest, digest_size);
1042                         kfree(digest);
1043                 }
1044
1045                 if (eq) {
1046                         drbd_set_in_sync(mdev, e->i.sector, e->i.size);
1047                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1048                         mdev->rs_same_csum += e->i.size >> BM_BLOCK_SHIFT;
1049                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1050                 } else {
1051                         inc_rs_pending(mdev);
1052                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1053                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1054                         kfree(di);
1055                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1056                 }
1057         } else {
1058                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1059                 if (__ratelimit(&drbd_ratelimit_state))
1060                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1061         }
1062
1063         dec_unacked(mdev);
1064         move_to_net_ee_or_free(mdev, e);
1065
1066         if (unlikely(!ok))
1067                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1068         return ok;
1069 }
1070
1071 /* TODO merge common code with w_e_send_csum */
1072 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1073 {
1074         struct drbd_peer_request *e = container_of(w, struct drbd_peer_request, w);
1075         sector_t sector = e->i.sector;
1076         unsigned int size = e->i.size;
1077         int digest_size;
1078         void *digest;
1079         int ok = 1;
1080
1081         if (unlikely(cancel))
1082                 goto out;
1083
1084         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1085         digest = kmalloc(digest_size, GFP_NOIO);
1086         if (!digest) {
1087                 ok = 0; /* terminate the connection in case the allocation failed */
1088                 goto out;
1089         }
1090
1091         if (likely(!(e->flags & EE_WAS_ERROR)))
1092                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1093         else
1094                 memset(digest, 0, digest_size);
1095
1096         /* Free e and pages before send.
1097          * In case we block on congestion, we could otherwise run into
1098          * some distributed deadlock, if the other side blocks on
1099          * congestion as well, because our receiver blocks in
1100          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1101         drbd_free_ee(mdev, e);
1102         e = NULL;
1103         inc_rs_pending(mdev);
1104         ok = drbd_send_drequest_csum(mdev, sector, size,
1105                                      digest, digest_size,
1106                                      P_OV_REPLY);
1107         if (!ok)
1108                 dec_rs_pending(mdev);
1109         kfree(digest);
1110
1111 out:
1112         if (e)
1113                 drbd_free_ee(mdev, e);
1114         dec_unacked(mdev);
1115         return ok;
1116 }
1117
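/* Online verify found an out-of-sync block: merge it with the previous range
 * if adjacent (so ov_oos_print() can report it compactly) and mark it out of
 * sync in the bitmap. */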
1118 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1119 {
1120         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1121                 mdev->ov_last_oos_size += size>>9;
1122         } else {
1123                 mdev->ov_last_oos_start = sector;
1124                 mdev->ov_last_oos_size = size>>9;
1125         }
1126         drbd_set_out_of_sync(mdev, sector, size);
1127 }
1128
1129 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1130 {
1131         struct drbd_peer_request *e = container_of(w, struct drbd_peer_request, w);
1132         struct digest_info *di;
1133         void *digest;
1134         sector_t sector = e->i.sector;
1135         unsigned int size = e->i.size;
1136         int digest_size;
1137         int ok, eq = 0;
1138
1139         if (unlikely(cancel)) {
1140                 drbd_free_ee(mdev, e);
1141                 dec_unacked(mdev);
1142                 return 1;
1143         }
1144
1145         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1146          * the resync lru has been cleaned up already */
1147         if (get_ldev(mdev)) {
1148                 drbd_rs_complete_io(mdev, e->i.sector);
1149                 put_ldev(mdev);
1150         }
1151
1152         di = e->digest;
1153
1154         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1155                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1156                 digest = kmalloc(digest_size, GFP_NOIO);
1157                 if (digest) {
1158                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1159
1160                         D_ASSERT(digest_size == di->digest_size);
1161                         eq = !memcmp(digest, di->digest, digest_size);
1162                         kfree(digest);
1163                 }
1164         }
1165
1166         /* Free e and pages before send.
1167          * In case we block on congestion, we could otherwise run into
1168          * some distributed deadlock, if the other side blocks on
1169          * congestion as well, because our receiver blocks in
1170          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1171         drbd_free_ee(mdev, e);
1172         if (!eq)
1173                 drbd_ov_oos_found(mdev, sector, size);
1174         else
1175                 ov_oos_print(mdev);
1176
1177         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1178                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1179
1180         dec_unacked(mdev);
1181
1182         --mdev->ov_left;
1183
1184         /* let's advance progress step marks only for every other megabyte */
1185         if ((mdev->ov_left & 0x200) == 0x200)
1186                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1187
1188         if (mdev->ov_left == 0) {
1189                 ov_oos_print(mdev);
1190                 drbd_resync_finished(mdev);
1191         }
1192
1193         return ok;
1194 }
1195
1196 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1197 {
1198         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1199         complete(&b->done);
1200         return 1;
1201 }
1202
1203 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1204 {
1205         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1206         struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
1207         int ok = 1;
1208
1209         /* really avoid racing with tl_clear.  w.cb may have been referenced
1210          * just before it was reassigned and re-queued, so double check that.
1211          * actually, this race was harmless, since we only try to send the
1212          * barrier packet here, and otherwise do nothing with the object.
1213          * but compare with the head of w_clear_epoch */
1214         spin_lock_irq(&mdev->tconn->req_lock);
1215         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1216                 cancel = 1;
1217         spin_unlock_irq(&mdev->tconn->req_lock);
1218         if (cancel)
1219                 return 1;
1220
1221         if (!drbd_get_data_sock(mdev))
1222                 return 0;
1223         p->barrier = b->br_number;
1224         /* inc_ap_pending was done where this was queued.
1225          * dec_ap_pending will be done in got_BarrierAck
1226          * or (on connection loss) in w_clear_epoch.  */
1227         ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
1228                             &p->head, sizeof(*p), 0);
1229         drbd_put_data_sock(mdev);
1230
1231         return ok;
1232 }
1233
1234 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1235 {
1236         if (cancel)
1237                 return 1;
1238         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1239 }
1240
1241 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1242 {
1243         struct drbd_request *req = container_of(w, struct drbd_request, w);
1244         int ok;
1245
1246         if (unlikely(cancel)) {
1247                 req_mod(req, SEND_CANCELED);
1248                 return 1;
1249         }
1250
1251         ok = drbd_send_oos(mdev, req);
1252         req_mod(req, OOS_HANDED_TO_NETWORK);
1253
1254         return ok;
1255 }
1256
1257 /**
1258  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1259  * @mdev:       DRBD device.
1260  * @w:          work object.
1261  * @cancel:     The connection will be closed anyway
1262  */
1263 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1264 {
1265         struct drbd_request *req = container_of(w, struct drbd_request, w);
1266         int ok;
1267
1268         if (unlikely(cancel)) {
1269                 req_mod(req, SEND_CANCELED);
1270                 return 1;
1271         }
1272
1273         ok = drbd_send_dblock(mdev, req);
1274         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1275
1276         return ok;
1277 }
1278
1279 /**
1280  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1281  * @mdev:       DRBD device.
1282  * @w:          work object.
1283  * @cancel:     The connection will be closed anyway
1284  */
1285 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1286 {
1287         struct drbd_request *req = container_of(w, struct drbd_request, w);
1288         int ok;
1289
1290         if (unlikely(cancel)) {
1291                 req_mod(req, SEND_CANCELED);
1292                 return 1;
1293         }
1294
1295         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1296                                 (unsigned long)req);
1297
1298         if (!ok) {
1299                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1300                  * so this is probably redundant */
1301                 if (mdev->state.conn >= C_CONNECTED)
1302                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1303         }
1304         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1305
1306         return ok;
1307 }
1308
1309 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1310 {
1311         struct drbd_request *req = container_of(w, struct drbd_request, w);
1312
1313         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1314                 drbd_al_begin_io(mdev, req->i.sector);
1315         /* Calling drbd_al_begin_io() out of the worker might deadlock
1316            theoretically. Practically it cannot deadlock, since this is
1317            only used when unfreezing IOs. All the extents of the requests
1318            that made it into the TL are already active. */
1319
1320         drbd_req_make_private_bio(req, req->master_bio);
1321         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1322         generic_make_request(req->private_bio);
1323
1324         return 1;
1325 }
1326
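/* Follow the sync-after dependency chain (sync_conf.after) starting at mdev;
 * return 0 if any device we depend on is currently (or paused while) syncing
 * or has one of its resync-pause flags (aftr_isp/peer_isp/user_isp) set,
 * 1 if it is fine to resync now. */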
1327 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1328 {
1329         struct drbd_conf *odev = mdev;
1330
1331         while (1) {
1332                 if (odev->sync_conf.after == -1)
1333                         return 1;
1334                 odev = minor_to_mdev(odev->sync_conf.after);
1335                 if (!expect(odev))
1336                         return 1;
1337                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1338                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1339                     odev->state.aftr_isp || odev->state.peer_isp ||
1340                     odev->state.user_isp)
1341                         return 0;
1342         }
1343 }
1344
1345 /**
1346  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1347  * @mdev:       DRBD device.
1348  *
1349  * Called from process context only (admin command and after_state_ch).
1350  */
1351 static int _drbd_pause_after(struct drbd_conf *mdev)
1352 {
1353         struct drbd_conf *odev;
1354         int i, rv = 0;
1355
1356         for (i = 0; i < minor_count; i++) {
1357                 odev = minor_to_mdev(i);
1358                 if (!odev)
1359                         continue;
1360                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1361                         continue;
1362                 if (!_drbd_may_sync_now(odev))
1363                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1364                                != SS_NOTHING_TO_DO);
1365         }
1366
1367         return rv;
1368 }
1369
1370 /**
1371  * _drbd_resume_next() - Resume resync on all devices that may resync now
1372  * @mdev:       DRBD device.
1373  *
1374  * Called from process context only (admin command and worker).
1375  */
1376 static int _drbd_resume_next(struct drbd_conf *mdev)
1377 {
1378         struct drbd_conf *odev;
1379         int i, rv = 0;
1380
1381         for (i = 0; i < minor_count; i++) {
1382                 odev = minor_to_mdev(i);
1383                 if (!odev)
1384                         continue;
1385                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1386                         continue;
1387                 if (odev->state.aftr_isp) {
1388                         if (_drbd_may_sync_now(odev))
1389                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1390                                                         CS_HARD, NULL)
1391                                        != SS_NOTHING_TO_DO) ;
1392                 }
1393         }
1394         return rv;
1395 }
1396
1397 void resume_next_sg(struct drbd_conf *mdev)
1398 {
1399         write_lock_irq(&global_state_lock);
1400         _drbd_resume_next(mdev);
1401         write_unlock_irq(&global_state_lock);
1402 }
1403
1404 void suspend_other_sg(struct drbd_conf *mdev)
1405 {
1406         write_lock_irq(&global_state_lock);
1407         _drbd_pause_after(mdev);
1408         write_unlock_irq(&global_state_lock);
1409 }
1410
1411 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1412 {
1413         struct drbd_conf *odev;
1414
1415         if (o_minor == -1)
1416                 return NO_ERROR;
1417         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1418                 return ERR_SYNC_AFTER;
1419
1420         /* check for loops */
1421         odev = minor_to_mdev(o_minor);
1422         while (1) {
1423                 if (odev == mdev)
1424                         return ERR_SYNC_AFTER_CYCLE;
1425
1426                 /* dependency chain ends here, no cycles. */
1427                 if (odev->sync_conf.after == -1)
1428                         return NO_ERROR;
1429
1430                 /* follow the dependency chain */
1431                 odev = minor_to_mdev(odev->sync_conf.after);
1432         }
1433 }
1434
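/* Change the sync-after dependency of this device to minor "na".  Under the
 * global_state_lock, sync_after_error() rejects unknown minors and dependency
 * cycles; on success, pause/resume is re-evaluated across all devices until
 * no further state changes result. */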
1435 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1436 {
1437         int changes;
1438         int retcode;
1439
1440         write_lock_irq(&global_state_lock);
1441         retcode = sync_after_error(mdev, na);
1442         if (retcode == NO_ERROR) {
1443                 mdev->sync_conf.after = na;
1444                 do {
1445                         changes  = _drbd_pause_after(mdev);
1446                         changes |= _drbd_resume_next(mdev);
1447                 } while (changes);
1448         }
1449         write_unlock_irq(&global_state_lock);
1450         return retcode;
1451 }
1452
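/* Reset the state of the dynamic resync-rate controller:
 * clear the sector counters, the in-flight accounting and the plan fifo. */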
1453 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1454 {
1455         atomic_set(&mdev->rs_sect_in, 0);
1456         atomic_set(&mdev->rs_sect_ev, 0);
1457         mdev->rs_in_flight = 0;
1458         mdev->rs_planed = 0;
1459         spin_lock(&mdev->peer_seq_lock);
1460         fifo_set(&mdev->rs_plan_s, 0);
1461         spin_unlock(&mdev->peer_seq_lock);
1462 }
1463
1464 /**
1465  * drbd_start_resync() - Start the resync process
1466  * @mdev:       DRBD device.
1467  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1468  *
1469  * This function might bring you directly into one of the
1470  * C_PAUSED_SYNC_* states.
1471  */
1472 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1473 {
1474         union drbd_state ns;
1475         int r;
1476
1477         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1478                 dev_err(DEV, "Resync already running!\n");
1479                 return;
1480         }
1481
1482         if (mdev->state.conn < C_AHEAD) {
1483                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1484                 drbd_rs_cancel_all(mdev);
1485                 /* This should be done when we abort the resync. We definitely do not
1486                    want to have this for connections going back and forth between
1487                    Ahead/Behind and SyncSource/SyncTarget */
1488         }
1489
1490         if (side == C_SYNC_TARGET) {
1491                 /* Since application IO was locked out during C_WF_BITMAP_T and
1492                    C_WF_SYNC_UUID, our data is still unmodified.  Before going to
1493                    C_SYNC_TARGET, the before-resync-target handler may veto making it inconsistent. */
1494                 r = drbd_khelper(mdev, "before-resync-target");
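                /* drbd_khelper() returns the helper's exit status in bits 15:8,
                 * like a wait() status; extract it below. */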
1495                 r = (r >> 8) & 0xff;
1496                 if (r > 0) {
1497                         dev_info(DEV, "before-resync-target handler returned %d, "
1498                              "dropping connection.\n", r);
1499                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1500                         return;
1501                 }
1502         } else /* C_SYNC_SOURCE */ {
1503                 r = drbd_khelper(mdev, "before-resync-source");
1504                 r = (r >> 8) & 0xff;
1505                 if (r > 0) {
1506                         if (r == 3) {
1507                                 dev_info(DEV, "before-resync-source handler returned %d, "
1508                                          "ignoring. Old userland tools?\n", r);
1509                         } else {
1510                                 dev_info(DEV, "before-resync-source handler returned %d, "
1511                                          "dropping connection.\n", r);
1512                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1513                                 return;
1514                         }
1515                 }
1516         }
1517
1518         drbd_state_lock(mdev);
1519
1520         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1521                 drbd_state_unlock(mdev);
1522                 return;
1523         }
1524
1525         write_lock_irq(&global_state_lock);
1526         ns = mdev->state;
1527
1528         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1529
1530         ns.conn = side;
1531
1532         if (side == C_SYNC_TARGET)
1533                 ns.disk = D_INCONSISTENT;
1534         else /* side == C_SYNC_SOURCE */
1535                 ns.pdsk = D_INCONSISTENT;
1536
1537         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1538         ns = mdev->state;
1539
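        /* If we are not even connected after the state change attempt,
         * the resync cannot proceed; force a failure status. */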
1540         if (ns.conn < C_CONNECTED)
1541                 r = SS_UNKNOWN_ERROR;
1542
1543         if (r == SS_SUCCESS) {
1544                 unsigned long tw = drbd_bm_total_weight(mdev);
1545                 unsigned long now = jiffies;
1546                 int i;
1547
1548                 mdev->rs_failed    = 0;
1549                 mdev->rs_paused    = 0;
1550                 mdev->rs_same_csum = 0;
1551                 mdev->rs_last_events = 0;
1552                 mdev->rs_last_sect_ev = 0;
1553                 mdev->rs_total     = tw;
1554                 mdev->rs_start     = now;
1555                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1556                         mdev->rs_mark_left[i] = tw;
1557                         mdev->rs_mark_time[i] = now;
1558                 }
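                /* Pause the resync of any other device that may not resync
                 * while this one does (sync-after dependencies). */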
1559                 _drbd_pause_after(mdev);
1560         }
1561         write_unlock_irq(&global_state_lock);
1562
1563         if (r == SS_SUCCESS) {
1564                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1565                      drbd_conn_str(ns.conn),
1566                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1567                      (unsigned long) mdev->rs_total);
1568                 if (side == C_SYNC_TARGET)
1569                         mdev->bm_resync_fo = 0;
1570
1571                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1572                  * with w_send_oos, or the sync target will get confused as to
1573                  * how many bits to resync.  We cannot always do that, because for an
1574                  * empty resync and protocol < 95, we need to do it here, as we call
1575                  * drbd_resync_finished from here in that case.
1576                  * So we call drbd_gen_and_send_sync_uuid here for protocol < 96,
1577                  * and from after_state_ch otherwise. */
1578                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1579                         drbd_gen_and_send_sync_uuid(mdev);
1580
1581                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1582                         /* This still has a race (about when exactly the peers
1583                          * detect connection loss) that can lead to a full sync
1584                          * on next handshake. In 8.3.9 we fixed this with explicit
1585                          * resync-finished notifications, but the fix
1586                          * introduces a protocol change.  Sleeping for some
1587                          * time longer than the ping interval + timeout on the
1588                          * SyncSource, to give the SyncTarget the chance to
1589                          * detect connection loss, then waiting for a ping
1590                          * response (implicit in drbd_resync_finished) reduces
1591                          * the race considerably, but does not solve it. */
1592                         if (side == C_SYNC_SOURCE)
1593                                 schedule_timeout_interruptible(
1594                                         mdev->tconn->net_conf->ping_int * HZ +
1595                                         mdev->tconn->net_conf->ping_timeo*HZ/9);
1596                         drbd_resync_finished(mdev);
1597                 }
1598
1599                 drbd_rs_controller_reset(mdev);
1600                 /* ns.conn may already be != mdev->state.conn,
1601                  * we may have been paused in between, or become paused until
1602                  * the timer triggers.
1603                  * No matter, that is handled in resync_timer_fn() */
1604                 if (ns.conn == C_SYNC_TARGET)
1605                         mod_timer(&mdev->resync_timer, jiffies);
1606
1607                 drbd_md_sync(mdev);
1608         }
1609         put_ldev(mdev);
1610         drbd_state_unlock(mdev);
1611 }
1612
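/**
 * drbd_worker() - Main loop of the per-device worker thread
 * @thi:        DRBD thread object of the worker.
 *
 * Dequeues work items from the data.work queue and runs their callbacks
 * until asked to stop; then drains whatever is left in the queue with the
 * cancel flag set, waits for the receiver thread to exit and cleans up
 * the device.
 */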
1613 int drbd_worker(struct drbd_thread *thi)
1614 {
1615         struct drbd_conf *mdev = thi->mdev;
1616         struct drbd_work *w = NULL;
1617         LIST_HEAD(work_list);
1618         int intr = 0, i;
1619
1620         snprintf(current->comm, sizeof(current->comm), "drbd%d_worker", mdev_to_minor(mdev));
1621
1622         while (get_t_state(thi) == RUNNING) {
1623                 drbd_thread_current_set_cpu(mdev);
1624
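                /* No work queued?  Uncork the data socket so anything still
                 * corked is sent out (unless corking is disabled), sleep until
                 * new work arrives, then cork again before processing it. */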
1625                 if (down_trylock(&mdev->tconn->data.work.s)) {
1626                         mutex_lock(&mdev->tconn->data.mutex);
1627                         if (mdev->tconn->data.socket && !mdev->tconn->net_conf->no_cork)
1628                                 drbd_tcp_uncork(mdev->tconn->data.socket);
1629                         mutex_unlock(&mdev->tconn->data.mutex);
1630
1631                         intr = down_interruptible(&mdev->tconn->data.work.s);
1632
1633                         mutex_lock(&mdev->tconn->data.mutex);
1634                         if (mdev->tconn->data.socket && !mdev->tconn->net_conf->no_cork)
1635                                 drbd_tcp_cork(mdev->tconn->data.socket);
1636                         mutex_unlock(&mdev->tconn->data.mutex);
1637                 }
1638
1639                 if (intr) {
1640                         D_ASSERT(intr == -EINTR);
1641                         flush_signals(current);
1642                         if (!expect(get_t_state(thi) != RUNNING))
1643                                 continue;
1644                         break;
1645                 }
1646
1647                 if (get_t_state(thi) != RUNNING)
1648                         break;
1649                 /* With this break, we have done a down() but not consumed
1650                    the entry from the list. The cleanup code takes care of
1651                    this...   */
1652
1653                 w = NULL;
1654                 spin_lock_irq(&mdev->tconn->data.work.q_lock);
1655                 if (!expect(!list_empty(&mdev->tconn->data.work.q))) {
1656                         /* Something is terribly wrong in our logic:
1657                          * we were able to down() the semaphore,
1658                          * but the list is empty... doh.
1659                          *
1660                          * What is the best thing to do now?
1661                          * Try again from scratch, restarting the receiver,
1662                          * asender, whatnot?  That could break even worse,
1663                          * e.g. when we are primary but have no good local data.
1664                          *
1665                          * Try to get away with just starting over this loop.
1666                          */
1667                         spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1668                         continue;
1669                 }
1670                 w = list_entry(mdev->tconn->data.work.q.next, struct drbd_work, list);
1671                 list_del_init(&w->list);
1672                 spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1673
1674                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1675                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1676                         if (mdev->state.conn >= C_CONNECTED)
1677                                 drbd_force_state(mdev,
1678                                                 NS(conn, C_NETWORK_FAILURE));
1679                 }
1680         }
1681         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1682         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1683
1684         spin_lock_irq(&mdev->tconn->data.work.q_lock);
1685         i = 0;
1686         while (!list_empty(&mdev->tconn->data.work.q)) {
1687                 list_splice_init(&mdev->tconn->data.work.q, &work_list);
1688                 spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1689
1690                 while (!list_empty(&work_list)) {
1691                         w = list_entry(work_list.next, struct drbd_work, list);
1692                         list_del_init(&w->list);
1693                         w->cb(mdev, w, 1);
1694                         i++; /* dead debugging code */
1695                 }
1696
1697                 spin_lock_irq(&mdev->tconn->data.work.q_lock);
1698         }
1699         sema_init(&mdev->tconn->data.work.s, 0);
1700         /* DANGEROUS race: if someone queued work while holding the spinlock
1701          * but called up() outside of it, we could get an up() on the
1702          * semaphore without a corresponding list entry.
1703          * So don't do that.
1704          */
1705         spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1706
1707         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1708         /* _drbd_set_state only uses stop_nowait.
1709          * wait here for the exiting receiver. */
1710         drbd_thread_stop(&mdev->tconn->receiver);
1711         drbd_mdev_cleanup(mdev);
1712
1713         dev_info(DEV, "worker terminated\n");
1714
1715         clear_bit(DEVICE_DYING, &mdev->flags);
1716         clear_bit(CONFIG_PENDING, &mdev->flags);
1717         wake_up(&mdev->state_wait);
1718
1719         return 0;
1720 }