1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56
57
58 /* About the global_state_lock
59    Each state transition on a device holds a read lock. In case we have
60    to evaluate the sync after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
63
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69         struct drbd_md_io *md_io;
70
71         md_io = (struct drbd_md_io *)bio->bi_private;
72         md_io->error = error;
73
74         complete(&md_io->event);
75 }
76
77 /* reads on behalf of the partner,
78  * "submitted" by the receiver
79  */
80 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
81 {
82         unsigned long flags = 0;
83         struct drbd_conf *mdev = peer_req->w.mdev;
84
85         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
86         mdev->read_cnt += peer_req->i.size >> 9;
87         list_del(&peer_req->w.list);
88         if (list_empty(&mdev->read_ee))
89                 wake_up(&mdev->ee_wait);
90         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
91                 __drbd_chk_io_error(mdev, false);
92         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
93
94         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
95         put_ldev(mdev);
96 }
97
98 /* writes on behalf of the partner, or resync writes,
99  * "submitted" by the receiver, final stage.  */
100 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101 {
102         unsigned long flags = 0;
103         struct drbd_conf *mdev = peer_req->w.mdev;
104         struct drbd_interval i;
105         int do_wake;
106         u64 block_id;
107         int do_al_complete_io;
108
109         /* after we moved peer_req to done_ee,
110          * we may no longer access it,
111          * it may be freed/reused already!
112          * (as soon as we release the req_lock) */
113         i = peer_req->i;
114         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
115         block_id = peer_req->block_id;
116
117         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
118         mdev->writ_cnt += peer_req->i.size >> 9;
119         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
120         list_add_tail(&peer_req->w.list, &mdev->done_ee);
121
122         /*
123          * Do not remove from the write_requests tree here: we did not send the
124          * Ack yet and did not wake possibly waiting conflicting requests.
125          * It is removed from the tree in "drbd_process_done_ee", within the
126          * appropriate w.cb (e_end_block/e_end_resync_block), or in
127          * _drbd_clear_done_ee.
128          */
129
130         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
131
132         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
133                 __drbd_chk_io_error(mdev, false);
134         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
135
136         if (block_id == ID_SYNCER)
137                 drbd_rs_complete_io(mdev, i.sector);
138
139         if (do_wake)
140                 wake_up(&mdev->ee_wait);
141
142         if (do_al_complete_io)
143                 drbd_al_complete_io(mdev, &i);
144
145         wake_asender(mdev->tconn);
146         put_ldev(mdev);
147 }
148
149 /* writes on behalf of the partner, or resync writes,
150  * "submitted" by the receiver.
151  */
152 void drbd_peer_request_endio(struct bio *bio, int error)
153 {
154         struct drbd_peer_request *peer_req = bio->bi_private;
155         struct drbd_conf *mdev = peer_req->w.mdev;
156         int uptodate = bio_flagged(bio, BIO_UPTODATE);
157         int is_write = bio_data_dir(bio) == WRITE;
158
159         if (error && __ratelimit(&drbd_ratelimit_state))
160                 dev_warn(DEV, "%s: error=%d s=%llus\n",
161                                 is_write ? "write" : "read", error,
162                                 (unsigned long long)peer_req->i.sector);
163         if (!error && !uptodate) {
164                 if (__ratelimit(&drbd_ratelimit_state))
165                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
166                                         is_write ? "write" : "read",
167                                         (unsigned long long)peer_req->i.sector);
168                 /* strange behavior of some lower level drivers...
169                  * fail the request by clearing the uptodate flag,
170                  * but do not return any error?! */
171                 error = -EIO;
172         }
173
174         if (error)
175                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
176
177         bio_put(bio); /* no need for the bio anymore */
178         if (atomic_dec_and_test(&peer_req->pending_bios)) {
179                 if (is_write)
180                         drbd_endio_write_sec_final(peer_req);
181                 else
182                         drbd_endio_read_sec_final(peer_req);
183         }
184 }
185
186 /* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
187  */
188 void drbd_request_endio(struct bio *bio, int error)
189 {
190         unsigned long flags;
191         struct drbd_request *req = bio->bi_private;
192         struct drbd_conf *mdev = req->w.mdev;
193         struct bio_and_error m;
194         enum drbd_req_event what;
195         int uptodate = bio_flagged(bio, BIO_UPTODATE);
196
197         if (!error && !uptodate) {
198                 dev_warn(DEV, "p %s: setting error to -EIO\n",
199                          bio_data_dir(bio) == WRITE ? "write" : "read");
200                 /* strange behavior of some lower level drivers...
201                  * fail the request by clearing the uptodate flag,
202                  * but do not return any error?! */
203                 error = -EIO;
204         }
205
206         /* to avoid recursion in __req_mod */
207         if (unlikely(error)) {
208                 what = (bio_data_dir(bio) == WRITE)
209                         ? WRITE_COMPLETED_WITH_ERROR
210                         : (bio_rw(bio) == READ)
211                           ? READ_COMPLETED_WITH_ERROR
212                           : READ_AHEAD_COMPLETED_WITH_ERROR;
213         } else
214                 what = COMPLETED_OK;
215
216         bio_put(req->private_bio);
217         req->private_bio = ERR_PTR(error);
218
219         /* not req_mod(), we need irqsave here! */
220         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
221         __req_mod(req, what, &m);
222         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
223
224         if (m.bio)
225                 complete_master_bio(mdev, &m);
226 }
227
228 int w_read_retry_remote(struct drbd_work *w, int cancel)
229 {
230         struct drbd_request *req = container_of(w, struct drbd_request, w);
231         struct drbd_conf *mdev = w->mdev;
232
233         /* We should not detach for read io-error,
234          * but try to WRITE the P_DATA_REPLY to the failed location,
235          * to give the disk the chance to relocate that block */
236
237         spin_lock_irq(&mdev->tconn->req_lock);
238         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
239                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
240                 spin_unlock_irq(&mdev->tconn->req_lock);
241                 return 0;
242         }
243         spin_unlock_irq(&mdev->tconn->req_lock);
244
245         return w_send_read_req(w, 0);
246 }
247
248 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
249                   struct drbd_peer_request *peer_req, void *digest)
250 {
251         struct hash_desc desc;
252         struct scatterlist sg;
253         struct page *page = peer_req->pages;
254         struct page *tmp;
255         unsigned len;
256
257         desc.tfm = tfm;
258         desc.flags = 0;
259
260         sg_init_table(&sg, 1);
261         crypto_hash_init(&desc);
262
263         while ((tmp = page_chain_next(page))) {
264                 /* all but the last page will be fully used */
265                 sg_set_page(&sg, page, PAGE_SIZE, 0);
266                 crypto_hash_update(&desc, &sg, sg.length);
267                 page = tmp;
268         }
269         /* and now the last, possibly only partially used page */
270         len = peer_req->i.size & (PAGE_SIZE - 1);
271         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
272         crypto_hash_update(&desc, &sg, sg.length);
273         crypto_hash_final(&desc, digest);
274 }
275
276 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
277 {
278         struct hash_desc desc;
279         struct scatterlist sg;
280         struct bio_vec *bvec;
281         int i;
282
283         desc.tfm = tfm;
284         desc.flags = 0;
285
286         sg_init_table(&sg, 1);
287         crypto_hash_init(&desc);
288
289         __bio_for_each_segment(bvec, bio, i, 0) {
290                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
291                 crypto_hash_update(&desc, &sg, sg.length);
292         }
293         crypto_hash_final(&desc, digest);
294 }
295
296 /* MAYBE merge common code with w_e_end_ov_req */
297 static int w_e_send_csum(struct drbd_work *w, int cancel)
298 {
299         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
300         struct drbd_conf *mdev = w->mdev;
301         int digest_size;
302         void *digest;
303         int err = 0;
304
305         if (unlikely(cancel))
306                 goto out;
307
308         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
309                 goto out;
310
311         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
312         digest = kmalloc(digest_size, GFP_NOIO);
313         if (digest) {
314                 sector_t sector = peer_req->i.sector;
315                 unsigned int size = peer_req->i.size;
316                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
317                 /* Free peer_req and pages before send.
318                  * In case we block on congestion, we could otherwise run into
319                  * some distributed deadlock, if the other side blocks on
320                  * congestion as well, because our receiver blocks in
321                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
322                 drbd_free_peer_req(mdev, peer_req);
323                 peer_req = NULL;
324                 inc_rs_pending(mdev);
325                 err = drbd_send_drequest_csum(mdev, sector, size,
326                                               digest, digest_size,
327                                               P_CSUM_RS_REQUEST);
328                 kfree(digest);
329         } else {
330                 dev_err(DEV, "kmalloc() of digest failed.\n");
331                 err = -ENOMEM;
332         }
333
334 out:
335         if (peer_req)
336                 drbd_free_peer_req(mdev, peer_req);
337
338         if (unlikely(err))
339                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
340         return err;
341 }
342
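/* GFP_TRY: best-effort allocation flags for background resync work: no
 * __GFP_WAIT, so the allocation may simply fail under memory pressure,
 * and __GFP_NOWARN keeps that quiet -- callers like read_for_csum()
 * just defer and retry later. */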
343 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
344
345 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
346 {
347         struct drbd_peer_request *peer_req;
348
349         if (!get_ldev(mdev))
350                 return -EIO;
351
352         if (drbd_rs_should_slow_down(mdev, sector))
353                 goto defer;
354
355         /* GFP_TRY, because if there is no memory available right now, this may
356          * be rescheduled for later. It is "only" background resync, after all. */
357         peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
358                                        size, GFP_TRY);
359         if (!peer_req)
360                 goto defer;
361
362         peer_req->w.cb = w_e_send_csum;
363         spin_lock_irq(&mdev->tconn->req_lock);
364         list_add(&peer_req->w.list, &mdev->read_ee);
365         spin_unlock_irq(&mdev->tconn->req_lock);
366
367         atomic_add(size >> 9, &mdev->rs_sect_ev);
368         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
369                 return 0;
370
371         /* If it failed because of ENOMEM, retry should help.  If it failed
372          * because bio_add_page failed (probably broken lower level driver),
373          * retry may or may not help.
374          * If it does not, you may need to force disconnect. */
375         spin_lock_irq(&mdev->tconn->req_lock);
376         list_del(&peer_req->w.list);
377         spin_unlock_irq(&mdev->tconn->req_lock);
378
379         drbd_free_peer_req(mdev, peer_req);
380 defer:
381         put_ldev(mdev);
382         return -EAGAIN;
383 }
384
385 int w_resync_timer(struct drbd_work *w, int cancel)
386 {
387         struct drbd_conf *mdev = w->mdev;
388         switch (mdev->state.conn) {
389         case C_VERIFY_S:
390                 w_make_ov_request(w, cancel);
391                 break;
392         case C_SYNC_TARGET:
393                 w_make_resync_request(w, cancel);
394                 break;
395         }
396
397         return 0;
398 }
399
400 void resync_timer_fn(unsigned long data)
401 {
402         struct drbd_conf *mdev = (struct drbd_conf *) data;
403
404         if (list_empty(&mdev->resync_work.list))
405                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
406 }
407
408 static void fifo_set(struct fifo_buffer *fb, int value)
409 {
410         int i;
411
412         for (i = 0; i < fb->size; i++)
413                 fb->values[i] = value;
414 }
415
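/* The plan fifo acts as a ring buffer of per-interval corrections:
 * fifo_push() returns the value planned for the current interval (the
 * slot at head_index), stores "value" in its place for the furthest
 * future interval, and advances the head. */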
416 static int fifo_push(struct fifo_buffer *fb, int value)
417 {
418         int ov;
419
420         ov = fb->values[fb->head_index];
421         fb->values[fb->head_index++] = value;
422
423         if (fb->head_index >= fb->size)
424                 fb->head_index = 0;
425
426         return ov;
427 }
428
429 static void fifo_add_val(struct fifo_buffer *fb, int value)
430 {
431         int i;
432
433         for (i = 0; i < fb->size; i++)
434                 fb->values[i] += value;
435 }
436
437 struct fifo_buffer *fifo_alloc(int fifo_size)
438 {
439         struct fifo_buffer *fb;
440
441         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_KERNEL);
442         if (!fb)
443                 return NULL;
444
445         fb->head_index = 0;
446         fb->size = fifo_size;
447         fb->total = 0;
448
449         return fb;
450 }
451
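/* Dynamic resync-speed controller, in rough outline (the tunables live
 * in disk_conf; see drbd_rs_number_requests() for the unit handling):
 *
 *   want       = amount of resync data we would like to have in flight,
 *                either the fixed c_fill_target, or derived from
 *                c_delay_target and the reply rate seen last interval
 *                (at resync start: resync_rate over the whole plan);
 *   correction = want - rs_in_flight - plan->total;
 *
 * The correction is spread evenly over the "steps" slots of the plan
 * fifo; what falls out of the fifo for the current interval, plus the
 * sectors that just came in, is requested now, clamped to c_max_rate. */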
452 static int drbd_rs_controller(struct drbd_conf *mdev)
453 {
454         struct disk_conf *dc;
455         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
456         unsigned int want;     /* The number of sectors we want in the proxy */
457         int req_sect; /* Number of sectors to request in this turn */
458         int correction; /* Number of sectors more we need in the proxy*/
459         int cps; /* correction per invocation of drbd_rs_controller() */
460         int steps; /* Number of time steps to plan ahead */
461         int curr_corr;
462         int max_sect;
463         struct fifo_buffer *plan;
464
465         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
466         mdev->rs_in_flight -= sect_in;
467
468         dc = rcu_dereference(mdev->ldev->disk_conf);
469         plan = rcu_dereference(mdev->rs_plan_s);
470
471         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
472
473         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
474                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
475         } else { /* normal path */
476                 want = dc->c_fill_target ? dc->c_fill_target :
477                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
478         }
479
480         correction = want - mdev->rs_in_flight - plan->total;
481
482         /* Plan ahead */
483         cps = correction / steps;
484         fifo_add_val(plan, cps);
485         plan->total += cps * steps;
486
487         /* What we do in this step */
488         curr_corr = fifo_push(plan, 0);
489         plan->total -= curr_corr;
490
491         req_sect = sect_in + curr_corr;
492         if (req_sect < 0)
493                 req_sect = 0;
494
495         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
496         if (req_sect > max_sect)
497                 req_sect = max_sect;
498
499         /*
500         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
501                  sect_in, mdev->rs_in_flight, want, correction,
502                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
503         */
504
505         return req_sect;
506 }
507
508 static int drbd_rs_number_requests(struct drbd_conf *mdev)
509 {
510         int number;
511
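        /* Unit sketch (assuming SLEEP_TIME == HZ/10 and BM_BLOCK_SIZE ==
         * 4 KiB, as defined in drbd_int.h): c_sync_rate is in KiB/s, so the
         * fixed-rate branch below asks for roughly
         *   number = 0.1 s * rate / 4 KiB = rate / 40
         * resync requests per timer tick; e.g. 250 KiB/s gives about six
         * BM_BLOCK_SIZE sized requests every 100 ms. */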
512         rcu_read_lock();
513         if (rcu_dereference(mdev->rs_plan_s)->size) {
514                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
515                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
516         } else {
517                 mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
518                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
519         }
520         rcu_read_unlock();
521
522         /* ignore the amount of pending requests, the resync controller should
523          * throttle down to incoming reply rate soon enough anyways. */
524         return number;
525 }
526
527 int w_make_resync_request(struct drbd_work *w, int cancel)
528 {
529         struct drbd_conf *mdev = w->mdev;
530         unsigned long bit;
531         sector_t sector;
532         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
533         int max_bio_size;
534         int number, rollback_i, size;
535         int align, queued, sndbuf;
536         int i = 0;
537
538         if (unlikely(cancel))
539                 return 0;
540
541         if (mdev->rs_total == 0) {
542                 /* empty resync? */
543                 drbd_resync_finished(mdev);
544                 return 0;
545         }
546
547         if (!get_ldev(mdev)) {
548                 /* Since we only need to access the resync bookkeeping here, a
549                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
550                    continuing a resync with a broken disk makes no sense at
551                    all */
552                 dev_err(DEV, "Disk broke down during resync!\n");
553                 return 0;
554         }
555
556         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
557         number = drbd_rs_number_requests(mdev);
558         if (number == 0)
559                 goto requeue;
560
561         for (i = 0; i < number; i++) {
562                 /* Stop generating RS requests when half of the send buffer is filled */
563                 mutex_lock(&mdev->tconn->data.mutex);
564                 if (mdev->tconn->data.socket) {
565                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
566                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
567                 } else {
568                         queued = 1;
569                         sndbuf = 0;
570                 }
571                 mutex_unlock(&mdev->tconn->data.mutex);
572                 if (queued > sndbuf / 2)
573                         goto requeue;
574
575 next_sector:
576                 size = BM_BLOCK_SIZE;
577                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
578
579                 if (bit == DRBD_END_OF_BITMAP) {
580                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
581                         put_ldev(mdev);
582                         return 0;
583                 }
584
585                 sector = BM_BIT_TO_SECT(bit);
586
587                 if (drbd_rs_should_slow_down(mdev, sector) ||
588                     drbd_try_rs_begin_io(mdev, sector)) {
589                         mdev->bm_resync_fo = bit;
590                         goto requeue;
591                 }
592                 mdev->bm_resync_fo = bit + 1;
593
594                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
595                         drbd_rs_complete_io(mdev, sector);
596                         goto next_sector;
597                 }
598
599 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
600                 /* try to find some adjacent bits.
601                  * we stop if we have already the maximum req size.
602                  *
603                  * Additionally always align bigger requests, in order to
604                  * be prepared for all stripe sizes of software RAIDs.
605                  */
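                /* Merge arithmetic, in short: "size" grows in BM_BLOCK_SIZE
                 * (4 KiB) steps and "align" is bumped once size reaches
                 * BM_BLOCK_SIZE << align; the check below,
                 *   sector & ((1 << (align + 3)) - 1),
                 * requires the start sector to stay aligned to
                 * 1 << (align + 3) sectors, i.e. BM_BLOCK_SIZE << align
                 * bytes, so merged requests remain naturally aligned. */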
606                 align = 1;
607                 rollback_i = i;
608                 for (;;) {
609                         if (size + BM_BLOCK_SIZE > max_bio_size)
610                                 break;
611
612                         /* Be always aligned */
613                         if (sector & ((1<<(align+3))-1))
614                                 break;
615
616                         /* do not cross extent boundaries */
617                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
618                                 break;
619                         /* now, is it actually dirty, after all?
620                          * caution, drbd_bm_test_bit is tri-state for some
621                          * obscure reason; ( b == 0 ) would get the out-of-band
622                          * only accidentally right because of the "oddly sized"
623                          * adjustment below */
624                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
625                                 break;
626                         bit++;
627                         size += BM_BLOCK_SIZE;
628                         if ((BM_BLOCK_SIZE << align) <= size)
629                                 align++;
630                         i++;
631                 }
632                 /* if we merged some,
633                  * reset the offset to start the next drbd_bm_find_next from */
634                 if (size > BM_BLOCK_SIZE)
635                         mdev->bm_resync_fo = bit + 1;
636 #endif
637
638                 /* adjust very last sectors, in case we are oddly sized */
639                 if (sector + (size>>9) > capacity)
640                         size = (capacity-sector)<<9;
641                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
642                         switch (read_for_csum(mdev, sector, size)) {
643                         case -EIO: /* Disk failure */
644                                 put_ldev(mdev);
645                                 return -EIO;
646                         case -EAGAIN: /* allocation failed, or ldev busy */
647                                 drbd_rs_complete_io(mdev, sector);
648                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
649                                 i = rollback_i;
650                                 goto requeue;
651                         case 0:
652                                 /* everything ok */
653                                 break;
654                         default:
655                                 BUG();
656                         }
657                 } else {
658                         int err;
659
660                         inc_rs_pending(mdev);
661                         err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
662                                                  sector, size, ID_SYNCER);
663                         if (err) {
664                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
665                                 dec_rs_pending(mdev);
666                                 put_ldev(mdev);
667                                 return err;
668                         }
669                 }
670         }
671
672         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
673                 /* last syncer _request_ was sent,
674                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
675                  * next sync group will resume), as soon as we receive the last
676                  * resync data block, and the last bit is cleared.
677                  * until then resync "work" is "inactive" ...
678                  */
679                 put_ldev(mdev);
680                 return 0;
681         }
682
683  requeue:
684         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
685         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
686         put_ldev(mdev);
687         return 0;
688 }
689
690 static int w_make_ov_request(struct drbd_work *w, int cancel)
691 {
692         struct drbd_conf *mdev = w->mdev;
693         int number, i, size;
694         sector_t sector;
695         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
696
697         if (unlikely(cancel))
698                 return 1;
699
700         number = drbd_rs_number_requests(mdev);
701
702         sector = mdev->ov_position;
703         for (i = 0; i < number; i++) {
704                 if (sector >= capacity) {
705                         return 1;
706                 }
707
708                 size = BM_BLOCK_SIZE;
709
710                 if (drbd_rs_should_slow_down(mdev, sector) ||
711                     drbd_try_rs_begin_io(mdev, sector)) {
712                         mdev->ov_position = sector;
713                         goto requeue;
714                 }
715
716                 if (sector + (size>>9) > capacity)
717                         size = (capacity-sector)<<9;
718
719                 inc_rs_pending(mdev);
720                 if (drbd_send_ov_request(mdev, sector, size)) {
721                         dec_rs_pending(mdev);
722                         return 0;
723                 }
724                 sector += BM_SECT_PER_BIT;
725         }
726         mdev->ov_position = sector;
727
728  requeue:
729         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
730         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
731         return 1;
732 }
733
734 int w_ov_finished(struct drbd_work *w, int cancel)
735 {
736         struct drbd_conf *mdev = w->mdev;
737         kfree(w);
738         ov_out_of_sync_print(mdev);
739         drbd_resync_finished(mdev);
740
741         return 0;
742 }
743
744 static int w_resync_finished(struct drbd_work *w, int cancel)
745 {
746         struct drbd_conf *mdev = w->mdev;
747         kfree(w);
748
749         drbd_resync_finished(mdev);
750
751         return 0;
752 }
753
754 static void ping_peer(struct drbd_conf *mdev)
755 {
756         struct drbd_tconn *tconn = mdev->tconn;
757
758         clear_bit(GOT_PING_ACK, &tconn->flags);
759         request_ping(tconn);
760         wait_event(tconn->ping_wait,
761                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
762 }
763
764 int drbd_resync_finished(struct drbd_conf *mdev)
765 {
766         unsigned long db, dt, dbdt;
767         unsigned long n_oos;
768         union drbd_state os, ns;
769         struct drbd_work *w;
770         char *khelper_cmd = NULL;
771         int verify_done = 0;
772
773         /* Remove all elements from the resync LRU. Since future actions
774          * might set bits in the (main) bitmap, the entries in the
775          * resync LRU would otherwise be wrong. */
776         if (drbd_rs_del_all(mdev)) {
777                 /* In case this is not possible now, most probably because
778                  * there are P_RS_DATA_REPLY packets lingering on the worker's
779                  * queue (or even the read operations for those packets
780                  * are not finished by now).   Retry in 100ms. */
781
782                 schedule_timeout_interruptible(HZ / 10);
783                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
784                 if (w) {
785                         w->cb = w_resync_finished;
786                         drbd_queue_work(&mdev->tconn->data.work, w);
787                         return 1;
788                 }
789                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
790         }
791
792         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
793         if (dt <= 0)
794                 dt = 1;
795         db = mdev->rs_total;
796         dbdt = Bit2KB(db/dt);
797         mdev->rs_paused /= HZ;
798
799         if (!get_ldev(mdev))
800                 goto out;
801
802         ping_peer(mdev);
803
804         spin_lock_irq(&mdev->tconn->req_lock);
805         os = drbd_read_state(mdev);
806
807         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
808
809         /* This protects us against multiple calls (that can happen in the presence
810            of application IO), and against connectivity loss just before we arrive here. */
811         if (os.conn <= C_CONNECTED)
812                 goto out_unlock;
813
814         ns = os;
815         ns.conn = C_CONNECTED;
816
817         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
818              verify_done ? "Online verify " : "Resync",
819              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
820
821         n_oos = drbd_bm_total_weight(mdev);
822
823         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
824                 if (n_oos) {
825                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
826                               n_oos, Bit2KB(1));
827                         khelper_cmd = "out-of-sync";
828                 }
829         } else {
830                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
831
832                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
833                         khelper_cmd = "after-resync-target";
834
835                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
836                         const unsigned long s = mdev->rs_same_csum;
837                         const unsigned long t = mdev->rs_total;
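                        /* Integer percentage computed two ways: for small
                         * totals, (s*100)/t keeps precision; for large
                         * totals, s/(t/100) avoids a possible overflow of
                         * the product s*100 (relevant on 32-bit with very
                         * large bitmaps). */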
838                         const int ratio =
839                                 (t == 0)     ? 0 :
840                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
841                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
842                              "transferred %luK total %luK\n",
843                              ratio,
844                              Bit2KB(mdev->rs_same_csum),
845                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
846                              Bit2KB(mdev->rs_total));
847                 }
848         }
849
850         if (mdev->rs_failed) {
851                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
852
853                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
854                         ns.disk = D_INCONSISTENT;
855                         ns.pdsk = D_UP_TO_DATE;
856                 } else {
857                         ns.disk = D_UP_TO_DATE;
858                         ns.pdsk = D_INCONSISTENT;
859                 }
860         } else {
861                 ns.disk = D_UP_TO_DATE;
862                 ns.pdsk = D_UP_TO_DATE;
863
864                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
865                         if (mdev->p_uuid) {
866                                 int i;
867                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
868                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
869                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
870                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
871                         } else {
872                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
873                         }
874                 }
875
876                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
877                         /* for verify runs, we don't update uuids here,
878                          * so there would be nothing to report. */
879                         drbd_uuid_set_bm(mdev, 0UL);
880                         drbd_print_uuids(mdev, "updated UUIDs");
881                         if (mdev->p_uuid) {
882                                 /* Now the two UUID sets are equal, update what we
883                                  * know of the peer. */
884                                 int i;
885                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
886                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
887                         }
888                 }
889         }
890
891         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
892 out_unlock:
893         spin_unlock_irq(&mdev->tconn->req_lock);
894         put_ldev(mdev);
895 out:
896         mdev->rs_total  = 0;
897         mdev->rs_failed = 0;
898         mdev->rs_paused = 0;
899         if (verify_done)
900                 mdev->ov_start_sector = 0;
901
902         drbd_md_sync(mdev);
903
904         if (khelper_cmd)
905                 drbd_khelper(mdev, khelper_cmd);
906
907         return 1;
908 }
909
910 /* helper */
911 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
912 {
913         if (drbd_peer_req_has_active_page(peer_req)) {
914                 /* This might happen if sendpage() has not finished */
915                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
916                 atomic_add(i, &mdev->pp_in_use_by_net);
917                 atomic_sub(i, &mdev->pp_in_use);
918                 spin_lock_irq(&mdev->tconn->req_lock);
919                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
920                 spin_unlock_irq(&mdev->tconn->req_lock);
921                 wake_up(&drbd_pp_wait);
922         } else
923                 drbd_free_peer_req(mdev, peer_req);
924 }
925
926 /**
927  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
928  * @mdev:       DRBD device.
929  * @w:          work object.
930  * @cancel:     The connection will be closed anyways
931  */
932 int w_e_end_data_req(struct drbd_work *w, int cancel)
933 {
934         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
935         struct drbd_conf *mdev = w->mdev;
936         int err;
937
938         if (unlikely(cancel)) {
939                 drbd_free_peer_req(mdev, peer_req);
940                 dec_unacked(mdev);
941                 return 0;
942         }
943
944         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
945                 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
946         } else {
947                 if (__ratelimit(&drbd_ratelimit_state))
948                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
949                             (unsigned long long)peer_req->i.sector);
950
951                 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
952         }
953
954         dec_unacked(mdev);
955
956         move_to_net_ee_or_free(mdev, peer_req);
957
958         if (unlikely(err))
959                 dev_err(DEV, "drbd_send_block() failed\n");
960         return err;
961 }
962
963 /**
964  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
965  * @mdev:       DRBD device.
966  * @w:          work object.
967  * @cancel:     The connection will be closed anyways
968  */
969 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
970 {
971         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
972         struct drbd_conf *mdev = w->mdev;
973         int err;
974
975         if (unlikely(cancel)) {
976                 drbd_free_peer_req(mdev, peer_req);
977                 dec_unacked(mdev);
978                 return 0;
979         }
980
981         if (get_ldev_if_state(mdev, D_FAILED)) {
982                 drbd_rs_complete_io(mdev, peer_req->i.sector);
983                 put_ldev(mdev);
984         }
985
986         if (mdev->state.conn == C_AHEAD) {
987                 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
988         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
989                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
990                         inc_rs_pending(mdev);
991                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
992                 } else {
993                         if (__ratelimit(&drbd_ratelimit_state))
994                                 dev_err(DEV, "Not sending RSDataReply, "
995                                     "partner DISKLESS!\n");
996                         err = 0;
997                 }
998         } else {
999                 if (__ratelimit(&drbd_ratelimit_state))
1000                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1001                             (unsigned long long)peer_req->i.sector);
1002
1003                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1004
1005                 /* update resync data with failure */
1006                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1007         }
1008
1009         dec_unacked(mdev);
1010
1011         move_to_net_ee_or_free(mdev, peer_req);
1012
1013         if (unlikely(err))
1014                 dev_err(DEV, "drbd_send_block() failed\n");
1015         return err;
1016 }
1017
1018 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1019 {
1020         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1021         struct drbd_conf *mdev = w->mdev;
1022         struct digest_info *di;
1023         int digest_size;
1024         void *digest = NULL;
1025         int err, eq = 0;
1026
1027         if (unlikely(cancel)) {
1028                 drbd_free_peer_req(mdev, peer_req);
1029                 dec_unacked(mdev);
1030                 return 0;
1031         }
1032
1033         if (get_ldev(mdev)) {
1034                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1035                 put_ldev(mdev);
1036         }
1037
1038         di = peer_req->digest;
1039
1040         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1041                 /* quick hack to try to avoid a race against reconfiguration.
1042                  * a real fix would be much more involved,
1043                  * introducing more locking mechanisms */
1044                 if (mdev->tconn->csums_tfm) {
1045                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1046                         D_ASSERT(digest_size == di->digest_size);
1047                         digest = kmalloc(digest_size, GFP_NOIO);
1048                 }
1049                 if (digest) {
1050                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1051                         eq = !memcmp(digest, di->digest, digest_size);
1052                         kfree(digest);
1053                 }
1054
1055                 if (eq) {
1056                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1057                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1058                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1059                         err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1060                 } else {
1061                         inc_rs_pending(mdev);
1062                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1063                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1064                         kfree(di);
1065                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1066                 }
1067         } else {
1068                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1069                 if (__ratelimit(&drbd_ratelimit_state))
1070                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1071         }
1072
1073         dec_unacked(mdev);
1074         move_to_net_ee_or_free(mdev, peer_req);
1075
1076         if (unlikely(err))
1077                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1078         return err;
1079 }
1080
1081 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1082 {
1083         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1084         struct drbd_conf *mdev = w->mdev;
1085         sector_t sector = peer_req->i.sector;
1086         unsigned int size = peer_req->i.size;
1087         int digest_size;
1088         void *digest;
1089         int err = 0;
1090
1091         if (unlikely(cancel))
1092                 goto out;
1093
1094         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1095         digest = kmalloc(digest_size, GFP_NOIO);
1096         if (!digest) {
1097                 err = 1;        /* terminate the connection in case the allocation failed */
1098                 goto out;
1099         }
1100
1101         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1102                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1103         else
1104                 memset(digest, 0, digest_size);
1105
1106         /* Free peer_req and pages before send.
1107          * In case we block on congestion, we could otherwise run into
1108          * some distributed deadlock, if the other side blocks on
1109          * congestion as well, because our receiver blocks in
1110          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1111         drbd_free_peer_req(mdev, peer_req);
1112         peer_req = NULL;
1113         inc_rs_pending(mdev);
1114         err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1115         if (err)
1116                 dec_rs_pending(mdev);
1117         kfree(digest);
1118
1119 out:
1120         if (peer_req)
1121                 drbd_free_peer_req(mdev, peer_req);
1122         dec_unacked(mdev);
1123         return err;
1124 }
1125
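/* Coalesce adjacent out-of-sync blocks found during online verify into a
 * single contiguous range for reporting: extend the previous range when
 * this sector directly follows it, otherwise start a new one (sizes are
 * tracked in 512-byte sectors, hence the size>>9). */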
1126 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1127 {
1128         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1129                 mdev->ov_last_oos_size += size>>9;
1130         } else {
1131                 mdev->ov_last_oos_start = sector;
1132                 mdev->ov_last_oos_size = size>>9;
1133         }
1134         drbd_set_out_of_sync(mdev, sector, size);
1135 }
1136
1137 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1138 {
1139         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1140         struct drbd_conf *mdev = w->mdev;
1141         struct digest_info *di;
1142         void *digest;
1143         sector_t sector = peer_req->i.sector;
1144         unsigned int size = peer_req->i.size;
1145         int digest_size;
1146         int err, eq = 0;
1147
1148         if (unlikely(cancel)) {
1149                 drbd_free_peer_req(mdev, peer_req);
1150                 dec_unacked(mdev);
1151                 return 0;
1152         }
1153
1154         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1155          * the resync lru has been cleaned up already */
1156         if (get_ldev(mdev)) {
1157                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1158                 put_ldev(mdev);
1159         }
1160
1161         di = peer_req->digest;
1162
1163         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1164                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1165                 digest = kmalloc(digest_size, GFP_NOIO);
1166                 if (digest) {
1167                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1168
1169                         D_ASSERT(digest_size == di->digest_size);
1170                         eq = !memcmp(digest, di->digest, digest_size);
1171                         kfree(digest);
1172                 }
1173         }
1174
1175         /* Free peer_req and pages before send.
1176          * In case we block on congestion, we could otherwise run into
1177          * some distributed deadlock, if the other side blocks on
1178          * congestion as well, because our receiver blocks in
1179          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1180         drbd_free_peer_req(mdev, peer_req);
1181         if (!eq)
1182                 drbd_ov_out_of_sync_found(mdev, sector, size);
1183         else
1184                 ov_out_of_sync_print(mdev);
1185
1186         err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1187                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1188
1189         dec_unacked(mdev);
1190
1191         --mdev->ov_left;
1192
1193         /* let's advance progress step marks only for every other megabyte */
1194         if ((mdev->ov_left & 0x200) == 0x200)
1195                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1196
1197         if (mdev->ov_left == 0) {
1198                 ov_out_of_sync_print(mdev);
1199                 drbd_resync_finished(mdev);
1200         }
1201
1202         return err;
1203 }
1204
1205 int w_prev_work_done(struct drbd_work *w, int cancel)
1206 {
1207         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1208
1209         complete(&b->done);
1210         return 0;
1211 }
1212
1213 int w_send_barrier(struct drbd_work *w, int cancel)
1214 {
1215         struct drbd_socket *sock;
1216         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1217         struct drbd_conf *mdev = w->mdev;
1218         struct p_barrier *p;
1219
1220         /* really avoid racing with tl_clear.  w.cb may have been referenced
1221          * just before it was reassigned and re-queued, so double check that.
1222          * actually, this race was harmless, since we only try to send the
1223          * barrier packet here, and otherwise do nothing with the object.
1224          * but compare with the head of w_clear_epoch */
1225         spin_lock_irq(&mdev->tconn->req_lock);
1226         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1227                 cancel = 1;
1228         spin_unlock_irq(&mdev->tconn->req_lock);
1229         if (cancel)
1230                 return 0;
1231
1232         sock = &mdev->tconn->data;
1233         p = drbd_prepare_command(mdev, sock);
1234         if (!p)
1235                 return -EIO;
1236         p->barrier = b->br_number;
1237         /* inc_ap_pending was done where this was queued.
1238          * dec_ap_pending will be done in got_BarrierAck
1239          * or (on connection loss) in w_clear_epoch.  */
1240         return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
1241 }
1242
1243 int w_send_write_hint(struct drbd_work *w, int cancel)
1244 {
1245         struct drbd_conf *mdev = w->mdev;
1246         struct drbd_socket *sock;
1247
1248         if (cancel)
1249                 return 0;
1250         sock = &mdev->tconn->data;
1251         if (!drbd_prepare_command(mdev, sock))
1252                 return -EIO;
1253         return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1254 }
1255
1256 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1257 {
1258         struct drbd_request *req = container_of(w, struct drbd_request, w);
1259         struct drbd_conf *mdev = w->mdev;
1260         int err;
1261
1262         if (unlikely(cancel)) {
1263                 req_mod(req, SEND_CANCELED);
1264                 return 0;
1265         }
1266
1267         err = drbd_send_out_of_sync(mdev, req);
1268         req_mod(req, OOS_HANDED_TO_NETWORK);
1269
1270         return err;
1271 }
1272
1273 /**
1274  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1275  * @mdev:       DRBD device.
1276  * @w:          work object.
1277  * @cancel:     The connection will be closed anyways
1278  */
1279 int w_send_dblock(struct drbd_work *w, int cancel)
1280 {
1281         struct drbd_request *req = container_of(w, struct drbd_request, w);
1282         struct drbd_conf *mdev = w->mdev;
1283         int err;
1284
1285         if (unlikely(cancel)) {
1286                 req_mod(req, SEND_CANCELED);
1287                 return 0;
1288         }
1289
1290         err = drbd_send_dblock(mdev, req);
1291         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1292
1293         return err;
1294 }
1295
1296 /**
1297  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1298  * @mdev:       DRBD device.
1299  * @w:          work object.
1300  * @cancel:     The connection will be closed anyways
1301  */
1302 int w_send_read_req(struct drbd_work *w, int cancel)
1303 {
1304         struct drbd_request *req = container_of(w, struct drbd_request, w);
1305         struct drbd_conf *mdev = w->mdev;
1306         int err;
1307
1308         if (unlikely(cancel)) {
1309                 req_mod(req, SEND_CANCELED);
1310                 return 0;
1311         }
1312
1313         err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1314                                  (unsigned long)req);
1315
1316         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1317
1318         return err;
1319 }
1320
1321 int w_restart_disk_io(struct drbd_work *w, int cancel)
1322 {
1323         struct drbd_request *req = container_of(w, struct drbd_request, w);
1324         struct drbd_conf *mdev = w->mdev;
1325
1326         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1327                 drbd_al_begin_io(mdev, &req->i);
1328         /* Calling drbd_al_begin_io() out of the worker might deadlock
1329            in theory. In practice it cannot deadlock, since this is
1330            only used when unfreezing IOs. All the extents of the requests
1331            that made it into the TL are already active */
1332
1333         drbd_req_make_private_bio(req, req->master_bio);
1334         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1335         generic_make_request(req->private_bio);
1336
1337         return 0;
1338 }
1339
1340 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1341 {
1342         struct drbd_conf *odev = mdev;
1343         int ra;
1344
1345         while (1) {
1346                 if (!odev->ldev)
1347                         return 1;
1348                 rcu_read_lock();
1349                 ra = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1350                 rcu_read_unlock();
1351                 if (ra == -1)
1352                         return 1;
1353                 odev = minor_to_mdev(ra);
1354                 if (!expect(odev))
1355                         return 1;
1356                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1357                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1358                     odev->state.aftr_isp || odev->state.peer_isp ||
1359                     odev->state.user_isp)
1360                         return 0;
1361         }
1362 }
1363
1364 /**
1365  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1366  * @mdev:       DRBD device.
1367  *
1368  * Called from process context only (admin command and after_state_ch).
1369  */
1370 static int _drbd_pause_after(struct drbd_conf *mdev)
1371 {
1372         struct drbd_conf *odev;
1373         int i, rv = 0;
1374
1375         rcu_read_lock();
1376         idr_for_each_entry(&minors, odev, i) {
1377                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1378                         continue;
1379                 if (!_drbd_may_sync_now(odev))
1380                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1381                                != SS_NOTHING_TO_DO);
1382         }
1383         rcu_read_unlock();
1384
1385         return rv;
1386 }
1387
1388 /**
1389  * _drbd_resume_next() - Resume resync on all devices that may resync now
1390  * @mdev:       DRBD device.
1391  *
1392  * Called from process context only (admin command and worker).
1393  */
1394 static int _drbd_resume_next(struct drbd_conf *mdev)
1395 {
1396         struct drbd_conf *odev;
1397         int i, rv = 0;
1398
1399         rcu_read_lock();
1400         idr_for_each_entry(&minors, odev, i) {
1401                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1402                         continue;
1403                 if (odev->state.aftr_isp) {
1404                         if (_drbd_may_sync_now(odev))
1405                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1406                                                         CS_HARD, NULL)
1407                                        != SS_NOTHING_TO_DO) ;
1408                 }
1409         }
1410         rcu_read_unlock();
1411         return rv;
1412 }
1413
1414 void resume_next_sg(struct drbd_conf *mdev)
1415 {
1416         write_lock_irq(&global_state_lock);
1417         _drbd_resume_next(mdev);
1418         write_unlock_irq(&global_state_lock);
1419 }
1420
1421 void suspend_other_sg(struct drbd_conf *mdev)
1422 {
1423         write_lock_irq(&global_state_lock);
1424         _drbd_pause_after(mdev);
1425         write_unlock_irq(&global_state_lock);
1426 }
1427
1428 /* caller must hold global_state_lock */
1429 enum drbd_ret_code drbd_sync_after_valid(struct drbd_conf *mdev, int o_minor)
1430 {
1431         struct drbd_conf *odev;
1432         int ra;
1433
1434         if (o_minor == -1)
1435                 return NO_ERROR;
1436         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1437                 return ERR_SYNC_AFTER;
1438
1439         /* check for loops */
1440         odev = minor_to_mdev(o_minor);
1441         while (1) {
1442                 if (odev == mdev)
1443                         return ERR_SYNC_AFTER_CYCLE;
1444
1445                 rcu_read_lock();
1446                 ra = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1447                 rcu_read_unlock();
1448                 /* dependency chain ends here, no cycles. */
1449                 if (ra == -1)
1450                         return NO_ERROR;
1451
1452                 /* follow the dependency chain */
1453                 odev = minor_to_mdev(ra);
1454         }
1455 }
1456
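/* Re-evaluate the resync-after dependencies of all devices.  Loop until no
 * further state changes result, since pausing or resuming one device can in
 * turn affect the devices that depend on it. */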
1457 /* caller must hold global_state_lock */
1458 void drbd_sync_after_changed(struct drbd_conf *mdev)
1459 {
1460         int changes;
1461
1462         do {
1463                 changes  = _drbd_pause_after(mdev);
1464                 changes |= _drbd_resume_next(mdev);
1465         } while (changes);
1466 }
1467
1468 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1469 {
1470         struct fifo_buffer *plan;
1471
1472         atomic_set(&mdev->rs_sect_in, 0);
1473         atomic_set(&mdev->rs_sect_ev, 0);
1474         mdev->rs_in_flight = 0;
1475
1476         /* Updating the RCU-protected object in place is necessary since
1477            this function gets called from atomic context.
1478            It is valid since all other updates also lead to a completely
1479            empty fifo. */
1480         rcu_read_lock();
1481         plan = rcu_dereference(mdev->rs_plan_s);
1482         plan->total = 0;
1483         fifo_set(plan, 0);
1484         rcu_read_unlock();
1485 }
1486
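/* Timer callback: defer the actual resync start to the worker by queueing
 * start_resync_work; drbd_start_resync() may sleep and must not be called
 * from timer context. */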
1487 void start_resync_timer_fn(unsigned long data)
1488 {
1489         struct drbd_conf *mdev = (struct drbd_conf *) data;
1490
1491         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1492 }
1493
1494 int w_start_resync(struct drbd_work *w, int cancel)
1495 {
1496         struct drbd_conf *mdev = w->mdev;
1497
1498         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1499                 dev_warn(DEV, "w_start_resync later...\n");
1500                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1501                 add_timer(&mdev->start_resync_timer);
1502                 return 0;
1503         }
1504
1505         drbd_start_resync(mdev, C_SYNC_SOURCE);
1506         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1507         return 0;
1508 }
1509
1510 /**
1511  * drbd_start_resync() - Start the resync process
1512  * @mdev:       DRBD device.
1513  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1514  *
1515  * This function might bring you directly into one of the
1516  * C_PAUSED_SYNC_* states.
1517  */
1518 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1519 {
1520         union drbd_state ns;
1521         int r;
1522
1523         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1524                 dev_err(DEV, "Resync already running!\n");
1525                 return;
1526         }
1527
1528         if (mdev->state.conn < C_AHEAD) {
1529                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1530                 drbd_rs_cancel_all(mdev);
1531                 /* This should be done when we abort the resync. We definitely do not
1532                    want to have this for connections going back and forth between
1533                    Ahead/Behind and SyncSource/SyncTarget */
1534         }
1535
1536         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1537                 if (side == C_SYNC_TARGET) {
1538                         /* Since application IO was locked out during C_WF_BITMAP_T and
1539                            C_WF_SYNC_UUID we are still unmodified. Before we go to C_SYNC_TARGET
1540                            and make the data inconsistent, the handler below may still veto. */
1541                         r = drbd_khelper(mdev, "before-resync-target");
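                        /* the handler's exit code is in bits 8..15 of the
                         * status returned by drbd_khelper() */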
1542                         r = (r >> 8) & 0xff;
1543                         if (r > 0) {
1544                                 dev_info(DEV, "before-resync-target handler returned %d, "
1545                                          "dropping connection.\n", r);
1546                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1547                                 return;
1548                         }
1549                 } else /* C_SYNC_SOURCE */ {
1550                         r = drbd_khelper(mdev, "before-resync-source");
1551                         r = (r >> 8) & 0xff;
1552                         if (r > 0) {
1553                                 if (r == 3) {
1554                                         dev_info(DEV, "before-resync-source handler returned %d, "
1555                                                  "ignoring. Old userland tools?\n", r);
1556                                 } else {
1557                                         dev_info(DEV, "before-resync-source handler returned %d, "
1558                                                  "dropping connection.\n", r);
1559                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1560                                         return;
1561                                 }
1562                         }
1563                 }
1564         }
1565
1566         if (current == mdev->tconn->worker.task) {
1567                 /* The worker should not sleep waiting for state_mutex,
1568                    that could take a long time. */
1569                 if (!mutex_trylock(mdev->state_mutex)) {
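                        /* Remember that the handlers above already ran, so
                         * the retry via the timer does not run them again. */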
1570                         set_bit(B_RS_H_DONE, &mdev->flags);
1571                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1572                         add_timer(&mdev->start_resync_timer);
1573                         return;
1574                 }
1575         } else {
1576                 mutex_lock(mdev->state_mutex);
1577         }
1578         clear_bit(B_RS_H_DONE, &mdev->flags);
1579
1580         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1581                 mutex_unlock(mdev->state_mutex);
1582                 return;
1583         }
1584
1585         write_lock_irq(&global_state_lock);
1586         ns = drbd_read_state(mdev);
1587
1588         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1589
1590         ns.conn = side;
1591
1592         if (side == C_SYNC_TARGET)
1593                 ns.disk = D_INCONSISTENT;
1594         else /* side == C_SYNC_SOURCE */
1595                 ns.pdsk = D_INCONSISTENT;
1596
1597         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1598         ns = drbd_read_state(mdev);
1599
1600         if (ns.conn < C_CONNECTED)
1601                 r = SS_UNKNOWN_ERROR;
1602
1603         if (r == SS_SUCCESS) {
1604                 unsigned long tw = drbd_bm_total_weight(mdev);
1605                 unsigned long now = jiffies;
1606                 int i;
1607
1608                 mdev->rs_failed    = 0;
1609                 mdev->rs_paused    = 0;
1610                 mdev->rs_same_csum = 0;
1611                 mdev->rs_last_events = 0;
1612                 mdev->rs_last_sect_ev = 0;
1613                 mdev->rs_total     = tw;
1614                 mdev->rs_start     = now;
1615                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1616                         mdev->rs_mark_left[i] = tw;
1617                         mdev->rs_mark_time[i] = now;
1618                 }
1619                 _drbd_pause_after(mdev);
1620         }
1621         write_unlock_irq(&global_state_lock);
1622
1623         if (r == SS_SUCCESS) {
1624                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1625                      drbd_conn_str(ns.conn),
1626                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1627                      (unsigned long) mdev->rs_total);
1628                 if (side == C_SYNC_TARGET)
1629                         mdev->bm_resync_fo = 0;
1630
1631                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1632                  * with w_send_oos, or the sync target will get confused as to
1633                  * how many bits to resync.  We cannot always do that, because for an
1634                  * empty resync and protocol < 95 we need to do it here, as we call
1635                  * drbd_resync_finished from here in that case.
1636                  * So we call drbd_gen_and_send_sync_uuid here for protocol < 96,
1637                  * and from after_state_ch otherwise. */
1638                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1639                         drbd_gen_and_send_sync_uuid(mdev);
1640
1641                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1642                         /* This still has a race (about when exactly the peers
1643                          * detect connection loss) that can lead to a full sync
1644                          * on next handshake. In 8.3.9 we fixed this with explicit
1645                          * resync-finished notifications, but the fix
1646                          * introduces a protocol change.  Sleeping for some
1647                          * time longer than the ping interval + timeout on the
1648                          * SyncSource, to give the SyncTarget the chance to
1649                          * detect connection loss, then waiting for a ping
1650                          * response (implicit in drbd_resync_finished) reduces
1651                          * the race considerably, but does not solve it. */
1652                         if (side == C_SYNC_SOURCE) {
1653                                 struct net_conf *nc;
1654                                 int timeo;
1655
1656                                 rcu_read_lock();
1657                                 nc = rcu_dereference(mdev->tconn->net_conf);
1658                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1659                                 rcu_read_unlock();
1660                                 schedule_timeout_interruptible(timeo);
1661                         }
1662                         drbd_resync_finished(mdev);
1663                 }
1664
1665                 drbd_rs_controller_reset(mdev);
1666                 /* ns.conn may already be != mdev->state.conn,
1667                  * we may have been paused in between, or become paused until
1668                  * the timer triggers.
1669                  * No matter, that is handled in resync_timer_fn() */
1670                 if (ns.conn == C_SYNC_TARGET)
1671                         mod_timer(&mdev->resync_timer, jiffies);
1672
1673                 drbd_md_sync(mdev);
1674         }
1675         put_ldev(mdev);
1676         mutex_unlock(mdev->state_mutex);
1677 }
1678
1679 int drbd_worker(struct drbd_thread *thi)
1680 {
1681         struct drbd_tconn *tconn = thi->tconn;
1682         struct drbd_work *w = NULL;
1683         struct drbd_conf *mdev;
1684         struct net_conf *nc;
1685         LIST_HEAD(work_list);
1686         int vnr, intr = 0;
1687         int cork;
1688
1689         while (get_t_state(thi) == RUNNING) {
1690                 drbd_thread_current_set_cpu(thi);
1691
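                /* No work ready?  Then uncork the data socket so anything
                 * already queued gets sent out while we sleep on the
                 * semaphore, and cork it again before handling new work. */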
1692                 if (down_trylock(&tconn->data.work.s)) {
1693                         mutex_lock(&tconn->data.mutex);
1694
1695                         rcu_read_lock();
1696                         nc = rcu_dereference(tconn->net_conf);
1697                         cork = nc ? nc->tcp_cork : 0;
1698                         rcu_read_unlock();
1699
1700                         if (tconn->data.socket && cork)
1701                                 drbd_tcp_uncork(tconn->data.socket);
1702                         mutex_unlock(&tconn->data.mutex);
1703
1704                         intr = down_interruptible(&tconn->data.work.s);
1705
1706                         mutex_lock(&tconn->data.mutex);
1707                         if (tconn->data.socket && cork)
1708                                 drbd_tcp_cork(tconn->data.socket);
1709                         mutex_unlock(&tconn->data.mutex);
1710                 }
1711
1712                 if (intr) {
1713                         flush_signals(current);
1714                         if (get_t_state(thi) == RUNNING) {
1715                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1716                                 continue;
1717                         }
1718                         break;
1719                 }
1720
1721                 if (get_t_state(thi) != RUNNING)
1722                         break;
1723                 /* With this break, we have done a down() but not consumed
1724                    the entry from the list. The cleanup code takes care of
1725                    this...   */
1726
1727                 w = NULL;
1728                 spin_lock_irq(&tconn->data.work.q_lock);
1729                 if (list_empty(&tconn->data.work.q)) {
1730                         /* Something is terribly wrong in our logic:
1731                          * we were able to down() the semaphore,
1732                          * but the list is empty.
1733                          *
1734                          * What is the best thing to do now?
1735                          * Starting over from scratch, restarting the receiver,
1736                          * asender, or whatnot could break things even worse,
1737                          * e.g. when we are primary but have no good local data.
1738                          *
1739                          * Try to get away with just starting over this loop.
1740                          */
1741                         conn_warn(tconn, "Work list unexpectedly empty\n");
1742                         spin_unlock_irq(&tconn->data.work.q_lock);
1743                         continue;
1744                 }
1745                 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1746                 list_del_init(&w->list);
1747                 spin_unlock_irq(&tconn->data.work.q_lock);
1748
1749                 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1750                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1751                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1752                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1753                 }
1754         }
1755
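        /* The thread is stopping: drain whatever is still on the work queue,
         * calling each callback with the cancel flag set. */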
1756         spin_lock_irq(&tconn->data.work.q_lock);
1757         while (!list_empty(&tconn->data.work.q)) {
1758                 list_splice_init(&tconn->data.work.q, &work_list);
1759                 spin_unlock_irq(&tconn->data.work.q_lock);
1760
1761                 while (!list_empty(&work_list)) {
1762                         w = list_entry(work_list.next, struct drbd_work, list);
1763                         list_del_init(&w->list);
1764                         w->cb(w, 1);
1765                 }
1766
1767                 spin_lock_irq(&tconn->data.work.q_lock);
1768         }
1769         sema_init(&tconn->data.work.s, 0);
1770         /* DANGEROUS race: if someone queued work while holding the spinlock,
1771          * but called up() outside the spinlock, we could get an up() on the
1772          * semaphore without a corresponding list entry.
1773          * So don't do that.
1774          */
1775         spin_unlock_irq(&tconn->data.work.q_lock);
1776
1777         rcu_read_lock();
1778         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1779                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1780                 kref_get(&mdev->kref);
1781                 rcu_read_unlock();
1782                 drbd_mdev_cleanup(mdev);
1783                 kref_put(&mdev->kref, &drbd_minor_destroy);
1784                 rcu_read_lock();
1785         }
1786         rcu_read_unlock();
1787
1788         return 0;
1789 }