]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/drbd/drbd_worker.c
9d1ba8ea68186304bfaf1bf7c5197970867fd72a
[karo-tx-linux.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56
57
58 /* About the global_state_lock
59    Each state transition on an device holds a read lock. In case we have
60    to evaluate the sync after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
63
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69         struct drbd_md_io *md_io;
70
71         md_io = (struct drbd_md_io *)bio->bi_private;
72         md_io->error = error;
73
74         complete(&md_io->event);
75 }
76
77 /* reads on behalf of the partner,
78  * "submitted" by the receiver
79  */
80 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
81 {
82         unsigned long flags = 0;
83         struct drbd_conf *mdev = peer_req->w.mdev;
84
85         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
86         mdev->read_cnt += peer_req->i.size >> 9;
87         list_del(&peer_req->w.list);
88         if (list_empty(&mdev->read_ee))
89                 wake_up(&mdev->ee_wait);
90         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
91                 __drbd_chk_io_error(mdev, false);
92         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
93
94         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
95         put_ldev(mdev);
96 }
97
98 /* writes on behalf of the partner, or resync writes,
99  * "submitted" by the receiver, final stage.  */
100 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101 {
102         unsigned long flags = 0;
103         struct drbd_conf *mdev = peer_req->w.mdev;
104         struct drbd_interval i;
105         int do_wake;
106         u64 block_id;
107         int do_al_complete_io;
108
109         /* after we moved peer_req to done_ee,
110          * we may no longer access it,
111          * it may be freed/reused already!
112          * (as soon as we release the req_lock) */
113         i = peer_req->i;
114         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
115         block_id = peer_req->block_id;
116
117         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
118         mdev->writ_cnt += peer_req->i.size >> 9;
119         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
120         list_add_tail(&peer_req->w.list, &mdev->done_ee);
121
122         /*
123          * Do not remove from the write_requests tree here: we did not send the
124          * Ack yet and did not wake possibly waiting conflicting requests.
125          * Removed from the tree from "drbd_process_done_ee" within the
126          * appropriate w.cb (e_end_block/e_end_resync_block) or from
127          * _drbd_clear_done_ee.
128          */
129
130         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
131
132         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
133                 __drbd_chk_io_error(mdev, false);
134         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
135
136         if (block_id == ID_SYNCER)
137                 drbd_rs_complete_io(mdev, i.sector);
138
139         if (do_wake)
140                 wake_up(&mdev->ee_wait);
141
142         if (do_al_complete_io)
143                 drbd_al_complete_io(mdev, &i);
144
145         wake_asender(mdev->tconn);
146         put_ldev(mdev);
147 }
148
149 /* writes on behalf of the partner, or resync writes,
150  * "submitted" by the receiver.
151  */
152 void drbd_peer_request_endio(struct bio *bio, int error)
153 {
154         struct drbd_peer_request *peer_req = bio->bi_private;
155         struct drbd_conf *mdev = peer_req->w.mdev;
156         int uptodate = bio_flagged(bio, BIO_UPTODATE);
157         int is_write = bio_data_dir(bio) == WRITE;
158
159         if (error && __ratelimit(&drbd_ratelimit_state))
160                 dev_warn(DEV, "%s: error=%d s=%llus\n",
161                                 is_write ? "write" : "read", error,
162                                 (unsigned long long)peer_req->i.sector);
163         if (!error && !uptodate) {
164                 if (__ratelimit(&drbd_ratelimit_state))
165                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
166                                         is_write ? "write" : "read",
167                                         (unsigned long long)peer_req->i.sector);
168                 /* strange behavior of some lower level drivers...
169                  * fail the request by clearing the uptodate flag,
170                  * but do not return any error?! */
171                 error = -EIO;
172         }
173
174         if (error)
175                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
176
177         bio_put(bio); /* no need for the bio anymore */
178         if (atomic_dec_and_test(&peer_req->pending_bios)) {
179                 if (is_write)
180                         drbd_endio_write_sec_final(peer_req);
181                 else
182                         drbd_endio_read_sec_final(peer_req);
183         }
184 }
185
186 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
187  */
188 void drbd_request_endio(struct bio *bio, int error)
189 {
190         unsigned long flags;
191         struct drbd_request *req = bio->bi_private;
192         struct drbd_conf *mdev = req->w.mdev;
193         struct bio_and_error m;
194         enum drbd_req_event what;
195         int uptodate = bio_flagged(bio, BIO_UPTODATE);
196
197         if (!error && !uptodate) {
198                 dev_warn(DEV, "p %s: setting error to -EIO\n",
199                          bio_data_dir(bio) == WRITE ? "write" : "read");
200                 /* strange behavior of some lower level drivers...
201                  * fail the request by clearing the uptodate flag,
202                  * but do not return any error?! */
203                 error = -EIO;
204         }
205
206         /* to avoid recursion in __req_mod */
207         if (unlikely(error)) {
208                 what = (bio_data_dir(bio) == WRITE)
209                         ? WRITE_COMPLETED_WITH_ERROR
210                         : (bio_rw(bio) == READ)
211                           ? READ_COMPLETED_WITH_ERROR
212                           : READ_AHEAD_COMPLETED_WITH_ERROR;
213         } else
214                 what = COMPLETED_OK;
215
216         bio_put(req->private_bio);
217         req->private_bio = ERR_PTR(error);
218
219         /* not req_mod(), we need irqsave here! */
220         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
221         __req_mod(req, what, &m);
222         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
223
224         if (m.bio)
225                 complete_master_bio(mdev, &m);
226 }
227
228 int w_read_retry_remote(struct drbd_work *w, int cancel)
229 {
230         struct drbd_request *req = container_of(w, struct drbd_request, w);
231         struct drbd_conf *mdev = w->mdev;
232
233         /* We should not detach for read io-error,
234          * but try to WRITE the P_DATA_REPLY to the failed location,
235          * to give the disk the chance to relocate that block */
236
237         spin_lock_irq(&mdev->tconn->req_lock);
238         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
239                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
240                 spin_unlock_irq(&mdev->tconn->req_lock);
241                 return 0;
242         }
243         spin_unlock_irq(&mdev->tconn->req_lock);
244
245         return w_send_read_req(w, 0);
246 }
247
248 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
249                   struct drbd_peer_request *peer_req, void *digest)
250 {
251         struct hash_desc desc;
252         struct scatterlist sg;
253         struct page *page = peer_req->pages;
254         struct page *tmp;
255         unsigned len;
256
257         desc.tfm = tfm;
258         desc.flags = 0;
259
260         sg_init_table(&sg, 1);
261         crypto_hash_init(&desc);
262
263         while ((tmp = page_chain_next(page))) {
264                 /* all but the last page will be fully used */
265                 sg_set_page(&sg, page, PAGE_SIZE, 0);
266                 crypto_hash_update(&desc, &sg, sg.length);
267                 page = tmp;
268         }
269         /* and now the last, possibly only partially used page */
270         len = peer_req->i.size & (PAGE_SIZE - 1);
271         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
272         crypto_hash_update(&desc, &sg, sg.length);
273         crypto_hash_final(&desc, digest);
274 }
275
276 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
277 {
278         struct hash_desc desc;
279         struct scatterlist sg;
280         struct bio_vec *bvec;
281         int i;
282
283         desc.tfm = tfm;
284         desc.flags = 0;
285
286         sg_init_table(&sg, 1);
287         crypto_hash_init(&desc);
288
289         __bio_for_each_segment(bvec, bio, i, 0) {
290                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
291                 crypto_hash_update(&desc, &sg, sg.length);
292         }
293         crypto_hash_final(&desc, digest);
294 }
295
296 /* MAYBE merge common code with w_e_end_ov_req */
297 static int w_e_send_csum(struct drbd_work *w, int cancel)
298 {
299         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
300         struct drbd_conf *mdev = w->mdev;
301         int digest_size;
302         void *digest;
303         int err = 0;
304
305         if (unlikely(cancel))
306                 goto out;
307
308         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
309                 goto out;
310
311         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
312         digest = kmalloc(digest_size, GFP_NOIO);
313         if (digest) {
314                 sector_t sector = peer_req->i.sector;
315                 unsigned int size = peer_req->i.size;
316                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
317                 /* Free peer_req and pages before send.
318                  * In case we block on congestion, we could otherwise run into
319                  * some distributed deadlock, if the other side blocks on
320                  * congestion as well, because our receiver blocks in
321                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
322                 drbd_free_peer_req(mdev, peer_req);
323                 peer_req = NULL;
324                 inc_rs_pending(mdev);
325                 err = drbd_send_drequest_csum(mdev, sector, size,
326                                               digest, digest_size,
327                                               P_CSUM_RS_REQUEST);
328                 kfree(digest);
329         } else {
330                 dev_err(DEV, "kmalloc() of digest failed.\n");
331                 err = -ENOMEM;
332         }
333
334 out:
335         if (peer_req)
336                 drbd_free_peer_req(mdev, peer_req);
337
338         if (unlikely(err))
339                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
340         return err;
341 }
342
343 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
344
345 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
346 {
347         struct drbd_peer_request *peer_req;
348
349         if (!get_ldev(mdev))
350                 return -EIO;
351
352         if (drbd_rs_should_slow_down(mdev, sector))
353                 goto defer;
354
355         /* GFP_TRY, because if there is no memory available right now, this may
356          * be rescheduled for later. It is "only" background resync, after all. */
357         peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
358                                        size, GFP_TRY);
359         if (!peer_req)
360                 goto defer;
361
362         peer_req->w.cb = w_e_send_csum;
363         spin_lock_irq(&mdev->tconn->req_lock);
364         list_add(&peer_req->w.list, &mdev->read_ee);
365         spin_unlock_irq(&mdev->tconn->req_lock);
366
367         atomic_add(size >> 9, &mdev->rs_sect_ev);
368         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
369                 return 0;
370
371         /* If it failed because of ENOMEM, retry should help.  If it failed
372          * because bio_add_page failed (probably broken lower level driver),
373          * retry may or may not help.
374          * If it does not, you may need to force disconnect. */
375         spin_lock_irq(&mdev->tconn->req_lock);
376         list_del(&peer_req->w.list);
377         spin_unlock_irq(&mdev->tconn->req_lock);
378
379         drbd_free_peer_req(mdev, peer_req);
380 defer:
381         put_ldev(mdev);
382         return -EAGAIN;
383 }
384
385 int w_resync_timer(struct drbd_work *w, int cancel)
386 {
387         struct drbd_conf *mdev = w->mdev;
388         switch (mdev->state.conn) {
389         case C_VERIFY_S:
390                 w_make_ov_request(w, cancel);
391                 break;
392         case C_SYNC_TARGET:
393                 w_make_resync_request(w, cancel);
394                 break;
395         }
396
397         return 0;
398 }
399
400 void resync_timer_fn(unsigned long data)
401 {
402         struct drbd_conf *mdev = (struct drbd_conf *) data;
403
404         if (list_empty(&mdev->resync_work.list))
405                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
406 }
407
408 static void fifo_set(struct fifo_buffer *fb, int value)
409 {
410         int i;
411
412         for (i = 0; i < fb->size; i++)
413                 fb->values[i] = value;
414 }
415
416 static int fifo_push(struct fifo_buffer *fb, int value)
417 {
418         int ov;
419
420         ov = fb->values[fb->head_index];
421         fb->values[fb->head_index++] = value;
422
423         if (fb->head_index >= fb->size)
424                 fb->head_index = 0;
425
426         return ov;
427 }
428
429 static void fifo_add_val(struct fifo_buffer *fb, int value)
430 {
431         int i;
432
433         for (i = 0; i < fb->size; i++)
434                 fb->values[i] += value;
435 }
436
437 static int drbd_rs_controller(struct drbd_conf *mdev)
438 {
439         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
440         unsigned int want;     /* The number of sectors we want in the proxy */
441         int req_sect; /* Number of sectors to request in this turn */
442         int correction; /* Number of sectors more we need in the proxy*/
443         int cps; /* correction per invocation of drbd_rs_controller() */
444         int steps; /* Number of time steps to plan ahead */
445         int curr_corr;
446         int max_sect;
447
448         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
449         mdev->rs_in_flight -= sect_in;
450
451         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
452
453         steps = mdev->rs_plan_s.size; /* (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
454
455         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
456                 want = ((mdev->ldev->dc.resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
457         } else { /* normal path */
458                 want = mdev->ldev->dc.c_fill_target ? mdev->ldev->dc.c_fill_target :
459                         sect_in * mdev->ldev->dc.c_delay_target * HZ / (SLEEP_TIME * 10);
460         }
461
462         correction = want - mdev->rs_in_flight - mdev->rs_planed;
463
464         /* Plan ahead */
465         cps = correction / steps;
466         fifo_add_val(&mdev->rs_plan_s, cps);
467         mdev->rs_planed += cps * steps;
468
469         /* What we do in this step */
470         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
471         spin_unlock(&mdev->peer_seq_lock);
472         mdev->rs_planed -= curr_corr;
473
474         req_sect = sect_in + curr_corr;
475         if (req_sect < 0)
476                 req_sect = 0;
477
478         max_sect = (mdev->ldev->dc.c_max_rate * 2 * SLEEP_TIME) / HZ;
479         if (req_sect > max_sect)
480                 req_sect = max_sect;
481
482         /*
483         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
484                  sect_in, mdev->rs_in_flight, want, correction,
485                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
486         */
487
488         return req_sect;
489 }
490
491 static int drbd_rs_number_requests(struct drbd_conf *mdev)
492 {
493         int number;
494         if (mdev->rs_plan_s.size) { /* mdev->ldev->dc.c_plan_ahead */
495                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
496                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
497         } else {
498                 mdev->c_sync_rate = mdev->ldev->dc.resync_rate;
499                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
500         }
501
502         /* ignore the amount of pending requests, the resync controller should
503          * throttle down to incoming reply rate soon enough anyways. */
504         return number;
505 }
506
507 int w_make_resync_request(struct drbd_work *w, int cancel)
508 {
509         struct drbd_conf *mdev = w->mdev;
510         unsigned long bit;
511         sector_t sector;
512         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
513         int max_bio_size;
514         int number, rollback_i, size;
515         int align, queued, sndbuf;
516         int i = 0;
517
518         if (unlikely(cancel))
519                 return 0;
520
521         if (mdev->rs_total == 0) {
522                 /* empty resync? */
523                 drbd_resync_finished(mdev);
524                 return 0;
525         }
526
527         if (!get_ldev(mdev)) {
528                 /* Since we only need to access mdev->rsync a
529                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
530                    to continue resync with a broken disk makes no sense at
531                    all */
532                 dev_err(DEV, "Disk broke down during resync!\n");
533                 return 0;
534         }
535
536         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
537         number = drbd_rs_number_requests(mdev);
538         if (number == 0)
539                 goto requeue;
540
541         for (i = 0; i < number; i++) {
542                 /* Stop generating RS requests, when half of the send buffer is filled */
543                 mutex_lock(&mdev->tconn->data.mutex);
544                 if (mdev->tconn->data.socket) {
545                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
546                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
547                 } else {
548                         queued = 1;
549                         sndbuf = 0;
550                 }
551                 mutex_unlock(&mdev->tconn->data.mutex);
552                 if (queued > sndbuf / 2)
553                         goto requeue;
554
555 next_sector:
556                 size = BM_BLOCK_SIZE;
557                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
558
559                 if (bit == DRBD_END_OF_BITMAP) {
560                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
561                         put_ldev(mdev);
562                         return 0;
563                 }
564
565                 sector = BM_BIT_TO_SECT(bit);
566
567                 if (drbd_rs_should_slow_down(mdev, sector) ||
568                     drbd_try_rs_begin_io(mdev, sector)) {
569                         mdev->bm_resync_fo = bit;
570                         goto requeue;
571                 }
572                 mdev->bm_resync_fo = bit + 1;
573
574                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
575                         drbd_rs_complete_io(mdev, sector);
576                         goto next_sector;
577                 }
578
579 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
580                 /* try to find some adjacent bits.
581                  * we stop if we have already the maximum req size.
582                  *
583                  * Additionally always align bigger requests, in order to
584                  * be prepared for all stripe sizes of software RAIDs.
585                  */
586                 align = 1;
587                 rollback_i = i;
588                 for (;;) {
589                         if (size + BM_BLOCK_SIZE > max_bio_size)
590                                 break;
591
592                         /* Be always aligned */
593                         if (sector & ((1<<(align+3))-1))
594                                 break;
595
596                         /* do not cross extent boundaries */
597                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
598                                 break;
599                         /* now, is it actually dirty, after all?
600                          * caution, drbd_bm_test_bit is tri-state for some
601                          * obscure reason; ( b == 0 ) would get the out-of-band
602                          * only accidentally right because of the "oddly sized"
603                          * adjustment below */
604                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
605                                 break;
606                         bit++;
607                         size += BM_BLOCK_SIZE;
608                         if ((BM_BLOCK_SIZE << align) <= size)
609                                 align++;
610                         i++;
611                 }
612                 /* if we merged some,
613                  * reset the offset to start the next drbd_bm_find_next from */
614                 if (size > BM_BLOCK_SIZE)
615                         mdev->bm_resync_fo = bit + 1;
616 #endif
617
618                 /* adjust very last sectors, in case we are oddly sized */
619                 if (sector + (size>>9) > capacity)
620                         size = (capacity-sector)<<9;
621                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
622                         switch (read_for_csum(mdev, sector, size)) {
623                         case -EIO: /* Disk failure */
624                                 put_ldev(mdev);
625                                 return -EIO;
626                         case -EAGAIN: /* allocation failed, or ldev busy */
627                                 drbd_rs_complete_io(mdev, sector);
628                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
629                                 i = rollback_i;
630                                 goto requeue;
631                         case 0:
632                                 /* everything ok */
633                                 break;
634                         default:
635                                 BUG();
636                         }
637                 } else {
638                         int err;
639
640                         inc_rs_pending(mdev);
641                         err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
642                                                  sector, size, ID_SYNCER);
643                         if (err) {
644                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
645                                 dec_rs_pending(mdev);
646                                 put_ldev(mdev);
647                                 return err;
648                         }
649                 }
650         }
651
652         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
653                 /* last syncer _request_ was sent,
654                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
655                  * next sync group will resume), as soon as we receive the last
656                  * resync data block, and the last bit is cleared.
657                  * until then resync "work" is "inactive" ...
658                  */
659                 put_ldev(mdev);
660                 return 0;
661         }
662
663  requeue:
664         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
665         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
666         put_ldev(mdev);
667         return 0;
668 }
669
670 static int w_make_ov_request(struct drbd_work *w, int cancel)
671 {
672         struct drbd_conf *mdev = w->mdev;
673         int number, i, size;
674         sector_t sector;
675         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
676
677         if (unlikely(cancel))
678                 return 1;
679
680         number = drbd_rs_number_requests(mdev);
681
682         sector = mdev->ov_position;
683         for (i = 0; i < number; i++) {
684                 if (sector >= capacity) {
685                         return 1;
686                 }
687
688                 size = BM_BLOCK_SIZE;
689
690                 if (drbd_rs_should_slow_down(mdev, sector) ||
691                     drbd_try_rs_begin_io(mdev, sector)) {
692                         mdev->ov_position = sector;
693                         goto requeue;
694                 }
695
696                 if (sector + (size>>9) > capacity)
697                         size = (capacity-sector)<<9;
698
699                 inc_rs_pending(mdev);
700                 if (drbd_send_ov_request(mdev, sector, size)) {
701                         dec_rs_pending(mdev);
702                         return 0;
703                 }
704                 sector += BM_SECT_PER_BIT;
705         }
706         mdev->ov_position = sector;
707
708  requeue:
709         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
710         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
711         return 1;
712 }
713
714 int w_ov_finished(struct drbd_work *w, int cancel)
715 {
716         struct drbd_conf *mdev = w->mdev;
717         kfree(w);
718         ov_out_of_sync_print(mdev);
719         drbd_resync_finished(mdev);
720
721         return 0;
722 }
723
724 static int w_resync_finished(struct drbd_work *w, int cancel)
725 {
726         struct drbd_conf *mdev = w->mdev;
727         kfree(w);
728
729         drbd_resync_finished(mdev);
730
731         return 0;
732 }
733
734 static void ping_peer(struct drbd_conf *mdev)
735 {
736         struct drbd_tconn *tconn = mdev->tconn;
737
738         clear_bit(GOT_PING_ACK, &tconn->flags);
739         request_ping(tconn);
740         wait_event(tconn->ping_wait,
741                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
742 }
743
744 int drbd_resync_finished(struct drbd_conf *mdev)
745 {
746         unsigned long db, dt, dbdt;
747         unsigned long n_oos;
748         union drbd_state os, ns;
749         struct drbd_work *w;
750         char *khelper_cmd = NULL;
751         int verify_done = 0;
752
753         /* Remove all elements from the resync LRU. Since future actions
754          * might set bits in the (main) bitmap, then the entries in the
755          * resync LRU would be wrong. */
756         if (drbd_rs_del_all(mdev)) {
757                 /* In case this is not possible now, most probably because
758                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
759                  * queue (or even the read operations for those packets
760                  * is not finished by now).   Retry in 100ms. */
761
762                 schedule_timeout_interruptible(HZ / 10);
763                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
764                 if (w) {
765                         w->cb = w_resync_finished;
766                         drbd_queue_work(&mdev->tconn->data.work, w);
767                         return 1;
768                 }
769                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
770         }
771
772         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
773         if (dt <= 0)
774                 dt = 1;
775         db = mdev->rs_total;
776         dbdt = Bit2KB(db/dt);
777         mdev->rs_paused /= HZ;
778
779         if (!get_ldev(mdev))
780                 goto out;
781
782         ping_peer(mdev);
783
784         spin_lock_irq(&mdev->tconn->req_lock);
785         os = drbd_read_state(mdev);
786
787         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
788
789         /* This protects us against multiple calls (that can happen in the presence
790            of application IO), and against connectivity loss just before we arrive here. */
791         if (os.conn <= C_CONNECTED)
792                 goto out_unlock;
793
794         ns = os;
795         ns.conn = C_CONNECTED;
796
797         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
798              verify_done ? "Online verify " : "Resync",
799              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
800
801         n_oos = drbd_bm_total_weight(mdev);
802
803         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
804                 if (n_oos) {
805                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
806                               n_oos, Bit2KB(1));
807                         khelper_cmd = "out-of-sync";
808                 }
809         } else {
810                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
811
812                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
813                         khelper_cmd = "after-resync-target";
814
815                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
816                         const unsigned long s = mdev->rs_same_csum;
817                         const unsigned long t = mdev->rs_total;
818                         const int ratio =
819                                 (t == 0)     ? 0 :
820                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
821                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
822                              "transferred %luK total %luK\n",
823                              ratio,
824                              Bit2KB(mdev->rs_same_csum),
825                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
826                              Bit2KB(mdev->rs_total));
827                 }
828         }
829
830         if (mdev->rs_failed) {
831                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
832
833                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
834                         ns.disk = D_INCONSISTENT;
835                         ns.pdsk = D_UP_TO_DATE;
836                 } else {
837                         ns.disk = D_UP_TO_DATE;
838                         ns.pdsk = D_INCONSISTENT;
839                 }
840         } else {
841                 ns.disk = D_UP_TO_DATE;
842                 ns.pdsk = D_UP_TO_DATE;
843
844                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
845                         if (mdev->p_uuid) {
846                                 int i;
847                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
848                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
849                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
850                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
851                         } else {
852                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
853                         }
854                 }
855
856                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
857                         /* for verify runs, we don't update uuids here,
858                          * so there would be nothing to report. */
859                         drbd_uuid_set_bm(mdev, 0UL);
860                         drbd_print_uuids(mdev, "updated UUIDs");
861                         if (mdev->p_uuid) {
862                                 /* Now the two UUID sets are equal, update what we
863                                  * know of the peer. */
864                                 int i;
865                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
866                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
867                         }
868                 }
869         }
870
871         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
872 out_unlock:
873         spin_unlock_irq(&mdev->tconn->req_lock);
874         put_ldev(mdev);
875 out:
876         mdev->rs_total  = 0;
877         mdev->rs_failed = 0;
878         mdev->rs_paused = 0;
879         if (verify_done)
880                 mdev->ov_start_sector = 0;
881
882         drbd_md_sync(mdev);
883
884         if (khelper_cmd)
885                 drbd_khelper(mdev, khelper_cmd);
886
887         return 1;
888 }
889
890 /* helper */
891 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
892 {
893         if (drbd_peer_req_has_active_page(peer_req)) {
894                 /* This might happen if sendpage() has not finished */
895                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
896                 atomic_add(i, &mdev->pp_in_use_by_net);
897                 atomic_sub(i, &mdev->pp_in_use);
898                 spin_lock_irq(&mdev->tconn->req_lock);
899                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
900                 spin_unlock_irq(&mdev->tconn->req_lock);
901                 wake_up(&drbd_pp_wait);
902         } else
903                 drbd_free_peer_req(mdev, peer_req);
904 }
905
906 /**
907  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
908  * @mdev:       DRBD device.
909  * @w:          work object.
910  * @cancel:     The connection will be closed anyways
911  */
912 int w_e_end_data_req(struct drbd_work *w, int cancel)
913 {
914         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
915         struct drbd_conf *mdev = w->mdev;
916         int err;
917
918         if (unlikely(cancel)) {
919                 drbd_free_peer_req(mdev, peer_req);
920                 dec_unacked(mdev);
921                 return 0;
922         }
923
924         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
925                 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
926         } else {
927                 if (__ratelimit(&drbd_ratelimit_state))
928                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
929                             (unsigned long long)peer_req->i.sector);
930
931                 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
932         }
933
934         dec_unacked(mdev);
935
936         move_to_net_ee_or_free(mdev, peer_req);
937
938         if (unlikely(err))
939                 dev_err(DEV, "drbd_send_block() failed\n");
940         return err;
941 }
942
943 /**
944  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
945  * @mdev:       DRBD device.
946  * @w:          work object.
947  * @cancel:     The connection will be closed anyways
948  */
949 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
950 {
951         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
952         struct drbd_conf *mdev = w->mdev;
953         int err;
954
955         if (unlikely(cancel)) {
956                 drbd_free_peer_req(mdev, peer_req);
957                 dec_unacked(mdev);
958                 return 0;
959         }
960
961         if (get_ldev_if_state(mdev, D_FAILED)) {
962                 drbd_rs_complete_io(mdev, peer_req->i.sector);
963                 put_ldev(mdev);
964         }
965
966         if (mdev->state.conn == C_AHEAD) {
967                 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
968         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
969                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
970                         inc_rs_pending(mdev);
971                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
972                 } else {
973                         if (__ratelimit(&drbd_ratelimit_state))
974                                 dev_err(DEV, "Not sending RSDataReply, "
975                                     "partner DISKLESS!\n");
976                         err = 0;
977                 }
978         } else {
979                 if (__ratelimit(&drbd_ratelimit_state))
980                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
981                             (unsigned long long)peer_req->i.sector);
982
983                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
984
985                 /* update resync data with failure */
986                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
987         }
988
989         dec_unacked(mdev);
990
991         move_to_net_ee_or_free(mdev, peer_req);
992
993         if (unlikely(err))
994                 dev_err(DEV, "drbd_send_block() failed\n");
995         return err;
996 }
997
998 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
999 {
1000         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1001         struct drbd_conf *mdev = w->mdev;
1002         struct digest_info *di;
1003         int digest_size;
1004         void *digest = NULL;
1005         int err, eq = 0;
1006
1007         if (unlikely(cancel)) {
1008                 drbd_free_peer_req(mdev, peer_req);
1009                 dec_unacked(mdev);
1010                 return 0;
1011         }
1012
1013         if (get_ldev(mdev)) {
1014                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1015                 put_ldev(mdev);
1016         }
1017
1018         di = peer_req->digest;
1019
1020         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1021                 /* quick hack to try to avoid a race against reconfiguration.
1022                  * a real fix would be much more involved,
1023                  * introducing more locking mechanisms */
1024                 if (mdev->tconn->csums_tfm) {
1025                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1026                         D_ASSERT(digest_size == di->digest_size);
1027                         digest = kmalloc(digest_size, GFP_NOIO);
1028                 }
1029                 if (digest) {
1030                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1031                         eq = !memcmp(digest, di->digest, digest_size);
1032                         kfree(digest);
1033                 }
1034
1035                 if (eq) {
1036                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1037                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1038                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1039                         err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1040                 } else {
1041                         inc_rs_pending(mdev);
1042                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1043                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1044                         kfree(di);
1045                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1046                 }
1047         } else {
1048                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1049                 if (__ratelimit(&drbd_ratelimit_state))
1050                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1051         }
1052
1053         dec_unacked(mdev);
1054         move_to_net_ee_or_free(mdev, peer_req);
1055
1056         if (unlikely(err))
1057                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1058         return err;
1059 }
1060
1061 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1062 {
1063         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1064         struct drbd_conf *mdev = w->mdev;
1065         sector_t sector = peer_req->i.sector;
1066         unsigned int size = peer_req->i.size;
1067         int digest_size;
1068         void *digest;
1069         int err = 0;
1070
1071         if (unlikely(cancel))
1072                 goto out;
1073
1074         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1075         digest = kmalloc(digest_size, GFP_NOIO);
1076         if (!digest) {
1077                 err = 1;        /* terminate the connection in case the allocation failed */
1078                 goto out;
1079         }
1080
1081         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1082                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1083         else
1084                 memset(digest, 0, digest_size);
1085
1086         /* Free e and pages before send.
1087          * In case we block on congestion, we could otherwise run into
1088          * some distributed deadlock, if the other side blocks on
1089          * congestion as well, because our receiver blocks in
1090          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1091         drbd_free_peer_req(mdev, peer_req);
1092         peer_req = NULL;
1093         inc_rs_pending(mdev);
1094         err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1095         if (err)
1096                 dec_rs_pending(mdev);
1097         kfree(digest);
1098
1099 out:
1100         if (peer_req)
1101                 drbd_free_peer_req(mdev, peer_req);
1102         dec_unacked(mdev);
1103         return err;
1104 }
1105
1106 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1107 {
1108         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1109                 mdev->ov_last_oos_size += size>>9;
1110         } else {
1111                 mdev->ov_last_oos_start = sector;
1112                 mdev->ov_last_oos_size = size>>9;
1113         }
1114         drbd_set_out_of_sync(mdev, sector, size);
1115 }
1116
1117 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1118 {
1119         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1120         struct drbd_conf *mdev = w->mdev;
1121         struct digest_info *di;
1122         void *digest;
1123         sector_t sector = peer_req->i.sector;
1124         unsigned int size = peer_req->i.size;
1125         int digest_size;
1126         int err, eq = 0;
1127
1128         if (unlikely(cancel)) {
1129                 drbd_free_peer_req(mdev, peer_req);
1130                 dec_unacked(mdev);
1131                 return 0;
1132         }
1133
1134         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1135          * the resync lru has been cleaned up already */
1136         if (get_ldev(mdev)) {
1137                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1138                 put_ldev(mdev);
1139         }
1140
1141         di = peer_req->digest;
1142
1143         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1144                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1145                 digest = kmalloc(digest_size, GFP_NOIO);
1146                 if (digest) {
1147                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1148
1149                         D_ASSERT(digest_size == di->digest_size);
1150                         eq = !memcmp(digest, di->digest, digest_size);
1151                         kfree(digest);
1152                 }
1153         }
1154
1155         /* Free peer_req and pages before send.
1156          * In case we block on congestion, we could otherwise run into
1157          * some distributed deadlock, if the other side blocks on
1158          * congestion as well, because our receiver blocks in
1159          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1160         drbd_free_peer_req(mdev, peer_req);
1161         if (!eq)
1162                 drbd_ov_out_of_sync_found(mdev, sector, size);
1163         else
1164                 ov_out_of_sync_print(mdev);
1165
1166         err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1167                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1168
1169         dec_unacked(mdev);
1170
1171         --mdev->ov_left;
1172
1173         /* let's advance progress step marks only for every other megabyte */
1174         if ((mdev->ov_left & 0x200) == 0x200)
1175                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1176
1177         if (mdev->ov_left == 0) {
1178                 ov_out_of_sync_print(mdev);
1179                 drbd_resync_finished(mdev);
1180         }
1181
1182         return err;
1183 }
1184
1185 int w_prev_work_done(struct drbd_work *w, int cancel)
1186 {
1187         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1188
1189         complete(&b->done);
1190         return 0;
1191 }
1192
1193 int w_send_barrier(struct drbd_work *w, int cancel)
1194 {
1195         struct drbd_socket *sock;
1196         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1197         struct drbd_conf *mdev = w->mdev;
1198         struct p_barrier *p;
1199
1200         /* really avoid racing with tl_clear.  w.cb may have been referenced
1201          * just before it was reassigned and re-queued, so double check that.
1202          * actually, this race was harmless, since we only try to send the
1203          * barrier packet here, and otherwise do nothing with the object.
1204          * but compare with the head of w_clear_epoch */
1205         spin_lock_irq(&mdev->tconn->req_lock);
1206         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1207                 cancel = 1;
1208         spin_unlock_irq(&mdev->tconn->req_lock);
1209         if (cancel)
1210                 return 0;
1211
1212         sock = &mdev->tconn->data;
1213         p = drbd_prepare_command(mdev, sock);
1214         if (!p)
1215                 return -EIO;
1216         p->barrier = b->br_number;
1217         /* inc_ap_pending was done where this was queued.
1218          * dec_ap_pending will be done in got_BarrierAck
1219          * or (on connection loss) in w_clear_epoch.  */
1220         return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
1221 }
1222
1223 int w_send_write_hint(struct drbd_work *w, int cancel)
1224 {
1225         struct drbd_conf *mdev = w->mdev;
1226         struct drbd_socket *sock;
1227
1228         if (cancel)
1229                 return 0;
1230         sock = &mdev->tconn->data;
1231         if (!drbd_prepare_command(mdev, sock))
1232                 return -EIO;
1233         return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1234 }
1235
1236 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1237 {
1238         struct drbd_request *req = container_of(w, struct drbd_request, w);
1239         struct drbd_conf *mdev = w->mdev;
1240         int err;
1241
1242         if (unlikely(cancel)) {
1243                 req_mod(req, SEND_CANCELED);
1244                 return 0;
1245         }
1246
1247         err = drbd_send_out_of_sync(mdev, req);
1248         req_mod(req, OOS_HANDED_TO_NETWORK);
1249
1250         return err;
1251 }
1252
1253 /**
1254  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1255  * @mdev:       DRBD device.
1256  * @w:          work object.
1257  * @cancel:     The connection will be closed anyways
1258  */
1259 int w_send_dblock(struct drbd_work *w, int cancel)
1260 {
1261         struct drbd_request *req = container_of(w, struct drbd_request, w);
1262         struct drbd_conf *mdev = w->mdev;
1263         int err;
1264
1265         if (unlikely(cancel)) {
1266                 req_mod(req, SEND_CANCELED);
1267                 return 0;
1268         }
1269
1270         err = drbd_send_dblock(mdev, req);
1271         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1272
1273         return err;
1274 }
1275
1276 /**
1277  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1278  * @mdev:       DRBD device.
1279  * @w:          work object.
1280  * @cancel:     The connection will be closed anyways
1281  */
1282 int w_send_read_req(struct drbd_work *w, int cancel)
1283 {
1284         struct drbd_request *req = container_of(w, struct drbd_request, w);
1285         struct drbd_conf *mdev = w->mdev;
1286         int err;
1287
1288         if (unlikely(cancel)) {
1289                 req_mod(req, SEND_CANCELED);
1290                 return 0;
1291         }
1292
1293         err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1294                                  (unsigned long)req);
1295
1296         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1297
1298         return err;
1299 }
1300
1301 int w_restart_disk_io(struct drbd_work *w, int cancel)
1302 {
1303         struct drbd_request *req = container_of(w, struct drbd_request, w);
1304         struct drbd_conf *mdev = w->mdev;
1305
1306         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1307                 drbd_al_begin_io(mdev, &req->i);
1308         /* Calling drbd_al_begin_io() out of the worker might deadlocks
1309            theoretically. Practically it can not deadlock, since this is
1310            only used when unfreezing IOs. All the extents of the requests
1311            that made it into the TL are already active */
1312
1313         drbd_req_make_private_bio(req, req->master_bio);
1314         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1315         generic_make_request(req->private_bio);
1316
1317         return 0;
1318 }
1319
1320 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1321 {
1322         struct drbd_conf *odev = mdev;
1323
1324         while (1) {
1325                 if (!odev->ldev)
1326                         return 1;
1327                 if (odev->ldev->dc.resync_after == -1)
1328                         return 1;
1329                 odev = minor_to_mdev(odev->ldev->dc.resync_after);
1330                 if (!expect(odev))
1331                         return 1;
1332                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1333                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1334                     odev->state.aftr_isp || odev->state.peer_isp ||
1335                     odev->state.user_isp)
1336                         return 0;
1337         }
1338 }
1339
1340 /**
1341  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1342  * @mdev:       DRBD device.
1343  *
1344  * Called from process context only (admin command and after_state_ch).
1345  */
1346 static int _drbd_pause_after(struct drbd_conf *mdev)
1347 {
1348         struct drbd_conf *odev;
1349         int i, rv = 0;
1350
1351         rcu_read_lock();
1352         idr_for_each_entry(&minors, odev, i) {
1353                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1354                         continue;
1355                 if (!_drbd_may_sync_now(odev))
1356                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1357                                != SS_NOTHING_TO_DO);
1358         }
1359         rcu_read_unlock();
1360
1361         return rv;
1362 }
1363
1364 /**
1365  * _drbd_resume_next() - Resume resync on all devices that may resync now
1366  * @mdev:       DRBD device.
1367  *
1368  * Called from process context only (admin command and worker).
1369  */
1370 static int _drbd_resume_next(struct drbd_conf *mdev)
1371 {
1372         struct drbd_conf *odev;
1373         int i, rv = 0;
1374
1375         rcu_read_lock();
1376         idr_for_each_entry(&minors, odev, i) {
1377                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1378                         continue;
1379                 if (odev->state.aftr_isp) {
1380                         if (_drbd_may_sync_now(odev))
1381                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1382                                                         CS_HARD, NULL)
1383                                        != SS_NOTHING_TO_DO) ;
1384                 }
1385         }
1386         rcu_read_unlock();
1387         return rv;
1388 }
1389
1390 void resume_next_sg(struct drbd_conf *mdev)
1391 {
1392         write_lock_irq(&global_state_lock);
1393         _drbd_resume_next(mdev);
1394         write_unlock_irq(&global_state_lock);
1395 }
1396
1397 void suspend_other_sg(struct drbd_conf *mdev)
1398 {
1399         write_lock_irq(&global_state_lock);
1400         _drbd_pause_after(mdev);
1401         write_unlock_irq(&global_state_lock);
1402 }
1403
1404 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1405 {
1406         struct drbd_conf *odev;
1407
1408         if (o_minor == -1)
1409                 return NO_ERROR;
1410         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1411                 return ERR_SYNC_AFTER;
1412
1413         /* check for loops */
1414         odev = minor_to_mdev(o_minor);
1415         while (1) {
1416                 if (odev == mdev)
1417                         return ERR_SYNC_AFTER_CYCLE;
1418
1419                 /* dependency chain ends here, no cycles. */
1420                 if (odev->ldev->dc.resync_after == -1)
1421                         return NO_ERROR;
1422
1423                 /* follow the dependency chain */
1424                 odev = minor_to_mdev(odev->ldev->dc.resync_after);
1425         }
1426 }
1427
1428 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1429 {
1430         int changes;
1431         int retcode;
1432
1433         write_lock_irq(&global_state_lock);
1434         retcode = sync_after_error(mdev, na);
1435         if (retcode == NO_ERROR) {
1436                 mdev->ldev->dc.resync_after = na;
1437                 do {
1438                         changes  = _drbd_pause_after(mdev);
1439                         changes |= _drbd_resume_next(mdev);
1440                 } while (changes);
1441         }
1442         write_unlock_irq(&global_state_lock);
1443         return retcode;
1444 }
1445
1446 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1447 {
1448         atomic_set(&mdev->rs_sect_in, 0);
1449         atomic_set(&mdev->rs_sect_ev, 0);
1450         mdev->rs_in_flight = 0;
1451         mdev->rs_planed = 0;
1452         spin_lock(&mdev->peer_seq_lock);
1453         fifo_set(&mdev->rs_plan_s, 0);
1454         spin_unlock(&mdev->peer_seq_lock);
1455 }
1456
1457 void start_resync_timer_fn(unsigned long data)
1458 {
1459         struct drbd_conf *mdev = (struct drbd_conf *) data;
1460
1461         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1462 }
1463
1464 int w_start_resync(struct drbd_work *w, int cancel)
1465 {
1466         struct drbd_conf *mdev = w->mdev;
1467
1468         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1469                 dev_warn(DEV, "w_start_resync later...\n");
1470                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1471                 add_timer(&mdev->start_resync_timer);
1472                 return 0;
1473         }
1474
1475         drbd_start_resync(mdev, C_SYNC_SOURCE);
1476         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1477         return 0;
1478 }
1479
1480 /**
1481  * drbd_start_resync() - Start the resync process
1482  * @mdev:       DRBD device.
1483  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1484  *
1485  * This function might bring you directly into one of the
1486  * C_PAUSED_SYNC_* states.
1487  */
1488 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1489 {
1490         union drbd_state ns;
1491         int r;
1492
1493         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1494                 dev_err(DEV, "Resync already running!\n");
1495                 return;
1496         }
1497
1498         if (mdev->state.conn < C_AHEAD) {
1499                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1500                 drbd_rs_cancel_all(mdev);
1501                 /* This should be done when we abort the resync. We definitely do not
1502                    want to have this for connections going back and forth between
1503                    Ahead/Behind and SyncSource/SyncTarget */
1504         }
1505
1506         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1507                 if (side == C_SYNC_TARGET) {
1508                         /* Since application IO was locked out during C_WF_BITMAP_T and
1509                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1510                            we check that we might make the data inconsistent. */
1511                         r = drbd_khelper(mdev, "before-resync-target");
1512                         r = (r >> 8) & 0xff;
1513                         if (r > 0) {
1514                                 dev_info(DEV, "before-resync-target handler returned %d, "
1515                                          "dropping connection.\n", r);
1516                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1517                                 return;
1518                         }
1519                 } else /* C_SYNC_SOURCE */ {
1520                         r = drbd_khelper(mdev, "before-resync-source");
1521                         r = (r >> 8) & 0xff;
1522                         if (r > 0) {
1523                                 if (r == 3) {
1524                                         dev_info(DEV, "before-resync-source handler returned %d, "
1525                                                  "ignoring. Old userland tools?", r);
1526                                 } else {
1527                                         dev_info(DEV, "before-resync-source handler returned %d, "
1528                                                  "dropping connection.\n", r);
1529                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1530                                         return;
1531                                 }
1532                         }
1533                 }
1534         }
1535
1536         if (current == mdev->tconn->worker.task) {
1537                 /* The worker should not sleep waiting for state_mutex,
1538                    that can take long */
1539                 if (!mutex_trylock(mdev->state_mutex)) {
1540                         set_bit(B_RS_H_DONE, &mdev->flags);
1541                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1542                         add_timer(&mdev->start_resync_timer);
1543                         return;
1544                 }
1545         } else {
1546                 mutex_lock(mdev->state_mutex);
1547         }
1548         clear_bit(B_RS_H_DONE, &mdev->flags);
1549
1550         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1551                 mutex_unlock(mdev->state_mutex);
1552                 return;
1553         }
1554
1555         write_lock_irq(&global_state_lock);
1556         ns = drbd_read_state(mdev);
1557
1558         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1559
1560         ns.conn = side;
1561
1562         if (side == C_SYNC_TARGET)
1563                 ns.disk = D_INCONSISTENT;
1564         else /* side == C_SYNC_SOURCE */
1565                 ns.pdsk = D_INCONSISTENT;
1566
1567         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1568         ns = drbd_read_state(mdev);
1569
1570         if (ns.conn < C_CONNECTED)
1571                 r = SS_UNKNOWN_ERROR;
1572
1573         if (r == SS_SUCCESS) {
1574                 unsigned long tw = drbd_bm_total_weight(mdev);
1575                 unsigned long now = jiffies;
1576                 int i;
1577
1578                 mdev->rs_failed    = 0;
1579                 mdev->rs_paused    = 0;
1580                 mdev->rs_same_csum = 0;
1581                 mdev->rs_last_events = 0;
1582                 mdev->rs_last_sect_ev = 0;
1583                 mdev->rs_total     = tw;
1584                 mdev->rs_start     = now;
1585                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1586                         mdev->rs_mark_left[i] = tw;
1587                         mdev->rs_mark_time[i] = now;
1588                 }
1589                 _drbd_pause_after(mdev);
1590         }
1591         write_unlock_irq(&global_state_lock);
1592
1593         if (r == SS_SUCCESS) {
1594                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1595                      drbd_conn_str(ns.conn),
1596                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1597                      (unsigned long) mdev->rs_total);
1598                 if (side == C_SYNC_TARGET)
1599                         mdev->bm_resync_fo = 0;
1600
1601                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1602                  * with w_send_oos, or the sync target will get confused as to
1603                  * how much bits to resync.  We cannot do that always, because for an
1604                  * empty resync and protocol < 95, we need to do it here, as we call
1605                  * drbd_resync_finished from here in that case.
1606                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1607                  * and from after_state_ch otherwise. */
1608                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1609                         drbd_gen_and_send_sync_uuid(mdev);
1610
1611                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1612                         /* This still has a race (about when exactly the peers
1613                          * detect connection loss) that can lead to a full sync
1614                          * on next handshake. In 8.3.9 we fixed this with explicit
1615                          * resync-finished notifications, but the fix
1616                          * introduces a protocol change.  Sleeping for some
1617                          * time longer than the ping interval + timeout on the
1618                          * SyncSource, to give the SyncTarget the chance to
1619                          * detect connection loss, then waiting for a ping
1620                          * response (implicit in drbd_resync_finished) reduces
1621                          * the race considerably, but does not solve it. */
1622                         if (side == C_SYNC_SOURCE) {
1623                                 struct net_conf *nc;
1624                                 int timeo;
1625
1626                                 rcu_read_lock();
1627                                 nc = rcu_dereference(mdev->tconn->net_conf);
1628                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1629                                 rcu_read_unlock();
1630                                 schedule_timeout_interruptible(timeo);
1631                         }
1632                         drbd_resync_finished(mdev);
1633                 }
1634
1635                 drbd_rs_controller_reset(mdev);
1636                 /* ns.conn may already be != mdev->state.conn,
1637                  * we may have been paused in between, or become paused until
1638                  * the timer triggers.
1639                  * No matter, that is handled in resync_timer_fn() */
1640                 if (ns.conn == C_SYNC_TARGET)
1641                         mod_timer(&mdev->resync_timer, jiffies);
1642
1643                 drbd_md_sync(mdev);
1644         }
1645         put_ldev(mdev);
1646         mutex_unlock(mdev->state_mutex);
1647 }
1648
1649 int drbd_worker(struct drbd_thread *thi)
1650 {
1651         struct drbd_tconn *tconn = thi->tconn;
1652         struct drbd_work *w = NULL;
1653         struct drbd_conf *mdev;
1654         struct net_conf *nc;
1655         LIST_HEAD(work_list);
1656         int vnr, intr = 0;
1657         int cork;
1658
1659         while (get_t_state(thi) == RUNNING) {
1660                 drbd_thread_current_set_cpu(thi);
1661
1662                 if (down_trylock(&tconn->data.work.s)) {
1663                         mutex_lock(&tconn->data.mutex);
1664
1665                         rcu_read_lock();
1666                         nc = rcu_dereference(tconn->net_conf);
1667                         cork = nc ? !nc->no_cork : 0;
1668                         rcu_read_unlock();
1669
1670                         if (tconn->data.socket && cork)
1671                                 drbd_tcp_uncork(tconn->data.socket);
1672                         mutex_unlock(&tconn->data.mutex);
1673
1674                         intr = down_interruptible(&tconn->data.work.s);
1675
1676                         mutex_lock(&tconn->data.mutex);
1677                         if (tconn->data.socket  && cork)
1678                                 drbd_tcp_cork(tconn->data.socket);
1679                         mutex_unlock(&tconn->data.mutex);
1680                 }
1681
1682                 if (intr) {
1683                         flush_signals(current);
1684                         if (get_t_state(thi) == RUNNING) {
1685                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1686                                 continue;
1687                         }
1688                         break;
1689                 }
1690
1691                 if (get_t_state(thi) != RUNNING)
1692                         break;
1693                 /* With this break, we have done a down() but not consumed
1694                    the entry from the list. The cleanup code takes care of
1695                    this...   */
1696
1697                 w = NULL;
1698                 spin_lock_irq(&tconn->data.work.q_lock);
1699                 if (list_empty(&tconn->data.work.q)) {
1700                         /* something terribly wrong in our logic.
1701                          * we were able to down() the semaphore,
1702                          * but the list is empty... doh.
1703                          *
1704                          * what is the best thing to do now?
1705                          * try again from scratch, restarting the receiver,
1706                          * asender, whatnot? could break even more ugly,
1707                          * e.g. when we are primary, but no good local data.
1708                          *
1709                          * I'll try to get away just starting over this loop.
1710                          */
1711                         conn_warn(tconn, "Work list unexpectedly empty\n");
1712                         spin_unlock_irq(&tconn->data.work.q_lock);
1713                         continue;
1714                 }
1715                 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1716                 list_del_init(&w->list);
1717                 spin_unlock_irq(&tconn->data.work.q_lock);
1718
1719                 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1720                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1721                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1722                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1723                 }
1724         }
1725
1726         spin_lock_irq(&tconn->data.work.q_lock);
1727         while (!list_empty(&tconn->data.work.q)) {
1728                 list_splice_init(&tconn->data.work.q, &work_list);
1729                 spin_unlock_irq(&tconn->data.work.q_lock);
1730
1731                 while (!list_empty(&work_list)) {
1732                         w = list_entry(work_list.next, struct drbd_work, list);
1733                         list_del_init(&w->list);
1734                         w->cb(w, 1);
1735                 }
1736
1737                 spin_lock_irq(&tconn->data.work.q_lock);
1738         }
1739         sema_init(&tconn->data.work.s, 0);
1740         /* DANGEROUS race: if someone did queue his work within the spinlock,
1741          * but up() ed outside the spinlock, we could get an up() on the
1742          * semaphore without corresponding list entry.
1743          * So don't do that.
1744          */
1745         spin_unlock_irq(&tconn->data.work.q_lock);
1746
1747         /* _drbd_set_state only uses stop_nowait.
1748          * wait here for the exiting receiver. */
1749         drbd_thread_stop(&tconn->receiver);
1750
1751         down_read(&drbd_cfg_rwsem);
1752         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1753                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1754                 drbd_mdev_cleanup(mdev);
1755         }
1756         up_read(&drbd_cfg_rwsem);
1757         clear_bit(OBJECT_DYING, &tconn->flags);
1758         clear_bit(CONFIG_PENDING, &tconn->flags);
1759         wake_up(&tconn->ping_wait);
1760
1761         return 0;
1762 }