1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63         struct drbd_device *device;
64
65         device = bio->bi_private;
66         device->md_io.error = bio->bi_error;
67
68         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69          * to timeout on the lower level device, and eventually detach from it.
70          * If this io completion runs after that timeout expired, this
71          * drbd_md_put_buffer() may allow us to finally try and re-attach.
72          * During normal operation, this only puts that extra reference
73          * down to 1 again.
74          * Make sure we first drop the reference, and only then signal
75          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76          * next drbd_md_sync_page_io(), that we trigger the
77          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
78          */
79         drbd_md_put_buffer(device);
80         device->md_io.done = 1;
81         wake_up(&device->misc_wait);
82         bio_put(bio);
83         if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
84                 put_ldev(device);
85 }
86
87 /* reads on behalf of the partner,
88  * "submitted" by the receiver
89  */
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
91 {
92         unsigned long flags = 0;
93         struct drbd_peer_device *peer_device = peer_req->peer_device;
94         struct drbd_device *device = peer_device->device;
95
96         spin_lock_irqsave(&device->resource->req_lock, flags);
97         device->read_cnt += peer_req->i.size >> 9;
98         list_del(&peer_req->w.list);
99         if (list_empty(&device->read_ee))
100                 wake_up(&device->ee_wait);
101         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
103         spin_unlock_irqrestore(&device->resource->req_lock, flags);
104
105         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
106         put_ldev(device);
107 }
108
109 /* writes on behalf of the partner, or resync writes,
110  * "submitted" by the receiver, final stage.  */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
112 {
113         unsigned long flags = 0;
114         struct drbd_peer_device *peer_device = peer_req->peer_device;
115         struct drbd_device *device = peer_device->device;
116         struct drbd_connection *connection = peer_device->connection;
117         struct drbd_interval i;
118         int do_wake;
119         u64 block_id;
120         int do_al_complete_io;
121
122         /* after we moved peer_req to done_ee,
123          * we may no longer access it,
124          * it may be freed/reused already!
125          * (as soon as we release the req_lock) */
126         i = peer_req->i;
127         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128         block_id = peer_req->block_id;
129         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
130
131         spin_lock_irqsave(&device->resource->req_lock, flags);
132         device->writ_cnt += peer_req->i.size >> 9;
133         list_move_tail(&peer_req->w.list, &device->done_ee);
134
135         /*
136          * Do not remove from the write_requests tree here: we did not send the
137          * Ack yet and did not wake possibly waiting conflicting requests.
138          * Removal from the tree happens in "drbd_process_done_ee", within
139          * the appropriate dw.cb (e_end_block/e_end_resync_block), or in
140          * _drbd_clear_done_ee.
141          */
142
143         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
144
145         /* FIXME do we want to detach for failed REQ_DISCARD?
146          * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147         if (peer_req->flags & EE_WAS_ERROR)
148                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
149
150         if (connection->cstate >= C_WF_REPORT_PARAMS) {
151                 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152                 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153                         kref_put(&device->kref, drbd_destroy_device);
154         }
155         spin_unlock_irqrestore(&device->resource->req_lock, flags);
156
157         if (block_id == ID_SYNCER)
158                 drbd_rs_complete_io(device, i.sector);
159
160         if (do_wake)
161                 wake_up(&device->ee_wait);
162
163         if (do_al_complete_io)
164                 drbd_al_complete_io(device, &i);
165
166         put_ldev(device);
167 }
168
169 /* writes on behalf of the partner, or resync writes,
170  * "submitted" by the receiver.
171  */
172 void drbd_peer_request_endio(struct bio *bio)
173 {
174         struct drbd_peer_request *peer_req = bio->bi_private;
175         struct drbd_device *device = peer_req->peer_device->device;
176         int is_write = bio_data_dir(bio) == WRITE;
177         int is_discard = !!(bio->bi_rw & REQ_DISCARD);
178
179         if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
180                 drbd_warn(device, "%s: error=%d s=%llus\n",
181                                 is_write ? (is_discard ? "discard" : "write")
182                                         : "read", bio->bi_error,
183                                 (unsigned long long)peer_req->i.sector);
184
185         if (bio->bi_error)
186                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188         bio_put(bio); /* no need for the bio anymore */
189         if (atomic_dec_and_test(&peer_req->pending_bios)) {
190                 if (is_write)
191                         drbd_endio_write_sec_final(peer_req);
192                 else
193                         drbd_endio_read_sec_final(peer_req);
194         }
195 }
196
197 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
198 {
199         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
200                 device->minor, device->resource->name, device->vnr);
201 }
202
203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
204  */
205 void drbd_request_endio(struct bio *bio)
206 {
207         unsigned long flags;
208         struct drbd_request *req = bio->bi_private;
209         struct drbd_device *device = req->device;
210         struct bio_and_error m;
211         enum drbd_req_event what;
212
213         /* If this request was aborted locally before,
214          * but now was completed "successfully",
215          * chances are that this caused arbitrary data corruption.
216          *
217          * "aborting" requests, or force-detaching the disk, is intended for
218          * completely blocked/hung local backing devices which no longer
219          * complete requests at all, not even error completions.  In this
220          * situation, usually a hard-reset and failover is the only way out.
221          *
222          * By "aborting", basically faking a local error-completion,
223          * we allow for a more graceful switchover by cleanly migrating services.
224          * Still the affected node has to be rebooted "soon".
225          *
226          * By completing these requests, we allow the upper layers to re-use
227          * the associated data pages.
228          *
229          * If later the local backing device "recovers", and now DMAs some data
230          * from disk into the original request pages, in the best case it will
231          * just put random data into unused pages; but typically it will corrupt
232          * meanwhile completely unrelated data, causing all sorts of damage.
233          *
234          * Which means delayed successful completion,
235          * especially for READ requests,
236          * is a reason to panic().
237          *
238          * We assume that a delayed *error* completion is OK,
239          * though we still will complain noisily about it.
240          */
241         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
242                 if (__ratelimit(&drbd_ratelimit_state))
243                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
244
245                 if (!bio->bi_error)
246                         drbd_panic_after_delayed_completion_of_aborted_request(device);
247         }
248
249         /* to avoid recursion in __req_mod */
250         if (unlikely(bio->bi_error)) {
251                 if (bio->bi_rw & REQ_DISCARD)
252                         what = (bio->bi_error == -EOPNOTSUPP)
253                                 ? DISCARD_COMPLETED_NOTSUPP
254                                 : DISCARD_COMPLETED_WITH_ERROR;
255                 else
256                         what = (bio_data_dir(bio) == WRITE)
257                         ? WRITE_COMPLETED_WITH_ERROR
258                         : (bio_rw(bio) == READ)
259                           ? READ_COMPLETED_WITH_ERROR
260                           : READ_AHEAD_COMPLETED_WITH_ERROR;
261         } else
262                 what = COMPLETED_OK;
263
264         bio_put(req->private_bio);
265         req->private_bio = ERR_PTR(bio->bi_error);
266
267         /* not req_mod(), we need irqsave here! */
268         spin_lock_irqsave(&device->resource->req_lock, flags);
269         __req_mod(req, what, &m);
270         spin_unlock_irqrestore(&device->resource->req_lock, flags);
271         put_ldev(device);
272
273         if (m.bio)
274                 complete_master_bio(device, &m);
275 }
276
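/* Hash all pages of a peer request into "digest".  All pages in the
 * page chain are fully used, except for the last one, which may only be
 * partially covered by peer_req->i.size. */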
277 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
278 {
279         struct hash_desc desc;
280         struct scatterlist sg;
281         struct page *page = peer_req->pages;
282         struct page *tmp;
283         unsigned len;
284
285         desc.tfm = tfm;
286         desc.flags = 0;
287
288         sg_init_table(&sg, 1);
289         crypto_hash_init(&desc);
290
291         while ((tmp = page_chain_next(page))) {
292                 /* all but the last page will be fully used */
293                 sg_set_page(&sg, page, PAGE_SIZE, 0);
294                 crypto_hash_update(&desc, &sg, sg.length);
295                 page = tmp;
296         }
297         /* and now the last, possibly only partially used page */
298         len = peer_req->i.size & (PAGE_SIZE - 1);
299         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
300         crypto_hash_update(&desc, &sg, sg.length);
301         crypto_hash_final(&desc, digest);
302 }
303
304 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
305 {
306         struct hash_desc desc;
307         struct scatterlist sg;
308         struct bio_vec bvec;
309         struct bvec_iter iter;
310
311         desc.tfm = tfm;
312         desc.flags = 0;
313
314         sg_init_table(&sg, 1);
315         crypto_hash_init(&desc);
316
317         bio_for_each_segment(bvec, bio, iter) {
318                 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
319                 crypto_hash_update(&desc, &sg, sg.length);
320         }
321         crypto_hash_final(&desc, digest);
322 }
323
324 /* MAYBE merge common code with w_e_end_ov_req */
325 static int w_e_send_csum(struct drbd_work *w, int cancel)
326 {
327         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
328         struct drbd_peer_device *peer_device = peer_req->peer_device;
329         struct drbd_device *device = peer_device->device;
330         int digest_size;
331         void *digest;
332         int err = 0;
333
334         if (unlikely(cancel))
335                 goto out;
336
337         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
338                 goto out;
339
340         digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
341         digest = kmalloc(digest_size, GFP_NOIO);
342         if (digest) {
343                 sector_t sector = peer_req->i.sector;
344                 unsigned int size = peer_req->i.size;
345                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
346                 /* Free peer_req and pages before send.
347                  * In case we block on congestion, we could otherwise run into
348                  * some distributed deadlock, if the other side blocks on
349                  * congestion as well, because our receiver blocks in
350                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
351                 drbd_free_peer_req(device, peer_req);
352                 peer_req = NULL;
353                 inc_rs_pending(device);
354                 err = drbd_send_drequest_csum(peer_device, sector, size,
355                                               digest, digest_size,
356                                               P_CSUM_RS_REQUEST);
357                 kfree(digest);
358         } else {
359                 drbd_err(device, "kmalloc() of digest failed.\n");
360                 err = -ENOMEM;
361         }
362
363 out:
364         if (peer_req)
365                 drbd_free_peer_req(device, peer_req);
366
367         if (unlikely(err))
368                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
369         return err;
370 }
371
372 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
373
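/* Submit the local read for a checksum-based resync request.  The
 * checksum is computed and sent as P_CSUM_RS_REQUEST later, from
 * w_e_send_csum(), once the read has completed. */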
374 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
375 {
376         struct drbd_device *device = peer_device->device;
377         struct drbd_peer_request *peer_req;
378
379         if (!get_ldev(device))
380                 return -EIO;
381
382         /* GFP_TRY, because if there is no memory available right now, this may
383          * be rescheduled for later. It is "only" background resync, after all. */
384         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
385                                        size, true /* has real payload */, GFP_TRY);
386         if (!peer_req)
387                 goto defer;
388
389         peer_req->w.cb = w_e_send_csum;
390         spin_lock_irq(&device->resource->req_lock);
391         list_add_tail(&peer_req->w.list, &device->read_ee);
392         spin_unlock_irq(&device->resource->req_lock);
393
394         atomic_add(size >> 9, &device->rs_sect_ev);
395         if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
396                 return 0;
397
398         /* If it failed because of ENOMEM, retry should help.  If it failed
399          * because bio_add_page failed (probably broken lower level driver),
400          * retry may or may not help.
401          * If it does not, you may need to force disconnect. */
402         spin_lock_irq(&device->resource->req_lock);
403         list_del(&peer_req->w.list);
404         spin_unlock_irq(&device->resource->req_lock);
405
406         drbd_free_peer_req(device, peer_req);
407 defer:
408         put_ldev(device);
409         return -EAGAIN;
410 }
411
412 int w_resync_timer(struct drbd_work *w, int cancel)
413 {
414         struct drbd_device *device =
415                 container_of(w, struct drbd_device, resync_work);
416
417         switch (device->state.conn) {
418         case C_VERIFY_S:
419                 make_ov_request(device, cancel);
420                 break;
421         case C_SYNC_TARGET:
422                 make_resync_request(device, cancel);
423                 break;
424         }
425
426         return 0;
427 }
428
429 void resync_timer_fn(unsigned long data)
430 {
431         struct drbd_device *device = (struct drbd_device *) data;
432
433         drbd_queue_work_if_unqueued(
434                 &first_peer_device(device)->connection->sender_work,
435                 &device->resync_work);
436 }
437
438 static void fifo_set(struct fifo_buffer *fb, int value)
439 {
440         int i;
441
442         for (i = 0; i < fb->size; i++)
443                 fb->values[i] = value;
444 }
445
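/* Write "value" at the current head of the ring buffer and return the
 * value it replaces.  fifo_push(fb, 0) therefore pops the correction
 * planned for the current step while keeping the plan length fixed. */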
446 static int fifo_push(struct fifo_buffer *fb, int value)
447 {
448         int ov;
449
450         ov = fb->values[fb->head_index];
451         fb->values[fb->head_index++] = value;
452
453         if (fb->head_index >= fb->size)
454                 fb->head_index = 0;
455
456         return ov;
457 }
458
459 static void fifo_add_val(struct fifo_buffer *fb, int value)
460 {
461         int i;
462
463         for (i = 0; i < fb->size; i++)
464                 fb->values[i] += value;
465 }
466
467 struct fifo_buffer *fifo_alloc(int fifo_size)
468 {
469         struct fifo_buffer *fb;
470
471         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
472         if (!fb)
473                 return NULL;
474
475         fb->head_index = 0;
476         fb->size = fifo_size;
477         fb->total = 0;
478
479         return fb;
480 }
481
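/* Dynamic resync rate controller.  Based on the number of sectors that
 * came in since the last turn (sect_in), compute how many sectors to
 * request next, so that the amount of resync data in flight approaches
 * c_fill_target (or, if that is 0, the amount implied by
 * c_delay_target).  The correction is spread over "steps" future
 * invocations via the plan-ahead FIFO, and the result is clamped to
 * c_max_rate. */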
482 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
483 {
484         struct disk_conf *dc;
485         unsigned int want;     /* The number of sectors we want in-flight */
486         int req_sect; /* Number of sectors to request in this turn */
487         int correction; /* Number of sectors more we need in-flight */
488         int cps; /* correction per invocation of drbd_rs_controller() */
489         int steps; /* Number of time steps to plan ahead */
490         int curr_corr;
491         int max_sect;
492         struct fifo_buffer *plan;
493
494         dc = rcu_dereference(device->ldev->disk_conf);
495         plan = rcu_dereference(device->rs_plan_s);
496
497         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
498
499         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
500                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
501         } else { /* normal path */
502                 want = dc->c_fill_target ? dc->c_fill_target :
503                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
504         }
505
506         correction = want - device->rs_in_flight - plan->total;
507
508         /* Plan ahead */
509         cps = correction / steps;
510         fifo_add_val(plan, cps);
511         plan->total += cps * steps;
512
513         /* What we do in this step */
514         curr_corr = fifo_push(plan, 0);
515         plan->total -= curr_corr;
516
517         req_sect = sect_in + curr_corr;
518         if (req_sect < 0)
519                 req_sect = 0;
520
521         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
522         if (req_sect > max_sect)
523                 req_sect = max_sect;
524
525         /*
526         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
527                  sect_in, device->rs_in_flight, want, correction,
528                  steps, cps, device->rs_planed, curr_corr, req_sect);
529         */
530
531         return req_sect;
532 }
533
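/* How many resync requests (in units of BM_BLOCK_SIZE) to issue in this
 * turn: taken from the controller above if a plan is configured,
 * otherwise derived from the static resync_rate; in both cases limited
 * so that no more than max-buffers/2 worth of data is in flight. */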
534 static int drbd_rs_number_requests(struct drbd_device *device)
535 {
536         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
537         int number, mxb;
538
539         sect_in = atomic_xchg(&device->rs_sect_in, 0);
540         device->rs_in_flight -= sect_in;
541
542         rcu_read_lock();
543         mxb = drbd_get_max_buffers(device) / 2;
544         if (rcu_dereference(device->rs_plan_s)->size) {
545                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
546                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
547         } else {
548                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
549                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
550         }
551         rcu_read_unlock();
552
553         /* Don't have more than "max-buffers"/2 in-flight.
554          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
555          * potentially causing a distributed deadlock on congestion during
556          * online-verify or (checksum-based) resync, if max-buffers,
557          * socket buffer sizes and resync rate settings are mis-configured. */
558
559         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
560          * mxb (as used here, and in drbd_alloc_pages on the peer) is
561          * "number of pages" (typically also 4k),
562          * but "rs_in_flight" is in "sectors" (512 Byte). */
563         if (mxb - device->rs_in_flight/8 < number)
564                 number = mxb - device->rs_in_flight/8;
565
566         return number;
567 }
568
569 static int make_resync_request(struct drbd_device *const device, int cancel)
570 {
571         struct drbd_peer_device *const peer_device = first_peer_device(device);
572         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
573         unsigned long bit;
574         sector_t sector;
575         const sector_t capacity = drbd_get_capacity(device->this_bdev);
576         int max_bio_size;
577         int number, rollback_i, size;
578         int align, requeue = 0;
579         int i = 0;
580
581         if (unlikely(cancel))
582                 return 0;
583
584         if (device->rs_total == 0) {
585                 /* empty resync? */
586                 drbd_resync_finished(device);
587                 return 0;
588         }
589
590         if (!get_ldev(device)) {
591                 /* Since we only need to access the resync bookkeeping here, a
592                    get_ldev_if_state(device, D_FAILED) would be sufficient, but
593                    continuing the resync with a broken disk makes no sense
594                    at all */
595                 drbd_err(device, "Disk broke down during resync!\n");
596                 return 0;
597         }
598
599         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
600         number = drbd_rs_number_requests(device);
601         if (number <= 0)
602                 goto requeue;
603
604         for (i = 0; i < number; i++) {
605                 /* Stop generating RS requests when half of the send buffer is filled,
606                  * but notify TCP that we'd like to have more space. */
607                 mutex_lock(&connection->data.mutex);
608                 if (connection->data.socket) {
609                         struct sock *sk = connection->data.socket->sk;
610                         int queued = sk->sk_wmem_queued;
611                         int sndbuf = sk->sk_sndbuf;
612                         if (queued > sndbuf / 2) {
613                                 requeue = 1;
614                                 if (sk->sk_socket)
615                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
616                         }
617                 } else
618                         requeue = 1;
619                 mutex_unlock(&connection->data.mutex);
620                 if (requeue)
621                         goto requeue;
622
623 next_sector:
624                 size = BM_BLOCK_SIZE;
625                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
626
627                 if (bit == DRBD_END_OF_BITMAP) {
628                         device->bm_resync_fo = drbd_bm_bits(device);
629                         put_ldev(device);
630                         return 0;
631                 }
632
633                 sector = BM_BIT_TO_SECT(bit);
634
635                 if (drbd_try_rs_begin_io(device, sector)) {
636                         device->bm_resync_fo = bit;
637                         goto requeue;
638                 }
639                 device->bm_resync_fo = bit + 1;
640
641                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
642                         drbd_rs_complete_io(device, sector);
643                         goto next_sector;
644                 }
645
646 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
647                 /* try to find some adjacent bits.
648                  * we stop if we have already the maximum req size.
649                  *
650                  * Additionally always align bigger requests, in order to
651                  * be prepared for all stripe sizes of software RAIDs.
652                  */
653                 align = 1;
654                 rollback_i = i;
655                 while (i < number) {
656                         if (size + BM_BLOCK_SIZE > max_bio_size)
657                                 break;
658
659                         /* Always be aligned */
660                         if (sector & ((1<<(align+3))-1))
661                                 break;
662
663                         /* do not cross extent boundaries */
664                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
665                                 break;
666                         /* now, is it actually dirty, after all?
667                          * caution, drbd_bm_test_bit is tri-state for some
668                          * obscure reason; testing ( b == 0 ) would handle the
669                          * out-of-band value only accidentally right, because of
670                          * the "oddly sized" adjustment below */
671                         if (drbd_bm_test_bit(device, bit+1) != 1)
672                                 break;
673                         bit++;
674                         size += BM_BLOCK_SIZE;
675                         if ((BM_BLOCK_SIZE << align) <= size)
676                                 align++;
677                         i++;
678                 }
679                 /* if we merged some,
680                  * reset the offset to start the next drbd_bm_find_next from */
681                 if (size > BM_BLOCK_SIZE)
682                         device->bm_resync_fo = bit + 1;
683 #endif
684
685                 /* adjust very last sectors, in case we are oddly sized */
686                 if (sector + (size>>9) > capacity)
687                         size = (capacity-sector)<<9;
688
689                 if (device->use_csums) {
690                         switch (read_for_csum(peer_device, sector, size)) {
691                         case -EIO: /* Disk failure */
692                                 put_ldev(device);
693                                 return -EIO;
694                         case -EAGAIN: /* allocation failed, or ldev busy */
695                                 drbd_rs_complete_io(device, sector);
696                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
697                                 i = rollback_i;
698                                 goto requeue;
699                         case 0:
700                                 /* everything ok */
701                                 break;
702                         default:
703                                 BUG();
704                         }
705                 } else {
706                         int err;
707
708                         inc_rs_pending(device);
709                         err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
710                                                  sector, size, ID_SYNCER);
711                         if (err) {
712                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
713                                 dec_rs_pending(device);
714                                 put_ldev(device);
715                                 return err;
716                         }
717                 }
718         }
719
720         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
721                 /* last syncer _request_ was sent,
722                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
723                  * next sync group will resume), as soon as we receive the last
724                  * resync data block, and the last bit is cleared.
725                  * until then resync "work" is "inactive" ...
726                  */
727                 put_ldev(device);
728                 return 0;
729         }
730
731  requeue:
732         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
733         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
734         put_ldev(device);
735         return 0;
736 }
737
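/* Issue up to drbd_rs_number_requests() online-verify requests
 * (P_OV_REQUEST), starting at device->ov_position and honoring the
 * configured stop sector. */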
738 static int make_ov_request(struct drbd_device *device, int cancel)
739 {
740         int number, i, size;
741         sector_t sector;
742         const sector_t capacity = drbd_get_capacity(device->this_bdev);
743         bool stop_sector_reached = false;
744
745         if (unlikely(cancel))
746                 return 1;
747
748         number = drbd_rs_number_requests(device);
749
750         sector = device->ov_position;
751         for (i = 0; i < number; i++) {
752                 if (sector >= capacity)
753                         return 1;
754
755                 /* We check for "finished" only in the reply path:
756                  * w_e_end_ov_reply().
757                  * We need to send at least one request out. */
758                 stop_sector_reached = i > 0
759                         && verify_can_do_stop_sector(device)
760                         && sector >= device->ov_stop_sector;
761                 if (stop_sector_reached)
762                         break;
763
764                 size = BM_BLOCK_SIZE;
765
766                 if (drbd_try_rs_begin_io(device, sector)) {
767                         device->ov_position = sector;
768                         goto requeue;
769                 }
770
771                 if (sector + (size>>9) > capacity)
772                         size = (capacity-sector)<<9;
773
774                 inc_rs_pending(device);
775                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
776                         dec_rs_pending(device);
777                         return 0;
778                 }
779                 sector += BM_SECT_PER_BIT;
780         }
781         device->ov_position = sector;
782
783  requeue:
784         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
785         if (i == 0 || !stop_sector_reached)
786                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
787         return 1;
788 }
789
790 int w_ov_finished(struct drbd_work *w, int cancel)
791 {
792         struct drbd_device_work *dw =
793                 container_of(w, struct drbd_device_work, w);
794         struct drbd_device *device = dw->device;
795         kfree(dw);
796         ov_out_of_sync_print(device);
797         drbd_resync_finished(device);
798
799         return 0;
800 }
801
802 static int w_resync_finished(struct drbd_work *w, int cancel)
803 {
804         struct drbd_device_work *dw =
805                 container_of(w, struct drbd_device_work, w);
806         struct drbd_device *device = dw->device;
807         kfree(dw);
808
809         drbd_resync_finished(device);
810
811         return 0;
812 }
813
814 static void ping_peer(struct drbd_device *device)
815 {
816         struct drbd_connection *connection = first_peer_device(device)->connection;
817
818         clear_bit(GOT_PING_ACK, &connection->flags);
819         request_ping(connection);
820         wait_event(connection->ping_wait,
821                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
822 }
823
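/* Called when a resync or online-verify run ends: log the achieved
 * throughput, update the disk/peer disk states and the UUIDs
 * accordingly, and possibly trigger the "out-of-sync" or
 * "after-resync-target" user space helpers. */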
824 int drbd_resync_finished(struct drbd_device *device)
825 {
826         unsigned long db, dt, dbdt;
827         unsigned long n_oos;
828         union drbd_state os, ns;
829         struct drbd_device_work *dw;
830         char *khelper_cmd = NULL;
831         int verify_done = 0;
832
833         /* Remove all elements from the resync LRU. Since future actions
834          * might set bits in the (main) bitmap, then the entries in the
835          * resync LRU would be wrong. */
836         if (drbd_rs_del_all(device)) {
837                 /* In case this is not possible now, most probably because
838                  * there are P_RS_DATA_REPLY packets lingering on the worker's
839                  * queue (or even the read operations for those packets
840                  * are not finished by now).  Retry in 100ms. */
841
842                 schedule_timeout_interruptible(HZ / 10);
843                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
844                 if (dw) {
845                         dw->w.cb = w_resync_finished;
846                         dw->device = device;
847                         drbd_queue_work(&first_peer_device(device)->connection->sender_work,
848                                         &dw->w);
849                         return 1;
850                 }
851                 drbd_err(device, "Failed to drbd_rs_del_all() and to kmalloc(dw).\n");
852         }
853
854         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
855         if (dt <= 0)
856                 dt = 1;
857
858         db = device->rs_total;
859         /* adjust for verify start and stop sectors, respectively the reached position */
860         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
861                 db -= device->ov_left;
862
863         dbdt = Bit2KB(db/dt);
864         device->rs_paused /= HZ;
865
866         if (!get_ldev(device))
867                 goto out;
868
869         ping_peer(device);
870
871         spin_lock_irq(&device->resource->req_lock);
872         os = drbd_read_state(device);
873
874         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
875
876         /* This protects us against multiple calls (that can happen in the presence
877            of application IO), and against connectivity loss just before we arrive here. */
878         if (os.conn <= C_CONNECTED)
879                 goto out_unlock;
880
881         ns = os;
882         ns.conn = C_CONNECTED;
883
884         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
885              verify_done ? "Online verify" : "Resync",
886              dt + device->rs_paused, device->rs_paused, dbdt);
887
888         n_oos = drbd_bm_total_weight(device);
889
890         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
891                 if (n_oos) {
892                         drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
893                               n_oos, Bit2KB(1));
894                         khelper_cmd = "out-of-sync";
895                 }
896         } else {
897                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
898
899                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
900                         khelper_cmd = "after-resync-target";
901
902                 if (device->use_csums && device->rs_total) {
903                         const unsigned long s = device->rs_same_csum;
904                         const unsigned long t = device->rs_total;
905                         const int ratio =
906                                 (t == 0)     ? 0 :
907                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
908                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
909                              "transferred %luK total %luK\n",
910                              ratio,
911                              Bit2KB(device->rs_same_csum),
912                              Bit2KB(device->rs_total - device->rs_same_csum),
913                              Bit2KB(device->rs_total));
914                 }
915         }
916
917         if (device->rs_failed) {
918                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
919
920                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
921                         ns.disk = D_INCONSISTENT;
922                         ns.pdsk = D_UP_TO_DATE;
923                 } else {
924                         ns.disk = D_UP_TO_DATE;
925                         ns.pdsk = D_INCONSISTENT;
926                 }
927         } else {
928                 ns.disk = D_UP_TO_DATE;
929                 ns.pdsk = D_UP_TO_DATE;
930
931                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
932                         if (device->p_uuid) {
933                                 int i;
934                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
935                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
936                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
937                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
938                         } else {
939                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
940                         }
941                 }
942
943                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
944                         /* for verify runs, we don't update uuids here,
945                          * so there would be nothing to report. */
946                         drbd_uuid_set_bm(device, 0UL);
947                         drbd_print_uuids(device, "updated UUIDs");
948                         if (device->p_uuid) {
949                                 /* Now the two UUID sets are equal, update what we
950                                  * know of the peer. */
951                                 int i;
952                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
953                                         device->p_uuid[i] = device->ldev->md.uuid[i];
954                         }
955                 }
956         }
957
958         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
959 out_unlock:
960         spin_unlock_irq(&device->resource->req_lock);
961         put_ldev(device);
962 out:
963         device->rs_total  = 0;
964         device->rs_failed = 0;
965         device->rs_paused = 0;
966
967         /* reset start sector, if we reached end of device */
968         if (verify_done && device->ov_left == 0)
969                 device->ov_start_sector = 0;
970
971         drbd_md_sync(device);
972
973         if (khelper_cmd)
974                 drbd_khelper(device, khelper_cmd);
975
976         return 1;
977 }
978
979 /* helper: keep peer requests whose pages are still referenced by sendpage() on net_ee, free the rest */
980 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
981 {
982         if (drbd_peer_req_has_active_page(peer_req)) {
983                 /* This might happen if sendpage() has not finished */
984                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
985                 atomic_add(i, &device->pp_in_use_by_net);
986                 atomic_sub(i, &device->pp_in_use);
987                 spin_lock_irq(&device->resource->req_lock);
988                 list_add_tail(&peer_req->w.list, &device->net_ee);
989                 spin_unlock_irq(&device->resource->req_lock);
990                 wake_up(&drbd_pp_wait);
991         } else
992                 drbd_free_peer_req(device, peer_req);
993 }
994
995 /**
996  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
997  * @w:          work object.
998  * @cancel:     The connection will be closed anyway.
999  *
1000  */
1001 int w_e_end_data_req(struct drbd_work *w, int cancel)
1002 {
1003         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1004         struct drbd_peer_device *peer_device = peer_req->peer_device;
1005         struct drbd_device *device = peer_device->device;
1006         int err;
1007
1008         if (unlikely(cancel)) {
1009                 drbd_free_peer_req(device, peer_req);
1010                 dec_unacked(device);
1011                 return 0;
1012         }
1013
1014         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1015                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1016         } else {
1017                 if (__ratelimit(&drbd_ratelimit_state))
1018                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1019                             (unsigned long long)peer_req->i.sector);
1020
1021                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1022         }
1023
1024         dec_unacked(device);
1025
1026         move_to_net_ee_or_free(device, peer_req);
1027
1028         if (unlikely(err))
1029                 drbd_err(device, "drbd_send_block() failed\n");
1030         return err;
1031 }
1032
1033 /**
1034  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1035  * @w:          work object.
1036  * @cancel:     The connection will be closed anyway.
1037  */
1038 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1039 {
1040         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1041         struct drbd_peer_device *peer_device = peer_req->peer_device;
1042         struct drbd_device *device = peer_device->device;
1043         int err;
1044
1045         if (unlikely(cancel)) {
1046                 drbd_free_peer_req(device, peer_req);
1047                 dec_unacked(device);
1048                 return 0;
1049         }
1050
1051         if (get_ldev_if_state(device, D_FAILED)) {
1052                 drbd_rs_complete_io(device, peer_req->i.sector);
1053                 put_ldev(device);
1054         }
1055
1056         if (device->state.conn == C_AHEAD) {
1057                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1058         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1059                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1060                         inc_rs_pending(device);
1061                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1062                 } else {
1063                         if (__ratelimit(&drbd_ratelimit_state))
1064                                 drbd_err(device, "Not sending RSDataReply, "
1065                                     "partner DISKLESS!\n");
1066                         err = 0;
1067                 }
1068         } else {
1069                 if (__ratelimit(&drbd_ratelimit_state))
1070                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1071                             (unsigned long long)peer_req->i.sector);
1072
1073                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1074
1075                 /* update resync data with failure */
1076                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1077         }
1078
1079         dec_unacked(device);
1080
1081         move_to_net_ee_or_free(device, peer_req);
1082
1083         if (unlikely(err))
1084                 drbd_err(device, "drbd_send_block() failed\n");
1085         return err;
1086 }
1087
1088 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1089 {
1090         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1091         struct drbd_peer_device *peer_device = peer_req->peer_device;
1092         struct drbd_device *device = peer_device->device;
1093         struct digest_info *di;
1094         int digest_size;
1095         void *digest = NULL;
1096         int err, eq = 0;
1097
1098         if (unlikely(cancel)) {
1099                 drbd_free_peer_req(device, peer_req);
1100                 dec_unacked(device);
1101                 return 0;
1102         }
1103
1104         if (get_ldev(device)) {
1105                 drbd_rs_complete_io(device, peer_req->i.sector);
1106                 put_ldev(device);
1107         }
1108
1109         di = peer_req->digest;
1110
1111         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1112                 /* quick hack to try to avoid a race against reconfiguration.
1113                  * a real fix would be much more involved,
1114                  * introducing more locking mechanisms */
1115                 if (peer_device->connection->csums_tfm) {
1116                         digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1117                         D_ASSERT(device, digest_size == di->digest_size);
1118                         digest = kmalloc(digest_size, GFP_NOIO);
1119                 }
1120                 if (digest) {
1121                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1122                         eq = !memcmp(digest, di->digest, digest_size);
1123                         kfree(digest);
1124                 }
1125
1126                 if (eq) {
1127                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1128                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1129                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1130                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1131                 } else {
1132                         inc_rs_pending(device);
1133                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1134                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1135                         kfree(di);
1136                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1137                 }
1138         } else {
1139                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1140                 if (__ratelimit(&drbd_ratelimit_state))
1141                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1142         }
1143
1144         dec_unacked(device);
1145         move_to_net_ee_or_free(device, peer_req);
1146
1147         if (unlikely(err))
1148                 drbd_err(device, "drbd_send_block/ack() failed\n");
1149         return err;
1150 }
1151
1152 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1153 {
1154         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1155         struct drbd_peer_device *peer_device = peer_req->peer_device;
1156         struct drbd_device *device = peer_device->device;
1157         sector_t sector = peer_req->i.sector;
1158         unsigned int size = peer_req->i.size;
1159         int digest_size;
1160         void *digest;
1161         int err = 0;
1162
1163         if (unlikely(cancel))
1164                 goto out;
1165
1166         digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1167         digest = kmalloc(digest_size, GFP_NOIO);
1168         if (!digest) {
1169                 err = 1;        /* terminate the connection in case the allocation failed */
1170                 goto out;
1171         }
1172
1173         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1174                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1175         else
1176                 memset(digest, 0, digest_size);
1177
1178         /* Free e and pages before send.
1179          * In case we block on congestion, we could otherwise run into
1180          * some distributed deadlock, if the other side blocks on
1181          * congestion as well, because our receiver blocks in
1182          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1183         drbd_free_peer_req(device, peer_req);
1184         peer_req = NULL;
1185         inc_rs_pending(device);
1186         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1187         if (err)
1188                 dec_rs_pending(device);
1189         kfree(digest);
1190
1191 out:
1192         if (peer_req)
1193                 drbd_free_peer_req(device, peer_req);
1194         dec_unacked(device);
1195         return err;
1196 }
1197
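/* Merge adjacent out-of-sync sectors found by online verify into one
 * range, so that ov_out_of_sync_print() can report them as a single
 * block, and mark them out of sync in the bitmap. */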
1198 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1199 {
1200         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1201                 device->ov_last_oos_size += size>>9;
1202         } else {
1203                 device->ov_last_oos_start = sector;
1204                 device->ov_last_oos_size = size>>9;
1205         }
1206         drbd_set_out_of_sync(device, sector, size);
1207 }
1208
1209 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1210 {
1211         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1212         struct drbd_peer_device *peer_device = peer_req->peer_device;
1213         struct drbd_device *device = peer_device->device;
1214         struct digest_info *di;
1215         void *digest;
1216         sector_t sector = peer_req->i.sector;
1217         unsigned int size = peer_req->i.size;
1218         int digest_size;
1219         int err, eq = 0;
1220         bool stop_sector_reached = false;
1221
1222         if (unlikely(cancel)) {
1223                 drbd_free_peer_req(device, peer_req);
1224                 dec_unacked(device);
1225                 return 0;
1226         }
1227
1228         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1229          * the resync lru has been cleaned up already */
1230         if (get_ldev(device)) {
1231                 drbd_rs_complete_io(device, peer_req->i.sector);
1232                 put_ldev(device);
1233         }
1234
1235         di = peer_req->digest;
1236
1237         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1238                 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1239                 digest = kmalloc(digest_size, GFP_NOIO);
1240                 if (digest) {
1241                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1242
1243                         D_ASSERT(device, digest_size == di->digest_size);
1244                         eq = !memcmp(digest, di->digest, digest_size);
1245                         kfree(digest);
1246                 }
1247         }
1248
1249         /* Free peer_req and pages before send.
1250          * In case we block on congestion, we could otherwise run into
1251          * some distributed deadlock, if the other side blocks on
1252          * congestion as well, because our receiver blocks in
1253          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1254         drbd_free_peer_req(device, peer_req);
1255         if (!eq)
1256                 drbd_ov_out_of_sync_found(device, sector, size);
1257         else
1258                 ov_out_of_sync_print(device);
1259
1260         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1261                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1262
1263         dec_unacked(device);
1264
1265         --device->ov_left;
1266
1267         /* let's advance progress step marks only for every other megabyte */
1268         if ((device->ov_left & 0x200) == 0x200)
1269                 drbd_advance_rs_marks(device, device->ov_left);
1270
1271         stop_sector_reached = verify_can_do_stop_sector(device) &&
1272                 (sector + (size>>9)) >= device->ov_stop_sector;
1273
1274         if (device->ov_left == 0 || stop_sector_reached) {
1275                 ov_out_of_sync_print(device);
1276                 drbd_resync_finished(device);
1277         }
1278
1279         return err;
1280 }
1281
1282 /* FIXME
1283  * We need to track the number of pending barrier acks,
1284  * and to be able to wait for them.
1285  * See also comment in drbd_adm_attach before drbd_suspend_io.
1286  */
1287 static int drbd_send_barrier(struct drbd_connection *connection)
1288 {
1289         struct p_barrier *p;
1290         struct drbd_socket *sock;
1291
1292         sock = &connection->data;
1293         p = conn_prepare_command(connection, sock);
1294         if (!p)
1295                 return -EIO;
1296         p->barrier = connection->send.current_epoch_nr;
1297         p->pad = 0;
1298         connection->send.current_epoch_writes = 0;
1299         connection->send.last_sent_barrier_jif = jiffies;
1300
1301         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1302 }
1303
1304 int w_send_write_hint(struct drbd_work *w, int cancel)
1305 {
1306         struct drbd_device *device =
1307                 container_of(w, struct drbd_device, unplug_work);
1308         struct drbd_socket *sock;
1309
1310         if (cancel)
1311                 return 0;
1312         sock = &first_peer_device(device)->connection->data;
1313         if (!drbd_prepare_command(first_peer_device(device), sock))
1314                 return -EIO;
1315         return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1316 }
1317
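/* The first write seen on a connection defines the initial epoch;
 * initialize the epoch bookkeeping from it. */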
1318 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1319 {
1320         if (!connection->send.seen_any_write_yet) {
1321                 connection->send.seen_any_write_yet = true;
1322                 connection->send.current_epoch_nr = epoch;
1323                 connection->send.current_epoch_writes = 0;
1324                 connection->send.last_sent_barrier_jif = jiffies;
1325         }
1326 }
1327
1328 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1329 {
1330         /* nothing to do if no write has been sent on this connection yet */
1331         if (!connection->send.seen_any_write_yet)
1332                 return;
1333         if (connection->send.current_epoch_nr != epoch) {
1334                 if (connection->send.current_epoch_writes)
1335                         drbd_send_barrier(connection);
1336                 connection->send.current_epoch_nr = epoch;
1337         }
1338 }
1339
1340 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1341 {
1342         struct drbd_request *req = container_of(w, struct drbd_request, w);
1343         struct drbd_device *device = req->device;
1344         struct drbd_peer_device *const peer_device = first_peer_device(device);
1345         struct drbd_connection *const connection = peer_device->connection;
1346         int err;
1347
1348         if (unlikely(cancel)) {
1349                 req_mod(req, SEND_CANCELED);
1350                 return 0;
1351         }
1352         req->pre_send_jif = jiffies;
1353
1354         /* this time, no connection->send.current_epoch_writes++;
1355          * if a barrier is sent here, it is the closing barrier for the last
1356          * replicated epoch, before we went into AHEAD mode.
1357          * No more barriers will be sent until we leave AHEAD mode again. */
1358         maybe_send_barrier(connection, req->epoch);
1359
1360         err = drbd_send_out_of_sync(peer_device, req);
1361         req_mod(req, OOS_HANDED_TO_NETWORK);
1362
1363         return err;
1364 }
1365
1366 /**
1367  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1368  * @w:          work object.
1369  * @cancel:     The connection will be closed anyway
1370  */
1371 int w_send_dblock(struct drbd_work *w, int cancel)
1372 {
1373         struct drbd_request *req = container_of(w, struct drbd_request, w);
1374         struct drbd_device *device = req->device;
1375         struct drbd_peer_device *const peer_device = first_peer_device(device);
1376         struct drbd_connection *connection = peer_device->connection;
1377         int err;
1378
1379         if (unlikely(cancel)) {
1380                 req_mod(req, SEND_CANCELED);
1381                 return 0;
1382         }
1383         req->pre_send_jif = jiffies;
1384
1385         re_init_if_first_write(connection, req->epoch);
1386         maybe_send_barrier(connection, req->epoch);
1387         connection->send.current_epoch_writes++;
1388
1389         err = drbd_send_dblock(peer_device, req);
1390         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1391
1392         return err;
1393 }
1394
1395 /**
1396  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1397  * @w:          work object.
1398  * @cancel:     The connection will be closed anyway
1399  */
1400 int w_send_read_req(struct drbd_work *w, int cancel)
1401 {
1402         struct drbd_request *req = container_of(w, struct drbd_request, w);
1403         struct drbd_device *device = req->device;
1404         struct drbd_peer_device *const peer_device = first_peer_device(device);
1405         struct drbd_connection *connection = peer_device->connection;
1406         int err;
1407
1408         if (unlikely(cancel)) {
1409                 req_mod(req, SEND_CANCELED);
1410                 return 0;
1411         }
1412         req->pre_send_jif = jiffies;
1413
1414         /* Even read requests may close a write epoch,
1415          * if one is still open. */
1416         maybe_send_barrier(connection, req->epoch);
1417
1418         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1419                                  (unsigned long)req);
1420
1421         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1422
1423         return err;
1424 }
1425
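     /* Worker callback: re-submit the request to the local backing device.
      * Writes tracked in the activity log re-enter it first; then a fresh
      * private bio is cloned from the master bio, pointed at the backing
      * device, and handed to the block layer. */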
1426 int w_restart_disk_io(struct drbd_work *w, int cancel)
1427 {
1428         struct drbd_request *req = container_of(w, struct drbd_request, w);
1429         struct drbd_device *device = req->device;
1430
1431         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1432                 drbd_al_begin_io(device, &req->i);
1433
1434         drbd_req_make_private_bio(req, req->master_bio);
1435         req->private_bio->bi_bdev = device->ldev->backing_bdev;
1436         generic_make_request(req->private_bio);
1437
1438         return 0;
1439 }
1440
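     /* Follow the resync-after dependency chain of @device.
      * Returns 1 if this device may resync now: it has no backing device,
      * no resync-after dependency, or nothing up its dependency chain is
      * currently syncing or paused.  Returns 0 if it has to wait. */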
1441 static int _drbd_may_sync_now(struct drbd_device *device)
1442 {
1443         struct drbd_device *odev = device;
1444         int resync_after;
1445
1446         while (1) {
1447                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1448                         return 1;
1449                 rcu_read_lock();
1450                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1451                 rcu_read_unlock();
1452                 if (resync_after == -1)
1453                         return 1;
1454                 odev = minor_to_device(resync_after);
1455                 if (!odev)
1456                         return 1;
1457                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1458                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1459                     odev->state.aftr_isp || odev->state.peer_isp ||
1460                     odev->state.user_isp)
1461                         return 0;
1462         }
1463 }
1464
1465 /**
1466  * drbd_pause_after() - Pause resync on all devices that may not resync now
1467  * @device:     DRBD device.
1468  *
1469  * Called from process context only (admin command and after_state_ch).
1470  */
1471 static bool drbd_pause_after(struct drbd_device *device)
1472 {
1473         bool changed = false;
1474         struct drbd_device *odev;
1475         int i;
1476
1477         rcu_read_lock();
1478         idr_for_each_entry(&drbd_devices, odev, i) {
1479                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1480                         continue;
1481                 if (!_drbd_may_sync_now(odev) &&
1482                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1483                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1484                         changed = true;
1485         }
1486         rcu_read_unlock();
1487
1488         return changed;
1489 }
1490
1491 /**
1492  * drbd_resume_next() - Resume resync on all devices that may resync now
1493  * @device:     DRBD device.
1494  *
1495  * Called from process context only (admin command and worker).
1496  */
1497 static bool drbd_resume_next(struct drbd_device *device)
1498 {
1499         bool changed = false;
1500         struct drbd_device *odev;
1501         int i;
1502
1503         rcu_read_lock();
1504         idr_for_each_entry(&drbd_devices, odev, i) {
1505                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1506                         continue;
1507                 if (odev->state.aftr_isp) {
1508                         if (_drbd_may_sync_now(odev) &&
1509                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1510                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1511                                 changed = true;
1512                 }
1513         }
1514         rcu_read_unlock();
1515         return changed;
1516 }
1517
1518 void resume_next_sg(struct drbd_device *device)
1519 {
1520         lock_all_resources();
1521         drbd_resume_next(device);
1522         unlock_all_resources();
1523 }
1524
1525 void suspend_other_sg(struct drbd_device *device)
1526 {
1527         lock_all_resources();
1528         drbd_pause_after(device);
1529         unlock_all_resources();
1530 }
1531
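     /* Validate a new resync-after dependency: @device shall sync only after
      * minor @o_minor.  Rejects out-of-range minors and dependency cycles;
      * depending on diskless or non-existing minors is allowed. */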
1532 /* caller must lock_all_resources() */
1533 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1534 {
1535         struct drbd_device *odev;
1536         int resync_after;
1537
1538         if (o_minor == -1)
1539                 return NO_ERROR;
1540         if (o_minor < -1 || o_minor > MINORMASK)
1541                 return ERR_RESYNC_AFTER;
1542
1543         /* check for loops */
1544         odev = minor_to_device(o_minor);
1545         while (1) {
1546                 if (odev == device)
1547                         return ERR_RESYNC_AFTER_CYCLE;
1548
1549                 /* You are free to depend on diskless, non-existing,
1550                  * or not yet/no longer existing minors.
1551                  * We only reject dependency loops.
1552                  * We cannot follow the dependency chain beyond a detached or
1553                  * missing minor.
1554                  */
1555                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1556                         return NO_ERROR;
1557
1558                 rcu_read_lock();
1559                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1560                 rcu_read_unlock();
1561                 /* dependency chain ends here, no cycles. */
1562                 if (resync_after == -1)
1563                         return NO_ERROR;
1564
1565                 /* follow the dependency chain */
1566                 odev = minor_to_device(resync_after);
1567         }
1568 }
1569
1570 /* caller must lock_all_resources() */
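     /* Re-evaluate which resyncs must be paused and which may resume until
      * nothing changes anymore; a change can cascade along resync-after
      * chains, so iterate to a fixed point. */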
1571 void drbd_resync_after_changed(struct drbd_device *device)
1572 {
1573         int changed;
1574
1575         do {
1576                 changed  = drbd_pause_after(device);
1577                 changed |= drbd_resume_next(device);
1578         } while (changed);
1579 }
1580
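     /* Reset the resync rate controller state: clear the counters of resync
      * sectors received and submitted, take a fresh baseline of the backing
      * disk's sector statistics, and empty the fifo of planned requests. */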
1581 void drbd_rs_controller_reset(struct drbd_device *device)
1582 {
1583         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1584         struct fifo_buffer *plan;
1585
1586         atomic_set(&device->rs_sect_in, 0);
1587         atomic_set(&device->rs_sect_ev, 0);
1588         device->rs_in_flight = 0;
1589         device->rs_last_events =
1590                 (int)part_stat_read(&disk->part0, sectors[0]) +
1591                 (int)part_stat_read(&disk->part0, sectors[1]);
1592
1593         /* Updating the RCU protected object in place is necessary since
1594            this function gets called from atomic context.
1595            It is valid since all other updates also lead to a completely
1596            empty fifo */
1597         rcu_read_lock();
1598         plan = rcu_dereference(device->rs_plan_s);
1599         plan->total = 0;
1600         fifo_set(plan, 0);
1601         rcu_read_unlock();
1602 }
1603
1604 void start_resync_timer_fn(unsigned long data)
1605 {
1606         struct drbd_device *device = (struct drbd_device *) data;
1607         drbd_device_post_work(device, RS_START);
1608 }
1609
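     /* Posted via RS_START (see start_resync_timer_fn): start the resync as
      * sync source, but only once there are no unacknowledged requests or
      * pending resync replies left; otherwise re-arm the timer and retry
      * shortly. */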
1610 static void do_start_resync(struct drbd_device *device)
1611 {
1612         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1613                 drbd_warn(device, "postponing start_resync ...\n");
1614                 device->start_resync_timer.expires = jiffies + HZ/10;
1615                 add_timer(&device->start_resync_timer);
1616                 return;
1617         }
1618
1619         drbd_start_resync(device, C_SYNC_SOURCE);
1620         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1621 }
1622
1623 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1624 {
1625         bool csums_after_crash_only;
1626         rcu_read_lock();
1627         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1628         rcu_read_unlock();
1629         return connection->agreed_pro_version >= 89 &&          /* supported? */
1630                 connection->csums_tfm &&                        /* configured? */
1631                 (csums_after_crash_only == 0                    /* use for each resync? */
1632                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1633 }
1634
1635 /**
1636  * drbd_start_resync() - Start the resync process
1637  * @device:     DRBD device.
1638  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1639  *
1640  * This function might bring you directly into one of the
1641  * C_PAUSED_SYNC_* states.
1642  */
1643 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1644 {
1645         struct drbd_peer_device *peer_device = first_peer_device(device);
1646         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1647         union drbd_state ns;
1648         int r;
1649
1650         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1651                 drbd_err(device, "Resync already running!\n");
1652                 return;
1653         }
1654
1655         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1656                 if (side == C_SYNC_TARGET) {
1657                         /* Since application IO was locked out during C_WF_BITMAP_T and
1658                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1659                            which will mark our data inconsistent, give the handler a chance to veto. */
1660                         r = drbd_khelper(device, "before-resync-target");
1661                         r = (r >> 8) & 0xff;
1662                         if (r > 0) {
1663                                 drbd_info(device, "before-resync-target handler returned %d, "
1664                                          "dropping connection.\n", r);
1665                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1666                                 return;
1667                         }
1668                 } else /* C_SYNC_SOURCE */ {
1669                         r = drbd_khelper(device, "before-resync-source");
1670                         r = (r >> 8) & 0xff;
1671                         if (r > 0) {
1672                                 if (r == 3) {
1673                                         drbd_info(device, "before-resync-source handler returned %d, "
1674                                                  "ignoring. Old userland tools?\n", r);
1675                                 } else {
1676                                         drbd_info(device, "before-resync-source handler returned %d, "
1677                                                  "dropping connection.\n", r);
1678                                         conn_request_state(connection,
1679                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1680                                         return;
1681                                 }
1682                         }
1683                 }
1684         }
1685
1686         if (current == connection->worker.task) {
1687                 /* The worker should not sleep waiting for state_mutex,
1688                    as that can take a long time */
1689                 if (!mutex_trylock(device->state_mutex)) {
1690                         set_bit(B_RS_H_DONE, &device->flags);
1691                         device->start_resync_timer.expires = jiffies + HZ/5;
1692                         add_timer(&device->start_resync_timer);
1693                         return;
1694                 }
1695         } else {
1696                 mutex_lock(device->state_mutex);
1697         }
1698
1699         lock_all_resources();
1700         clear_bit(B_RS_H_DONE, &device->flags);
1701         /* Did some connection breakage or IO error race with us? */
1702         if (device->state.conn < C_CONNECTED
1703         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1704                 unlock_all_resources();
1705                 goto out;
1706         }
1707
1708         ns = drbd_read_state(device);
1709
1710         ns.aftr_isp = !_drbd_may_sync_now(device);
1711
1712         ns.conn = side;
1713
1714         if (side == C_SYNC_TARGET)
1715                 ns.disk = D_INCONSISTENT;
1716         else /* side == C_SYNC_SOURCE */
1717                 ns.pdsk = D_INCONSISTENT;
1718
1719         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1720         ns = drbd_read_state(device);
1721
1722         if (ns.conn < C_CONNECTED)
1723                 r = SS_UNKNOWN_ERROR;
1724
1725         if (r == SS_SUCCESS) {
1726                 unsigned long tw = drbd_bm_total_weight(device);
1727                 unsigned long now = jiffies;
1728                 int i;
1729
1730                 device->rs_failed    = 0;
1731                 device->rs_paused    = 0;
1732                 device->rs_same_csum = 0;
1733                 device->rs_last_sect_ev = 0;
1734                 device->rs_total     = tw;
1735                 device->rs_start     = now;
1736                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1737                         device->rs_mark_left[i] = tw;
1738                         device->rs_mark_time[i] = now;
1739                 }
1740                 drbd_pause_after(device);
1741                 /* Forget potentially stale cached per resync extent bit-counts.
1742                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1743                  * disabled, and know the disk state is ok. */
1744                 spin_lock(&device->al_lock);
1745                 lc_reset(device->resync);
1746                 device->resync_locked = 0;
1747                 device->resync_wenr = LC_FREE;
1748                 spin_unlock(&device->al_lock);
1749         }
1750         unlock_all_resources();
1751
1752         if (r == SS_SUCCESS) {
1753                 wake_up(&device->al_wait); /* for lc_reset() above */
1754                 /* reset rs_last_bcast when a resync or verify is started,
1755                  * to deal with potential jiffies wrap. */
1756                 device->rs_last_bcast = jiffies - HZ;
1757
1758                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1759                      drbd_conn_str(ns.conn),
1760                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1761                      (unsigned long) device->rs_total);
1762                 if (side == C_SYNC_TARGET) {
1763                         device->bm_resync_fo = 0;
1764                         device->use_csums = use_checksum_based_resync(connection, device);
1765                 } else {
1766                         device->use_csums = 0;
1767                 }
1768
1769                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1770                  * with w_send_oos, or the sync target will get confused as to
1771                  * how many bits to resync.  We cannot do that always, because for an
1772                  * empty resync and protocol < 95, we need to do it here, as we call
1773                  * drbd_resync_finished from here in that case.
1774                  * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1775                  * and from after_state_ch otherwise. */
1776                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1777                         drbd_gen_and_send_sync_uuid(peer_device);
1778
1779                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1780                         /* This still has a race (about when exactly the peers
1781                          * detect connection loss) that can lead to a full sync
1782                          * on next handshake. In 8.3.9 we fixed this with explicit
1783                          * resync-finished notifications, but the fix
1784                          * introduces a protocol change.  Sleeping for some
1785                          * time longer than the ping interval + timeout on the
1786                          * SyncSource, to give the SyncTarget the chance to
1787                          * detect connection loss, then waiting for a ping
1788                          * response (implicit in drbd_resync_finished) reduces
1789                          * the race considerably, but does not solve it. */
1790                         if (side == C_SYNC_SOURCE) {
1791                                 struct net_conf *nc;
1792                                 int timeo;
1793
1794                                 rcu_read_lock();
1795                                 nc = rcu_dereference(connection->net_conf);
1796                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1797                                 rcu_read_unlock();
1798                                 schedule_timeout_interruptible(timeo);
1799                         }
1800                         drbd_resync_finished(device);
1801                 }
1802
1803                 drbd_rs_controller_reset(device);
1804                 /* ns.conn may already be != device->state.conn,
1805                  * we may have been paused in between, or become paused until
1806                  * the timer triggers.
1807                  * No matter, that is handled in resync_timer_fn() */
1808                 if (ns.conn == C_SYNC_TARGET)
1809                         mod_timer(&device->resync_timer, jiffies);
1810
1811                 drbd_md_sync(device);
1812         }
1813         put_ldev(device);
1814 out:
1815         mutex_unlock(device->state_mutex);
1816 }
1817
1818 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1819 {
1820         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1821         device->rs_last_bcast = jiffies;
1822
1823         if (!get_ldev(device))
1824                 return;
1825
1826         drbd_bm_write_lazy(device, 0);
1827         if (resync_done && is_sync_state(device->state.conn))
1828                 drbd_resync_finished(device);
1829
1830         drbd_bcast_event(device, &sib);
1831         /* update timestamp, in case it took a while to write out stuff */
1832         device->rs_last_bcast = jiffies;
1833         put_ldev(device);
1834 }
1835
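     /* Tear down the local disk state: free the resync and activity log
      * LRU caches, release the backing device, clear GOING_DISKLESS and
      * wake up waiters on misc_wait. */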
1836 static void drbd_ldev_destroy(struct drbd_device *device)
1837 {
1838         lc_destroy(device->resync);
1839         device->resync = NULL;
1840         lc_destroy(device->act_log);
1841         device->act_log = NULL;
1842
1843         __acquire(local);
1844         drbd_backing_dev_free(device, device->ldev);
1845         device->ldev = NULL;
1846         __release(local);
1847
1848         clear_bit(GOING_DISKLESS, &device->flags);
1849         wake_up(&device->misc_wait);
1850 }
1851
1852 static void go_diskless(struct drbd_device *device)
1853 {
1854         D_ASSERT(device, device->state.disk == D_FAILED);
1855         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1856          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1857          * the protected members anymore, though, so once put_ldev reaches zero
1858          * again, it will be safe to free them. */
1859
1860         /* Try to write changed bitmap pages, read errors may have just
1861          * set some bits outside the area covered by the activity log.
1862          *
1863          * If we have an IO error during the bitmap writeout,
1864          * we will want a full sync next time, just in case.
1865          * (Do we want a specific meta data flag for this?)
1866          *
1867          * If that does not make it to stable storage either,
1868          * we cannot do anything about that anymore.
1869          *
1870          * We still need to check if both bitmap and ldev are present, we may
1871          * end up here after a failed attach, before ldev was even assigned.
1872          */
1873         if (device->bitmap && device->ldev) {
1874                 /* An interrupted resync or similar is allowed to recount bits
1875                  * while we detach.
1876                  * Any modifications would not be expected anymore, though.
1877                  */
1878                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1879                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1880                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1881                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1882                                 drbd_md_sync(device);
1883                         }
1884                 }
1885         }
1886
1887         drbd_force_state(device, NS(disk, D_DISKLESS));
1888 }
1889
1890 static int do_md_sync(struct drbd_device *device)
1891 {
1892         drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1893         drbd_md_sync(device);
1894         return 0;
1895 }
1896
1897 /* only called from drbd_worker thread, no locking */
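     /* Record which callback the worker is about to run, together with the
      * caller, source line and a jiffies timestamp, in a small ring buffer of
      * DRBD_THREAD_DETAILS_HIST entries; the slot to be used next is cleared. */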
1898 void __update_timing_details(
1899                 struct drbd_thread_timing_details *tdp,
1900                 unsigned int *cb_nr,
1901                 void *cb,
1902                 const char *fn, const unsigned int line)
1903 {
1904         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1905         struct drbd_thread_timing_details *td = tdp + i;
1906
1907         td->start_jif = jiffies;
1908         td->cb_addr = cb;
1909         td->caller_fn = fn;
1910         td->line = line;
1911         td->cb_nr = *cb_nr;
1912
1913         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1914         td = tdp + i;
1915         memset(td, 0, sizeof(*td));
1916
1917         ++(*cb_nr);
1918 }
1919
1920 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1921 {
1922         if (test_bit(MD_SYNC, &todo))
1923                 do_md_sync(device);
1924         if (test_bit(RS_DONE, &todo) ||
1925             test_bit(RS_PROGRESS, &todo))
1926                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1927         if (test_bit(GO_DISKLESS, &todo))
1928                 go_diskless(device);
1929         if (test_bit(DESTROY_DISK, &todo))
1930                 drbd_ldev_destroy(device);
1931         if (test_bit(RS_START, &todo))
1932                 do_start_resync(device);
1933 }
1934
1935 #define DRBD_DEVICE_WORK_MASK   \
1936         ((1UL << GO_DISKLESS)   \
1937         |(1UL << DESTROY_DISK)  \
1938         |(1UL << MD_SYNC)       \
1939         |(1UL << RS_START)      \
1940         |(1UL << RS_PROGRESS)   \
1941         |(1UL << RS_DONE)       \
1942         )
1943
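     /* Atomically fetch and clear the device work bits in *flags using a
      * cmpxchg loop; returns the bits that were set. */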
1944 static unsigned long get_work_bits(unsigned long *flags)
1945 {
1946         unsigned long old, new;
1947         do {
1948                 old = *flags;
1949                 new = old & ~DRBD_DEVICE_WORK_MASK;
1950         } while (cmpxchg(flags, old, new) != old);
1951         return old & DRBD_DEVICE_WORK_MASK;
1952 }
1953
1954 static void do_unqueued_work(struct drbd_connection *connection)
1955 {
1956         struct drbd_peer_device *peer_device;
1957         int vnr;
1958
1959         rcu_read_lock();
1960         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1961                 struct drbd_device *device = peer_device->device;
1962                 unsigned long todo = get_work_bits(&device->flags);
1963                 if (!todo)
1964                         continue;
1965
1966                 kref_get(&device->kref);
1967                 rcu_read_unlock();
1968                 do_device_work(device, todo);
1969                 kref_put(&device->kref, drbd_destroy_device);
1970                 rcu_read_lock();
1971         }
1972         rcu_read_unlock();
1973 }
1974
1975 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1976 {
1977         spin_lock_irq(&queue->q_lock);
1978         list_splice_tail_init(&queue->q, work_list);
1979         spin_unlock_irq(&queue->q_lock);
1980         return !list_empty(work_list);
1981 }
1982
1983 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1984 {
1985         DEFINE_WAIT(wait);
1986         struct net_conf *nc;
1987         int uncork, cork;
1988
1989         dequeue_work_batch(&connection->sender_work, work_list);
1990         if (!list_empty(work_list))
1991                 return;
1992
1993         /* Still nothing to do?
1994          * Maybe we still need to close the current epoch,
1995          * even if no new requests are queued yet.
1996          *
1997          * Also, poke TCP, just in case.
1998          * Then wait for new work (or signal). */
1999         rcu_read_lock();
2000         nc = rcu_dereference(connection->net_conf);
2001         uncork = nc ? nc->tcp_cork : 0;
2002         rcu_read_unlock();
2003         if (uncork) {
2004                 mutex_lock(&connection->data.mutex);
2005                 if (connection->data.socket)
2006                         drbd_tcp_uncork(connection->data.socket);
2007                 mutex_unlock(&connection->data.mutex);
2008         }
2009
2010         for (;;) {
2011                 int send_barrier;
2012                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2013                 spin_lock_irq(&connection->resource->req_lock);
2014                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2015                 if (!list_empty(&connection->sender_work.q))
2016                         list_splice_tail_init(&connection->sender_work.q, work_list);
2017                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2018                 if (!list_empty(work_list) || signal_pending(current)) {
2019                         spin_unlock_irq(&connection->resource->req_lock);
2020                         break;
2021                 }
2022
2023                 /* We found nothing new to do, no to-be-communicated request,
2024                  * no other work item.  We may still need to close the last
2025                  * epoch.  Next incoming request epoch will be connection ->
2026                  * current transfer log epoch number.  If that is different
2027                  * from the epoch of the last request we communicated, it is
2028                  * safe to send the epoch separating barrier now.
2029                  */
2030                 send_barrier =
2031                         atomic_read(&connection->current_tle_nr) !=
2032                         connection->send.current_epoch_nr;
2033                 spin_unlock_irq(&connection->resource->req_lock);
2034
2035                 if (send_barrier)
2036                         maybe_send_barrier(connection,
2037                                         connection->send.current_epoch_nr + 1);
2038
2039                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2040                         break;
2041
2042                 /* drbd_send() may have called flush_signals() */
2043                 if (get_t_state(&connection->worker) != RUNNING)
2044                         break;
2045
2046                 schedule();
2047                 /* We may be woken up for reasons other than new work, too,
2048                  * e.g. if the current epoch got closed;
2049                  * in that case we send the barrier above. */
2050         }
2051         finish_wait(&connection->sender_work.q_wait, &wait);
2052
2053         /* someone may have changed the config while we have been waiting above. */
2054         rcu_read_lock();
2055         nc = rcu_dereference(connection->net_conf);
2056         cork = nc ? nc->tcp_cork : 0;
2057         rcu_read_unlock();
2058         mutex_lock(&connection->data.mutex);
2059         if (connection->data.socket) {
2060                 if (cork)
2061                         drbd_tcp_cork(connection->data.socket);
2062                 else if (!uncork)
2063                         drbd_tcp_uncork(connection->data.socket);
2064         }
2065         mutex_unlock(&connection->data.mutex);
2066 }
2067
2068 int drbd_worker(struct drbd_thread *thi)
2069 {
2070         struct drbd_connection *connection = thi->connection;
2071         struct drbd_work *w = NULL;
2072         struct drbd_peer_device *peer_device;
2073         LIST_HEAD(work_list);
2074         int vnr;
2075
2076         while (get_t_state(thi) == RUNNING) {
2077                 drbd_thread_current_set_cpu(thi);
2078
2079                 if (list_empty(&work_list)) {
2080                         update_worker_timing_details(connection, wait_for_work);
2081                         wait_for_work(connection, &work_list);
2082                 }
2083
2084                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2085                         update_worker_timing_details(connection, do_unqueued_work);
2086                         do_unqueued_work(connection);
2087                 }
2088
2089                 if (signal_pending(current)) {
2090                         flush_signals(current);
2091                         if (get_t_state(thi) == RUNNING) {
2092                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2093                                 continue;
2094                         }
2095                         break;
2096                 }
2097
2098                 if (get_t_state(thi) != RUNNING)
2099                         break;
2100
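                     /* Dispatch one queued work item.  The callback gets
                      * cancel != 0 if the connection has already dropped below
                      * C_WF_REPORT_PARAMS; a non-zero return while still
                      * connected is treated as a network failure. */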
2101                 if (!list_empty(&work_list)) {
2102                         w = list_first_entry(&work_list, struct drbd_work, list);
2103                         list_del_init(&w->list);
2104                         update_worker_timing_details(connection, w->cb);
2105                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2106                                 continue;
2107                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2108                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2109                 }
2110         }
2111
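             /* No longer RUNNING: drain any remaining device work and queued
              * work items, calling the callbacks with cancel = 1. */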
2112         do {
2113                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2114                         update_worker_timing_details(connection, do_unqueued_work);
2115                         do_unqueued_work(connection);
2116                 }
2117                 if (!list_empty(&work_list)) {
2118                         w = list_first_entry(&work_list, struct drbd_work, list);
2119                         list_del_init(&w->list);
2120                         update_worker_timing_details(connection, w->cb);
2121                         w->cb(w, 1);
2122                 } else
2123                         dequeue_work_batch(&connection->sender_work, &work_list);
2124         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2125
2126         rcu_read_lock();
2127         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2128                 struct drbd_device *device = peer_device->device;
2129                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2130                 kref_get(&device->kref);
2131                 rcu_read_unlock();
2132                 drbd_device_cleanup(device);
2133                 kref_put(&device->kref, drbd_destroy_device);
2134                 rcu_read_lock();
2135         }
2136         rcu_read_unlock();
2137
2138         return 0;
2139 }