]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/drbd/drbd_worker.c
Merge branch 'work.memdup_user' of git://git.kernel.org/pub/scm/linux/kernel/git...
[karo-tx-linux.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched/signal.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
/* Forward declarations: both helpers are defined further down in this file
 * but are already called from w_resync_timer(), which precedes them.
 * Each takes the device and the worker's "cancel" flag. */
static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63         struct drbd_device *device;
64
65         device = bio->bi_private;
66         device->md_io.error = blk_status_to_errno(bio->bi_status);
67
68         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69          * to timeout on the lower level device, and eventually detach from it.
70          * If this io completion runs after that timeout expired, this
71          * drbd_md_put_buffer() may allow us to finally try and re-attach.
72          * During normal operation, this only puts that extra reference
73          * down to 1 again.
74          * Make sure we first drop the reference, and only then signal
75          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76          * next drbd_md_sync_page_io(), that we trigger the
77          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
78          */
79         drbd_md_put_buffer(device);
80         device->md_io.done = 1;
81         wake_up(&device->misc_wait);
82         bio_put(bio);
83         if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
84                 put_ldev(device);
85 }
86
87 /* reads on behalf of the partner,
88  * "submitted" by the receiver
89  */
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
91 {
92         unsigned long flags = 0;
93         struct drbd_peer_device *peer_device = peer_req->peer_device;
94         struct drbd_device *device = peer_device->device;
95
96         spin_lock_irqsave(&device->resource->req_lock, flags);
97         device->read_cnt += peer_req->i.size >> 9;
98         list_del(&peer_req->w.list);
99         if (list_empty(&device->read_ee))
100                 wake_up(&device->ee_wait);
101         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
103         spin_unlock_irqrestore(&device->resource->req_lock, flags);
104
105         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
106         put_ldev(device);
107 }
108
109 /* writes on behalf of the partner, or resync writes,
110  * "submitted" by the receiver, final stage.  */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
112 {
113         unsigned long flags = 0;
114         struct drbd_peer_device *peer_device = peer_req->peer_device;
115         struct drbd_device *device = peer_device->device;
116         struct drbd_connection *connection = peer_device->connection;
117         struct drbd_interval i;
118         int do_wake;
119         u64 block_id;
120         int do_al_complete_io;
121
122         /* after we moved peer_req to done_ee,
123          * we may no longer access it,
124          * it may be freed/reused already!
125          * (as soon as we release the req_lock) */
126         i = peer_req->i;
127         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128         block_id = peer_req->block_id;
129         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
130
131         spin_lock_irqsave(&device->resource->req_lock, flags);
132         device->writ_cnt += peer_req->i.size >> 9;
133         list_move_tail(&peer_req->w.list, &device->done_ee);
134
135         /*
136          * Do not remove from the write_requests tree here: we did not send the
137          * Ack yet and did not wake possibly waiting conflicting requests.
138          * Removed from the tree from "drbd_process_done_ee" within the
139          * appropriate dw.cb (e_end_block/e_end_resync_block) or from
140          * _drbd_clear_done_ee.
141          */
142
143         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
144
145         /* FIXME do we want to detach for failed REQ_DISCARD?
146          * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147         if (peer_req->flags & EE_WAS_ERROR)
148                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
149
150         if (connection->cstate >= C_WF_REPORT_PARAMS) {
151                 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152                 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153                         kref_put(&device->kref, drbd_destroy_device);
154         }
155         spin_unlock_irqrestore(&device->resource->req_lock, flags);
156
157         if (block_id == ID_SYNCER)
158                 drbd_rs_complete_io(device, i.sector);
159
160         if (do_wake)
161                 wake_up(&device->ee_wait);
162
163         if (do_al_complete_io)
164                 drbd_al_complete_io(device, &i);
165
166         put_ldev(device);
167 }
168
169 /* writes on behalf of the partner, or resync writes,
170  * "submitted" by the receiver.
171  */
172 void drbd_peer_request_endio(struct bio *bio)
173 {
174         struct drbd_peer_request *peer_req = bio->bi_private;
175         struct drbd_device *device = peer_req->peer_device->device;
176         bool is_write = bio_data_dir(bio) == WRITE;
177         bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
178                           bio_op(bio) == REQ_OP_DISCARD;
179
180         if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
181                 drbd_warn(device, "%s: error=%d s=%llus\n",
182                                 is_write ? (is_discard ? "discard" : "write")
183                                         : "read", bio->bi_status,
184                                 (unsigned long long)peer_req->i.sector);
185
186         if (bio->bi_status)
187                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
188
189         bio_put(bio); /* no need for the bio anymore */
190         if (atomic_dec_and_test(&peer_req->pending_bios)) {
191                 if (is_write)
192                         drbd_endio_write_sec_final(peer_req);
193                 else
194                         drbd_endio_read_sec_final(peer_req);
195         }
196 }
197
198 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 {
200         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201                 device->minor, device->resource->name, device->vnr);
202 }
203
204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio)
207 {
208         unsigned long flags;
209         struct drbd_request *req = bio->bi_private;
210         struct drbd_device *device = req->device;
211         struct bio_and_error m;
212         enum drbd_req_event what;
213
214         /* If this request was aborted locally before,
215          * but now was completed "successfully",
216          * chances are that this caused arbitrary data corruption.
217          *
218          * "aborting" requests, or force-detaching the disk, is intended for
219          * completely blocked/hung local backing devices which do no longer
220          * complete requests at all, not even do error completions.  In this
221          * situation, usually a hard-reset and failover is the only way out.
222          *
223          * By "aborting", basically faking a local error-completion,
224          * we allow for a more graceful swichover by cleanly migrating services.
225          * Still the affected node has to be rebooted "soon".
226          *
227          * By completing these requests, we allow the upper layers to re-use
228          * the associated data pages.
229          *
230          * If later the local backing device "recovers", and now DMAs some data
231          * from disk into the original request pages, in the best case it will
232          * just put random data into unused pages; but typically it will corrupt
233          * meanwhile completely unrelated data, causing all sorts of damage.
234          *
235          * Which means delayed successful completion,
236          * especially for READ requests,
237          * is a reason to panic().
238          *
239          * We assume that a delayed *error* completion is OK,
240          * though we still will complain noisily about it.
241          */
242         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243                 if (__ratelimit(&drbd_ratelimit_state))
244                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245
246                 if (!bio->bi_status)
247                         drbd_panic_after_delayed_completion_of_aborted_request(device);
248         }
249
250         /* to avoid recursion in __req_mod */
251         if (unlikely(bio->bi_status)) {
252                 switch (bio_op(bio)) {
253                 case REQ_OP_WRITE_ZEROES:
254                 case REQ_OP_DISCARD:
255                         if (bio->bi_status == BLK_STS_NOTSUPP)
256                                 what = DISCARD_COMPLETED_NOTSUPP;
257                         else
258                                 what = DISCARD_COMPLETED_WITH_ERROR;
259                         break;
260                 case REQ_OP_READ:
261                         if (bio->bi_opf & REQ_RAHEAD)
262                                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
263                         else
264                                 what = READ_COMPLETED_WITH_ERROR;
265                         break;
266                 default:
267                         what = WRITE_COMPLETED_WITH_ERROR;
268                         break;
269                 }
270         } else {
271                 what = COMPLETED_OK;
272         }
273
274         bio_put(req->private_bio);
275         req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
276
277         /* not req_mod(), we need irqsave here! */
278         spin_lock_irqsave(&device->resource->req_lock, flags);
279         __req_mod(req, what, &m);
280         spin_unlock_irqrestore(&device->resource->req_lock, flags);
281         put_ldev(device);
282
283         if (m.bio)
284                 complete_master_bio(device, &m);
285 }
286
287 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289         AHASH_REQUEST_ON_STACK(req, tfm);
290         struct scatterlist sg;
291         struct page *page = peer_req->pages;
292         struct page *tmp;
293         unsigned len;
294
295         ahash_request_set_tfm(req, tfm);
296         ahash_request_set_callback(req, 0, NULL, NULL);
297
298         sg_init_table(&sg, 1);
299         crypto_ahash_init(req);
300
301         while ((tmp = page_chain_next(page))) {
302                 /* all but the last page will be fully used */
303                 sg_set_page(&sg, page, PAGE_SIZE, 0);
304                 ahash_request_set_crypt(req, &sg, NULL, sg.length);
305                 crypto_ahash_update(req);
306                 page = tmp;
307         }
308         /* and now the last, possibly only partially used page */
309         len = peer_req->i.size & (PAGE_SIZE - 1);
310         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
311         ahash_request_set_crypt(req, &sg, digest, sg.length);
312         crypto_ahash_finup(req);
313         ahash_request_zero(req);
314 }
315
316 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
317 {
318         AHASH_REQUEST_ON_STACK(req, tfm);
319         struct scatterlist sg;
320         struct bio_vec bvec;
321         struct bvec_iter iter;
322
323         ahash_request_set_tfm(req, tfm);
324         ahash_request_set_callback(req, 0, NULL, NULL);
325
326         sg_init_table(&sg, 1);
327         crypto_ahash_init(req);
328
329         bio_for_each_segment(bvec, bio, iter) {
330                 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
331                 ahash_request_set_crypt(req, &sg, NULL, sg.length);
332                 crypto_ahash_update(req);
333                 /* REQ_OP_WRITE_SAME has only one segment,
334                  * checksum the payload only once. */
335                 if (bio_op(bio) == REQ_OP_WRITE_SAME)
336                         break;
337         }
338         ahash_request_set_crypt(req, NULL, digest, 0);
339         crypto_ahash_final(req);
340         ahash_request_zero(req);
341 }
342
343 /* MAYBE merge common code with w_e_end_ov_req */
344 static int w_e_send_csum(struct drbd_work *w, int cancel)
345 {
346         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
347         struct drbd_peer_device *peer_device = peer_req->peer_device;
348         struct drbd_device *device = peer_device->device;
349         int digest_size;
350         void *digest;
351         int err = 0;
352
353         if (unlikely(cancel))
354                 goto out;
355
356         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
357                 goto out;
358
359         digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
360         digest = kmalloc(digest_size, GFP_NOIO);
361         if (digest) {
362                 sector_t sector = peer_req->i.sector;
363                 unsigned int size = peer_req->i.size;
364                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
365                 /* Free peer_req and pages before send.
366                  * In case we block on congestion, we could otherwise run into
367                  * some distributed deadlock, if the other side blocks on
368                  * congestion as well, because our receiver blocks in
369                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
370                 drbd_free_peer_req(device, peer_req);
371                 peer_req = NULL;
372                 inc_rs_pending(device);
373                 err = drbd_send_drequest_csum(peer_device, sector, size,
374                                               digest, digest_size,
375                                               P_CSUM_RS_REQUEST);
376                 kfree(digest);
377         } else {
378                 drbd_err(device, "kmalloc() of digest failed.\n");
379                 err = -ENOMEM;
380         }
381
382 out:
383         if (peer_req)
384                 drbd_free_peer_req(device, peer_req);
385
386         if (unlikely(err))
387                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
388         return err;
389 }
390
/* Allocation flags for opportunistic allocations whose failure is tolerated
 * by the caller: highmem pages are acceptable (__GFP_HIGHMEM) and allocation
 * failure warnings are suppressed (__GFP_NOWARN).  Used by read_for_csum(),
 * where a failed allocation just means the request is retried later. */
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
392
393 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
394 {
395         struct drbd_device *device = peer_device->device;
396         struct drbd_peer_request *peer_req;
397
398         if (!get_ldev(device))
399                 return -EIO;
400
401         /* GFP_TRY, because if there is no memory available right now, this may
402          * be rescheduled for later. It is "only" background resync, after all. */
403         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
404                                        size, size, GFP_TRY);
405         if (!peer_req)
406                 goto defer;
407
408         peer_req->w.cb = w_e_send_csum;
409         spin_lock_irq(&device->resource->req_lock);
410         list_add_tail(&peer_req->w.list, &device->read_ee);
411         spin_unlock_irq(&device->resource->req_lock);
412
413         atomic_add(size >> 9, &device->rs_sect_ev);
414         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
415                                      DRBD_FAULT_RS_RD) == 0)
416                 return 0;
417
418         /* If it failed because of ENOMEM, retry should help.  If it failed
419          * because bio_add_page failed (probably broken lower level driver),
420          * retry may or may not help.
421          * If it does not, you may need to force disconnect. */
422         spin_lock_irq(&device->resource->req_lock);
423         list_del(&peer_req->w.list);
424         spin_unlock_irq(&device->resource->req_lock);
425
426         drbd_free_peer_req(device, peer_req);
427 defer:
428         put_ldev(device);
429         return -EAGAIN;
430 }
431
432 int w_resync_timer(struct drbd_work *w, int cancel)
433 {
434         struct drbd_device *device =
435                 container_of(w, struct drbd_device, resync_work);
436
437         switch (device->state.conn) {
438         case C_VERIFY_S:
439                 make_ov_request(device, cancel);
440                 break;
441         case C_SYNC_TARGET:
442                 make_resync_request(device, cancel);
443                 break;
444         }
445
446         return 0;
447 }
448
449 void resync_timer_fn(unsigned long data)
450 {
451         struct drbd_device *device = (struct drbd_device *) data;
452
453         drbd_queue_work_if_unqueued(
454                 &first_peer_device(device)->connection->sender_work,
455                 &device->resync_work);
456 }
457
458 static void fifo_set(struct fifo_buffer *fb, int value)
459 {
460         int i;
461
462         for (i = 0; i < fb->size; i++)
463                 fb->values[i] = value;
464 }
465
466 static int fifo_push(struct fifo_buffer *fb, int value)
467 {
468         int ov;
469
470         ov = fb->values[fb->head_index];
471         fb->values[fb->head_index++] = value;
472
473         if (fb->head_index >= fb->size)
474                 fb->head_index = 0;
475
476         return ov;
477 }
478
479 static void fifo_add_val(struct fifo_buffer *fb, int value)
480 {
481         int i;
482
483         for (i = 0; i < fb->size; i++)
484                 fb->values[i] += value;
485 }
486
487 struct fifo_buffer *fifo_alloc(int fifo_size)
488 {
489         struct fifo_buffer *fb;
490
491         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
492         if (!fb)
493                 return NULL;
494
495         fb->head_index = 0;
496         fb->size = fifo_size;
497         fb->total = 0;
498
499         return fb;
500 }
501
502 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
503 {
504         struct disk_conf *dc;
505         unsigned int want;     /* The number of sectors we want in-flight */
506         int req_sect; /* Number of sectors to request in this turn */
507         int correction; /* Number of sectors more we need in-flight */
508         int cps; /* correction per invocation of drbd_rs_controller() */
509         int steps; /* Number of time steps to plan ahead */
510         int curr_corr;
511         int max_sect;
512         struct fifo_buffer *plan;
513
514         dc = rcu_dereference(device->ldev->disk_conf);
515         plan = rcu_dereference(device->rs_plan_s);
516
517         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
518
519         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
520                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
521         } else { /* normal path */
522                 want = dc->c_fill_target ? dc->c_fill_target :
523                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
524         }
525
526         correction = want - device->rs_in_flight - plan->total;
527
528         /* Plan ahead */
529         cps = correction / steps;
530         fifo_add_val(plan, cps);
531         plan->total += cps * steps;
532
533         /* What we do in this step */
534         curr_corr = fifo_push(plan, 0);
535         plan->total -= curr_corr;
536
537         req_sect = sect_in + curr_corr;
538         if (req_sect < 0)
539                 req_sect = 0;
540
541         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
542         if (req_sect > max_sect)
543                 req_sect = max_sect;
544
545         /*
546         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
547                  sect_in, device->rs_in_flight, want, correction,
548                  steps, cps, device->rs_planed, curr_corr, req_sect);
549         */
550
551         return req_sect;
552 }
553
554 static int drbd_rs_number_requests(struct drbd_device *device)
555 {
556         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
557         int number, mxb;
558
559         sect_in = atomic_xchg(&device->rs_sect_in, 0);
560         device->rs_in_flight -= sect_in;
561
562         rcu_read_lock();
563         mxb = drbd_get_max_buffers(device) / 2;
564         if (rcu_dereference(device->rs_plan_s)->size) {
565                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
566                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
567         } else {
568                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
569                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
570         }
571         rcu_read_unlock();
572
573         /* Don't have more than "max-buffers"/2 in-flight.
574          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
575          * potentially causing a distributed deadlock on congestion during
576          * online-verify or (checksum-based) resync, if max-buffers,
577          * socket buffer sizes and resync rate settings are mis-configured. */
578
579         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
580          * mxb (as used here, and in drbd_alloc_pages on the peer) is
581          * "number of pages" (typically also 4k),
582          * but "rs_in_flight" is in "sectors" (512 Byte). */
583         if (mxb - device->rs_in_flight/8 < number)
584                 number = mxb - device->rs_in_flight/8;
585
586         return number;
587 }
588
/*
 * Generate the next batch of resync requests (C_SYNC_TARGET side, called
 * from w_resync_timer()).
 *
 * Walks the bitmap starting at device->bm_resync_fo, merges adjacent dirty
 * bits into larger aligned requests where possible, and for each resulting
 * range either submits a local read for checksum based resync
 * (read_for_csum()) or sends a data request to the peer
 * (drbd_send_drequest()).  Unless it returns early, it accounts the issued
 * blocks in device->rs_in_flight and re-arms device->resync_timer.
 *
 * @device: device being resynced
 * @cancel: non-zero means do nothing
 *
 * Returns 0 in most cases (including cancel, empty resync, missing local
 * disk, end of bitmap and requeue); -EIO if read_for_csum() reports -EIO;
 * or the error returned by drbd_send_drequest().
 */
static int make_resync_request(struct drbd_device *const device, int cancel)
{
        struct drbd_peer_device *const peer_device = first_peer_device(device);
        /* NOTE(review): connection may be NULL per this initializer, yet it is
         * dereferenced unconditionally below — confirm first_peer_device()
         * cannot return NULL on this path. */
        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
        unsigned long bit;
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        int max_bio_size;
        int number, rollback_i, size;
        int align, requeue = 0;
        int i = 0;
        int discard_granularity = 0;

        if (unlikely(cancel))
                return 0;

        if (device->rs_total == 0) {
                /* empty resync? */
                drbd_resync_finished(device);
                return 0;
        }

        if (!get_ldev(device)) {
                /* Since we only need to access device->rsync a
                   get_ldev_if_state(device,D_FAILED) would be sufficient, but
                   to continue resync with a broken disk makes no sense at
                   all */
                drbd_err(device, "Disk broke down during resync!\n");
                return 0;
        }

        /* From here on we hold a local disk reference; every exit path
         * below has to put_ldev(). */

        if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
                rcu_read_lock();
                discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
                rcu_read_unlock();
        }

        /* sectors -> bytes */
        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
        number = drbd_rs_number_requests(device);
        if (number <= 0)
                goto requeue;

        for (i = 0; i < number; i++) {
                /* Stop generating RS requests when half of the send buffer is filled,
                 * but notify TCP that we'd like to have more space. */
                mutex_lock(&connection->data.mutex);
                if (connection->data.socket) {
                        struct sock *sk = connection->data.socket->sk;
                        int queued = sk->sk_wmem_queued;
                        int sndbuf = sk->sk_sndbuf;
                        if (queued > sndbuf / 2) {
                                requeue = 1;
                                if (sk->sk_socket)
                                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                        }
                } else
                        /* no data socket: try again on the next timer tick */
                        requeue = 1;
                mutex_unlock(&connection->data.mutex);
                if (requeue)
                        goto requeue;

next_sector:
                size = BM_BLOCK_SIZE;
                bit  = drbd_bm_find_next(device, device->bm_resync_fo);

                if (bit == DRBD_END_OF_BITMAP) {
                        /* nothing left to request; no timer re-arm here */
                        device->bm_resync_fo = drbd_bm_bits(device);
                        put_ldev(device);
                        return 0;
                }

                sector = BM_BIT_TO_SECT(bit);

                if (drbd_try_rs_begin_io(device, sector)) {
                        /* could not start resync IO on this sector now:
                         * remember where to continue and retry later */
                        device->bm_resync_fo = bit;
                        goto requeue;
                }
                device->bm_resync_fo = bit + 1;

                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
                        /* bit is no longer set: undo the begin_io
                         * and look for the next set bit */
                        drbd_rs_complete_io(device, sector);
                        goto next_sector;
                }

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
                /* try to find some adjacent bits.
                 * we stop if we have already the maximum req size.
                 *
                 * Additionally always align bigger requests, in order to
                 * be prepared for all stripe sizes of software RAIDs.
                 */
                align = 1;
                /* remembered so the -EAGAIN path below can undo the merge count */
                rollback_i = i;
                while (i < number) {
                        if (size + BM_BLOCK_SIZE > max_bio_size)
                                break;

                        /* Be always aligned */
                        if (sector & ((1<<(align+3))-1))
                                break;

                        if (discard_granularity && size == discard_granularity)
                                break;

                        /* do not cross extent boundaries */
                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
                                break;
                        /* now, is it actually dirty, after all?
                         * caution, drbd_bm_test_bit is tri-state for some
                         * obscure reason; ( b == 0 ) would get the out-of-band
                         * only accidentally right because of the "oddly sized"
                         * adjustment below */
                        if (drbd_bm_test_bit(device, bit+1) != 1)
                                break;
                        bit++;
                        size += BM_BLOCK_SIZE;
                        if ((BM_BLOCK_SIZE << align) <= size)
                                align++;
                        i++;
                }
                /* if we merged some,
                 * reset the offset to start the next drbd_bm_find_next from */
                if (size > BM_BLOCK_SIZE)
                        device->bm_resync_fo = bit + 1;
#endif

                /* adjust very last sectors, in case we are oddly sized */
                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;

                if (device->use_csums) {
                        switch (read_for_csum(peer_device, sector, size)) {
                        case -EIO: /* Disk failure */
                                put_ldev(device);
                                return -EIO;
                        case -EAGAIN: /* allocation failed, or ldev busy */
                                drbd_rs_complete_io(device, sector);
                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
                                i = rollback_i;
                                goto requeue;
                        case 0:
                                /* everything ok */
                                break;
                        default:
                                /* read_for_csum() returns nothing else */
                                BUG();
                        }
                } else {
                        int err;

                        inc_rs_pending(device);
                        /* a request of exactly discard_granularity bytes is
                         * sent as P_RS_THIN_REQ, everything else as
                         * P_RS_DATA_REQUEST */
                        err = drbd_send_drequest(peer_device,
                                                 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
                                                 sector, size, ID_SYNCER);
                        if (err) {
                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
                                dec_rs_pending(device);
                                put_ldev(device);
                                return err;
                        }
                }
        }

        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
                /* last syncer _request_ was sent,
                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
                 * next sync group will resume), as soon as we receive the last
                 * resync data block, and the last bit is cleared.
                 * until then resync "work" is "inactive" ...
                 */
                put_ldev(device);
                return 0;
        }

 requeue:
        /* account what was issued in this turn (i blocks, converted to
         * sectors) and re-arm the timer for the next turn */
        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
        put_ldev(device);
        return 0;
}
768
769 static int make_ov_request(struct drbd_device *device, int cancel)
770 {
771         int number, i, size;
772         sector_t sector;
773         const sector_t capacity = drbd_get_capacity(device->this_bdev);
774         bool stop_sector_reached = false;
775
776         if (unlikely(cancel))
777                 return 1;
778
779         number = drbd_rs_number_requests(device);
780
781         sector = device->ov_position;
782         for (i = 0; i < number; i++) {
783                 if (sector >= capacity)
784                         return 1;
785
786                 /* We check for "finished" only in the reply path:
787                  * w_e_end_ov_reply().
788                  * We need to send at least one request out. */
789                 stop_sector_reached = i > 0
790                         && verify_can_do_stop_sector(device)
791                         && sector >= device->ov_stop_sector;
792                 if (stop_sector_reached)
793                         break;
794
795                 size = BM_BLOCK_SIZE;
796
797                 if (drbd_try_rs_begin_io(device, sector)) {
798                         device->ov_position = sector;
799                         goto requeue;
800                 }
801
802                 if (sector + (size>>9) > capacity)
803                         size = (capacity-sector)<<9;
804
805                 inc_rs_pending(device);
806                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
807                         dec_rs_pending(device);
808                         return 0;
809                 }
810                 sector += BM_SECT_PER_BIT;
811         }
812         device->ov_position = sector;
813
814  requeue:
815         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
816         if (i == 0 || !stop_sector_reached)
817                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
818         return 1;
819 }
820
821 int w_ov_finished(struct drbd_work *w, int cancel)
822 {
823         struct drbd_device_work *dw =
824                 container_of(w, struct drbd_device_work, w);
825         struct drbd_device *device = dw->device;
826         kfree(dw);
827         ov_out_of_sync_print(device);
828         drbd_resync_finished(device);
829
830         return 0;
831 }
832
833 static int w_resync_finished(struct drbd_work *w, int cancel)
834 {
835         struct drbd_device_work *dw =
836                 container_of(w, struct drbd_device_work, w);
837         struct drbd_device *device = dw->device;
838         kfree(dw);
839
840         drbd_resync_finished(device);
841
842         return 0;
843 }
844
845 static void ping_peer(struct drbd_device *device)
846 {
847         struct drbd_connection *connection = first_peer_device(device)->connection;
848
849         clear_bit(GOT_PING_ACK, &connection->flags);
850         request_ping(connection);
851         wait_event(connection->ping_wait,
852                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
853 }
854
855 int drbd_resync_finished(struct drbd_device *device)
856 {
857         struct drbd_connection *connection = first_peer_device(device)->connection;
858         unsigned long db, dt, dbdt;
859         unsigned long n_oos;
860         union drbd_state os, ns;
861         struct drbd_device_work *dw;
862         char *khelper_cmd = NULL;
863         int verify_done = 0;
864
865         /* Remove all elements from the resync LRU. Since future actions
866          * might set bits in the (main) bitmap, then the entries in the
867          * resync LRU would be wrong. */
868         if (drbd_rs_del_all(device)) {
869                 /* In case this is not possible now, most probably because
870                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
871                  * queue (or even the read operations for those packets
872                  * is not finished by now).   Retry in 100ms. */
873
874                 schedule_timeout_interruptible(HZ / 10);
875                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
876                 if (dw) {
877                         dw->w.cb = w_resync_finished;
878                         dw->device = device;
879                         drbd_queue_work(&connection->sender_work, &dw->w);
880                         return 1;
881                 }
882                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
883         }
884
885         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
886         if (dt <= 0)
887                 dt = 1;
888
889         db = device->rs_total;
890         /* adjust for verify start and stop sectors, respective reached position */
891         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
892                 db -= device->ov_left;
893
894         dbdt = Bit2KB(db/dt);
895         device->rs_paused /= HZ;
896
897         if (!get_ldev(device))
898                 goto out;
899
900         ping_peer(device);
901
902         spin_lock_irq(&device->resource->req_lock);
903         os = drbd_read_state(device);
904
905         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
906
907         /* This protects us against multiple calls (that can happen in the presence
908            of application IO), and against connectivity loss just before we arrive here. */
909         if (os.conn <= C_CONNECTED)
910                 goto out_unlock;
911
912         ns = os;
913         ns.conn = C_CONNECTED;
914
915         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
916              verify_done ? "Online verify" : "Resync",
917              dt + device->rs_paused, device->rs_paused, dbdt);
918
919         n_oos = drbd_bm_total_weight(device);
920
921         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
922                 if (n_oos) {
923                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
924                               n_oos, Bit2KB(1));
925                         khelper_cmd = "out-of-sync";
926                 }
927         } else {
928                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
929
930                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
931                         khelper_cmd = "after-resync-target";
932
933                 if (device->use_csums && device->rs_total) {
934                         const unsigned long s = device->rs_same_csum;
935                         const unsigned long t = device->rs_total;
936                         const int ratio =
937                                 (t == 0)     ? 0 :
938                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
939                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
940                              "transferred %luK total %luK\n",
941                              ratio,
942                              Bit2KB(device->rs_same_csum),
943                              Bit2KB(device->rs_total - device->rs_same_csum),
944                              Bit2KB(device->rs_total));
945                 }
946         }
947
948         if (device->rs_failed) {
949                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
950
951                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
952                         ns.disk = D_INCONSISTENT;
953                         ns.pdsk = D_UP_TO_DATE;
954                 } else {
955                         ns.disk = D_UP_TO_DATE;
956                         ns.pdsk = D_INCONSISTENT;
957                 }
958         } else {
959                 ns.disk = D_UP_TO_DATE;
960                 ns.pdsk = D_UP_TO_DATE;
961
962                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
963                         if (device->p_uuid) {
964                                 int i;
965                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
966                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
967                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
968                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
969                         } else {
970                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
971                         }
972                 }
973
974                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
975                         /* for verify runs, we don't update uuids here,
976                          * so there would be nothing to report. */
977                         drbd_uuid_set_bm(device, 0UL);
978                         drbd_print_uuids(device, "updated UUIDs");
979                         if (device->p_uuid) {
980                                 /* Now the two UUID sets are equal, update what we
981                                  * know of the peer. */
982                                 int i;
983                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
984                                         device->p_uuid[i] = device->ldev->md.uuid[i];
985                         }
986                 }
987         }
988
989         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
990 out_unlock:
991         spin_unlock_irq(&device->resource->req_lock);
992
993         /* If we have been sync source, and have an effective fencing-policy,
994          * once *all* volumes are back in sync, call "unfence". */
995         if (os.conn == C_SYNC_SOURCE) {
996                 enum drbd_disk_state disk_state = D_MASK;
997                 enum drbd_disk_state pdsk_state = D_MASK;
998                 enum drbd_fencing_p fp = FP_DONT_CARE;
999
1000                 rcu_read_lock();
1001                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1002                 if (fp != FP_DONT_CARE) {
1003                         struct drbd_peer_device *peer_device;
1004                         int vnr;
1005                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1006                                 struct drbd_device *device = peer_device->device;
1007                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1008                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1009                         }
1010                 }
1011                 rcu_read_unlock();
1012                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1013                         conn_khelper(connection, "unfence-peer");
1014         }
1015
1016         put_ldev(device);
1017 out:
1018         device->rs_total  = 0;
1019         device->rs_failed = 0;
1020         device->rs_paused = 0;
1021
1022         /* reset start sector, if we reached end of device */
1023         if (verify_done && device->ov_left == 0)
1024                 device->ov_start_sector = 0;
1025
1026         drbd_md_sync(device);
1027
1028         if (khelper_cmd)
1029                 drbd_khelper(device, khelper_cmd);
1030
1031         return 1;
1032 }
1033
1034 /* helper */
1035 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1036 {
1037         if (drbd_peer_req_has_active_page(peer_req)) {
1038                 /* This might happen if sendpage() has not finished */
1039                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1040                 atomic_add(i, &device->pp_in_use_by_net);
1041                 atomic_sub(i, &device->pp_in_use);
1042                 spin_lock_irq(&device->resource->req_lock);
1043                 list_add_tail(&peer_req->w.list, &device->net_ee);
1044                 spin_unlock_irq(&device->resource->req_lock);
1045                 wake_up(&drbd_pp_wait);
1046         } else
1047                 drbd_free_peer_req(device, peer_req);
1048 }
1049
1050 /**
1051  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1052  * @w:          work object.
1053  * @cancel:     The connection will be closed anyways
1054  */
1055 int w_e_end_data_req(struct drbd_work *w, int cancel)
1056 {
1057         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1058         struct drbd_peer_device *peer_device = peer_req->peer_device;
1059         struct drbd_device *device = peer_device->device;
1060         int err;
1061
1062         if (unlikely(cancel)) {
1063                 drbd_free_peer_req(device, peer_req);
1064                 dec_unacked(device);
1065                 return 0;
1066         }
1067
1068         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1069                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1070         } else {
1071                 if (__ratelimit(&drbd_ratelimit_state))
1072                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1073                             (unsigned long long)peer_req->i.sector);
1074
1075                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1076         }
1077
1078         dec_unacked(device);
1079
1080         move_to_net_ee_or_free(device, peer_req);
1081
1082         if (unlikely(err))
1083                 drbd_err(device, "drbd_send_block() failed\n");
1084         return err;
1085 }
1086
1087 static bool all_zero(struct drbd_peer_request *peer_req)
1088 {
1089         struct page *page = peer_req->pages;
1090         unsigned int len = peer_req->i.size;
1091
1092         page_chain_for_each(page) {
1093                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1094                 unsigned int i, words = l / sizeof(long);
1095                 unsigned long *d;
1096
1097                 d = kmap_atomic(page);
1098                 for (i = 0; i < words; i++) {
1099                         if (d[i]) {
1100                                 kunmap_atomic(d);
1101                                 return false;
1102                         }
1103                 }
1104                 kunmap_atomic(d);
1105                 len -= l;
1106         }
1107
1108         return true;
1109 }
1110
1111 /**
1112  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1113  * @w:          work object.
1114  * @cancel:     The connection will be closed anyways
1115  */
1116 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1117 {
1118         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1119         struct drbd_peer_device *peer_device = peer_req->peer_device;
1120         struct drbd_device *device = peer_device->device;
1121         int err;
1122
1123         if (unlikely(cancel)) {
1124                 drbd_free_peer_req(device, peer_req);
1125                 dec_unacked(device);
1126                 return 0;
1127         }
1128
1129         if (get_ldev_if_state(device, D_FAILED)) {
1130                 drbd_rs_complete_io(device, peer_req->i.sector);
1131                 put_ldev(device);
1132         }
1133
1134         if (device->state.conn == C_AHEAD) {
1135                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1136         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1137                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1138                         inc_rs_pending(device);
1139                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1140                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1141                         else
1142                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1143                 } else {
1144                         if (__ratelimit(&drbd_ratelimit_state))
1145                                 drbd_err(device, "Not sending RSDataReply, "
1146                                     "partner DISKLESS!\n");
1147                         err = 0;
1148                 }
1149         } else {
1150                 if (__ratelimit(&drbd_ratelimit_state))
1151                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1152                             (unsigned long long)peer_req->i.sector);
1153
1154                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1155
1156                 /* update resync data with failure */
1157                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1158         }
1159
1160         dec_unacked(device);
1161
1162         move_to_net_ee_or_free(device, peer_req);
1163
1164         if (unlikely(err))
1165                 drbd_err(device, "drbd_send_block() failed\n");
1166         return err;
1167 }
1168
1169 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1170 {
1171         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1172         struct drbd_peer_device *peer_device = peer_req->peer_device;
1173         struct drbd_device *device = peer_device->device;
1174         struct digest_info *di;
1175         int digest_size;
1176         void *digest = NULL;
1177         int err, eq = 0;
1178
1179         if (unlikely(cancel)) {
1180                 drbd_free_peer_req(device, peer_req);
1181                 dec_unacked(device);
1182                 return 0;
1183         }
1184
1185         if (get_ldev(device)) {
1186                 drbd_rs_complete_io(device, peer_req->i.sector);
1187                 put_ldev(device);
1188         }
1189
1190         di = peer_req->digest;
1191
1192         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1193                 /* quick hack to try to avoid a race against reconfiguration.
1194                  * a real fix would be much more involved,
1195                  * introducing more locking mechanisms */
1196                 if (peer_device->connection->csums_tfm) {
1197                         digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1198                         D_ASSERT(device, digest_size == di->digest_size);
1199                         digest = kmalloc(digest_size, GFP_NOIO);
1200                 }
1201                 if (digest) {
1202                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1203                         eq = !memcmp(digest, di->digest, digest_size);
1204                         kfree(digest);
1205                 }
1206
1207                 if (eq) {
1208                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1209                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1210                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1211                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1212                 } else {
1213                         inc_rs_pending(device);
1214                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1215                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1216                         kfree(di);
1217                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1218                 }
1219         } else {
1220                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1221                 if (__ratelimit(&drbd_ratelimit_state))
1222                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1223         }
1224
1225         dec_unacked(device);
1226         move_to_net_ee_or_free(device, peer_req);
1227
1228         if (unlikely(err))
1229                 drbd_err(device, "drbd_send_block/ack() failed\n");
1230         return err;
1231 }
1232
1233 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1234 {
1235         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1236         struct drbd_peer_device *peer_device = peer_req->peer_device;
1237         struct drbd_device *device = peer_device->device;
1238         sector_t sector = peer_req->i.sector;
1239         unsigned int size = peer_req->i.size;
1240         int digest_size;
1241         void *digest;
1242         int err = 0;
1243
1244         if (unlikely(cancel))
1245                 goto out;
1246
1247         digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1248         digest = kmalloc(digest_size, GFP_NOIO);
1249         if (!digest) {
1250                 err = 1;        /* terminate the connection in case the allocation failed */
1251                 goto out;
1252         }
1253
1254         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1255                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1256         else
1257                 memset(digest, 0, digest_size);
1258
1259         /* Free e and pages before send.
1260          * In case we block on congestion, we could otherwise run into
1261          * some distributed deadlock, if the other side blocks on
1262          * congestion as well, because our receiver blocks in
1263          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1264         drbd_free_peer_req(device, peer_req);
1265         peer_req = NULL;
1266         inc_rs_pending(device);
1267         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1268         if (err)
1269                 dec_rs_pending(device);
1270         kfree(digest);
1271
1272 out:
1273         if (peer_req)
1274                 drbd_free_peer_req(device, peer_req);
1275         dec_unacked(device);
1276         return err;
1277 }
1278
1279 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1280 {
1281         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1282                 device->ov_last_oos_size += size>>9;
1283         } else {
1284                 device->ov_last_oos_start = sector;
1285                 device->ov_last_oos_size = size>>9;
1286         }
1287         drbd_set_out_of_sync(device, sector, size);
1288 }
1289
1290 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1291 {
1292         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1293         struct drbd_peer_device *peer_device = peer_req->peer_device;
1294         struct drbd_device *device = peer_device->device;
1295         struct digest_info *di;
1296         void *digest;
1297         sector_t sector = peer_req->i.sector;
1298         unsigned int size = peer_req->i.size;
1299         int digest_size;
1300         int err, eq = 0;
1301         bool stop_sector_reached = false;
1302
1303         if (unlikely(cancel)) {
1304                 drbd_free_peer_req(device, peer_req);
1305                 dec_unacked(device);
1306                 return 0;
1307         }
1308
1309         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1310          * the resync lru has been cleaned up already */
1311         if (get_ldev(device)) {
1312                 drbd_rs_complete_io(device, peer_req->i.sector);
1313                 put_ldev(device);
1314         }
1315
1316         di = peer_req->digest;
1317
1318         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1319                 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1320                 digest = kmalloc(digest_size, GFP_NOIO);
1321                 if (digest) {
1322                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1323
1324                         D_ASSERT(device, digest_size == di->digest_size);
1325                         eq = !memcmp(digest, di->digest, digest_size);
1326                         kfree(digest);
1327                 }
1328         }
1329
1330         /* Free peer_req and pages before send.
1331          * In case we block on congestion, we could otherwise run into
1332          * some distributed deadlock, if the other side blocks on
1333          * congestion as well, because our receiver blocks in
1334          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1335         drbd_free_peer_req(device, peer_req);
1336         if (!eq)
1337                 drbd_ov_out_of_sync_found(device, sector, size);
1338         else
1339                 ov_out_of_sync_print(device);
1340
1341         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1342                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1343
1344         dec_unacked(device);
1345
1346         --device->ov_left;
1347
1348         /* let's advance progress step marks only for every other megabyte */
1349         if ((device->ov_left & 0x200) == 0x200)
1350                 drbd_advance_rs_marks(device, device->ov_left);
1351
1352         stop_sector_reached = verify_can_do_stop_sector(device) &&
1353                 (sector + (size>>9)) >= device->ov_stop_sector;
1354
1355         if (device->ov_left == 0 || stop_sector_reached) {
1356                 ov_out_of_sync_print(device);
1357                 drbd_resync_finished(device);
1358         }
1359
1360         return err;
1361 }
1362
1363 /* FIXME
1364  * We need to track the number of pending barrier acks,
1365  * and to be able to wait for them.
1366  * See also comment in drbd_adm_attach before drbd_suspend_io.
1367  */
1368 static int drbd_send_barrier(struct drbd_connection *connection)
1369 {
1370         struct p_barrier *p;
1371         struct drbd_socket *sock;
1372
1373         sock = &connection->data;
1374         p = conn_prepare_command(connection, sock);
1375         if (!p)
1376                 return -EIO;
1377         p->barrier = connection->send.current_epoch_nr;
1378         p->pad = 0;
1379         connection->send.current_epoch_writes = 0;
1380         connection->send.last_sent_barrier_jif = jiffies;
1381
1382         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1383 }
1384
1385 int w_send_write_hint(struct drbd_work *w, int cancel)
1386 {
1387         struct drbd_device *device =
1388                 container_of(w, struct drbd_device, unplug_work);
1389         struct drbd_socket *sock;
1390
1391         if (cancel)
1392                 return 0;
1393         sock = &first_peer_device(device)->connection->data;
1394         if (!drbd_prepare_command(first_peer_device(device), sock))
1395                 return -EIO;
1396         return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1397 }
1398
1399 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1400 {
1401         if (!connection->send.seen_any_write_yet) {
1402                 connection->send.seen_any_write_yet = true;
1403                 connection->send.current_epoch_nr = epoch;
1404                 connection->send.current_epoch_writes = 0;
1405                 connection->send.last_sent_barrier_jif = jiffies;
1406         }
1407 }
1408
1409 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1410 {
1411         /* re-init if first write on this connection */
1412         if (!connection->send.seen_any_write_yet)
1413                 return;
1414         if (connection->send.current_epoch_nr != epoch) {
1415                 if (connection->send.current_epoch_writes)
1416                         drbd_send_barrier(connection);
1417                 connection->send.current_epoch_nr = epoch;
1418         }
1419 }
1420
1421 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1422 {
1423         struct drbd_request *req = container_of(w, struct drbd_request, w);
1424         struct drbd_device *device = req->device;
1425         struct drbd_peer_device *const peer_device = first_peer_device(device);
1426         struct drbd_connection *const connection = peer_device->connection;
1427         int err;
1428
1429         if (unlikely(cancel)) {
1430                 req_mod(req, SEND_CANCELED);
1431                 return 0;
1432         }
1433         req->pre_send_jif = jiffies;
1434
1435         /* this time, no connection->send.current_epoch_writes++;
1436          * If it was sent, it was the closing barrier for the last
1437          * replicated epoch, before we went into AHEAD mode.
1438          * No more barriers will be sent, until we leave AHEAD mode again. */
1439         maybe_send_barrier(connection, req->epoch);
1440
1441         err = drbd_send_out_of_sync(peer_device, req);
1442         req_mod(req, OOS_HANDED_TO_NETWORK);
1443
1444         return err;
1445 }
1446
1447 /**
1448  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1449  * @w:          work object.
1450  * @cancel:     The connection will be closed anyways
1451  */
1452 int w_send_dblock(struct drbd_work *w, int cancel)
1453 {
1454         struct drbd_request *req = container_of(w, struct drbd_request, w);
1455         struct drbd_device *device = req->device;
1456         struct drbd_peer_device *const peer_device = first_peer_device(device);
1457         struct drbd_connection *connection = peer_device->connection;
1458         int err;
1459
1460         if (unlikely(cancel)) {
1461                 req_mod(req, SEND_CANCELED);
1462                 return 0;
1463         }
1464         req->pre_send_jif = jiffies;
1465
1466         re_init_if_first_write(connection, req->epoch);
1467         maybe_send_barrier(connection, req->epoch);
1468         connection->send.current_epoch_writes++;
1469
1470         err = drbd_send_dblock(peer_device, req);
1471         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1472
1473         return err;
1474 }
1475
1476 /**
1477  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1478  * @w:          work object.
1479  * @cancel:     The connection will be closed anyways
1480  */
1481 int w_send_read_req(struct drbd_work *w, int cancel)
1482 {
1483         struct drbd_request *req = container_of(w, struct drbd_request, w);
1484         struct drbd_device *device = req->device;
1485         struct drbd_peer_device *const peer_device = first_peer_device(device);
1486         struct drbd_connection *connection = peer_device->connection;
1487         int err;
1488
1489         if (unlikely(cancel)) {
1490                 req_mod(req, SEND_CANCELED);
1491                 return 0;
1492         }
1493         req->pre_send_jif = jiffies;
1494
1495         /* Even read requests may close a write epoch,
1496          * if there was any yet. */
1497         maybe_send_barrier(connection, req->epoch);
1498
1499         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1500                                  (unsigned long)req);
1501
1502         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1503
1504         return err;
1505 }
1506
1507 int w_restart_disk_io(struct drbd_work *w, int cancel)
1508 {
1509         struct drbd_request *req = container_of(w, struct drbd_request, w);
1510         struct drbd_device *device = req->device;
1511
1512         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1513                 drbd_al_begin_io(device, &req->i);
1514
1515         drbd_req_make_private_bio(req, req->master_bio);
1516         req->private_bio->bi_bdev = device->ldev->backing_bdev;
1517         generic_make_request(req->private_bio);
1518
1519         return 0;
1520 }
1521
1522 static int _drbd_may_sync_now(struct drbd_device *device)
1523 {
1524         struct drbd_device *odev = device;
1525         int resync_after;
1526
1527         while (1) {
1528                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1529                         return 1;
1530                 rcu_read_lock();
1531                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1532                 rcu_read_unlock();
1533                 if (resync_after == -1)
1534                         return 1;
1535                 odev = minor_to_device(resync_after);
1536                 if (!odev)
1537                         return 1;
1538                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1539                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1540                     odev->state.aftr_isp || odev->state.peer_isp ||
1541                     odev->state.user_isp)
1542                         return 0;
1543         }
1544 }
1545
1546 /**
1547  * drbd_pause_after() - Pause resync on all devices that may not resync now
1548  * @device:     DRBD device.
1549  *
1550  * Called from process context only (admin command and after_state_ch).
1551  */
1552 static bool drbd_pause_after(struct drbd_device *device)
1553 {
1554         bool changed = false;
1555         struct drbd_device *odev;
1556         int i;
1557
1558         rcu_read_lock();
1559         idr_for_each_entry(&drbd_devices, odev, i) {
1560                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1561                         continue;
1562                 if (!_drbd_may_sync_now(odev) &&
1563                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1564                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1565                         changed = true;
1566         }
1567         rcu_read_unlock();
1568
1569         return changed;
1570 }
1571
1572 /**
1573  * drbd_resume_next() - Resume resync on all devices that may resync now
1574  * @device:     DRBD device.
1575  *
1576  * Called from process context only (admin command and worker).
1577  */
1578 static bool drbd_resume_next(struct drbd_device *device)
1579 {
1580         bool changed = false;
1581         struct drbd_device *odev;
1582         int i;
1583
1584         rcu_read_lock();
1585         idr_for_each_entry(&drbd_devices, odev, i) {
1586                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1587                         continue;
1588                 if (odev->state.aftr_isp) {
1589                         if (_drbd_may_sync_now(odev) &&
1590                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1591                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1592                                 changed = true;
1593                 }
1594         }
1595         rcu_read_unlock();
1596         return changed;
1597 }
1598
/* Run drbd_resume_next() with all resources locked. */
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_resume_next(device);	/* result is of no interest here */
	unlock_all_resources();
}
1605
/* Run drbd_pause_after() with all resources locked. */
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_pause_after(device);	/* result is of no interest here */
	unlock_all_resources();
}
1612
1613 /* caller must lock_all_resources() */
1614 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1615 {
1616         struct drbd_device *odev;
1617         int resync_after;
1618
1619         if (o_minor == -1)
1620                 return NO_ERROR;
1621         if (o_minor < -1 || o_minor > MINORMASK)
1622                 return ERR_RESYNC_AFTER;
1623
1624         /* check for loops */
1625         odev = minor_to_device(o_minor);
1626         while (1) {
1627                 if (odev == device)
1628                         return ERR_RESYNC_AFTER_CYCLE;
1629
1630                 /* You are free to depend on diskless, non-existing,
1631                  * or not yet/no longer existing minors.
1632                  * We only reject dependency loops.
1633                  * We cannot follow the dependency chain beyond a detached or
1634                  * missing minor.
1635                  */
1636                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1637                         return NO_ERROR;
1638
1639                 rcu_read_lock();
1640                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1641                 rcu_read_unlock();
1642                 /* dependency chain ends here, no cycles. */
1643                 if (resync_after == -1)
1644                         return NO_ERROR;
1645
1646                 /* follow the dependency chain */
1647                 odev = minor_to_device(resync_after);
1648         }
1649 }
1650
1651 /* caller must lock_all_resources() */
1652 void drbd_resync_after_changed(struct drbd_device *device)
1653 {
1654         int changed;
1655
1656         do {
1657                 changed  = drbd_pause_after(device);
1658                 changed |= drbd_resume_next(device);
1659         } while (changed);
1660 }
1661
1662 void drbd_rs_controller_reset(struct drbd_device *device)
1663 {
1664         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1665         struct fifo_buffer *plan;
1666
1667         atomic_set(&device->rs_sect_in, 0);
1668         atomic_set(&device->rs_sect_ev, 0);
1669         device->rs_in_flight = 0;
1670         device->rs_last_events =
1671                 (int)part_stat_read(&disk->part0, sectors[0]) +
1672                 (int)part_stat_read(&disk->part0, sectors[1]);
1673
1674         /* Updating the RCU protected object in place is necessary since
1675            this function gets called from atomic context.
1676            It is valid since all other updates also lead to an completely
1677            empty fifo */
1678         rcu_read_lock();
1679         plan = rcu_dereference(device->rs_plan_s);
1680         plan->total = 0;
1681         fifo_set(plan, 0);
1682         rcu_read_unlock();
1683 }
1684
1685 void start_resync_timer_fn(unsigned long data)
1686 {
1687         struct drbd_device *device = (struct drbd_device *) data;
1688         drbd_device_post_work(device, RS_START);
1689 }
1690
1691 static void do_start_resync(struct drbd_device *device)
1692 {
1693         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1694                 drbd_warn(device, "postponing start_resync ...\n");
1695                 device->start_resync_timer.expires = jiffies + HZ/10;
1696                 add_timer(&device->start_resync_timer);
1697                 return;
1698         }
1699
1700         drbd_start_resync(device, C_SYNC_SOURCE);
1701         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1702 }
1703
1704 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1705 {
1706         bool csums_after_crash_only;
1707         rcu_read_lock();
1708         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1709         rcu_read_unlock();
1710         return connection->agreed_pro_version >= 89 &&          /* supported? */
1711                 connection->csums_tfm &&                        /* configured? */
1712                 (csums_after_crash_only == false                /* use for each resync? */
1713                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1714 }
1715
1716 /**
1717  * drbd_start_resync() - Start the resync process
1718  * @device:     DRBD device.
1719  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1720  *
1721  * This function might bring you directly into one of the
1722  * C_PAUSED_SYNC_* states.
1723  */
1724 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1725 {
1726         struct drbd_peer_device *peer_device = first_peer_device(device);
1727         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1728         union drbd_state ns;
1729         int r;
1730
1731         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1732                 drbd_err(device, "Resync already running!\n");
1733                 return;
1734         }
1735
1736         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1737                 if (side == C_SYNC_TARGET) {
1738                         /* Since application IO was locked out during C_WF_BITMAP_T and
1739                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1740                            we check that we might make the data inconsistent. */
1741                         r = drbd_khelper(device, "before-resync-target");
1742                         r = (r >> 8) & 0xff;
1743                         if (r > 0) {
1744                                 drbd_info(device, "before-resync-target handler returned %d, "
1745                                          "dropping connection.\n", r);
1746                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1747                                 return;
1748                         }
1749                 } else /* C_SYNC_SOURCE */ {
1750                         r = drbd_khelper(device, "before-resync-source");
1751                         r = (r >> 8) & 0xff;
1752                         if (r > 0) {
1753                                 if (r == 3) {
1754                                         drbd_info(device, "before-resync-source handler returned %d, "
1755                                                  "ignoring. Old userland tools?", r);
1756                                 } else {
1757                                         drbd_info(device, "before-resync-source handler returned %d, "
1758                                                  "dropping connection.\n", r);
1759                                         conn_request_state(connection,
1760                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1761                                         return;
1762                                 }
1763                         }
1764                 }
1765         }
1766
1767         if (current == connection->worker.task) {
1768                 /* The worker should not sleep waiting for state_mutex,
1769                    that can take long */
1770                 if (!mutex_trylock(device->state_mutex)) {
1771                         set_bit(B_RS_H_DONE, &device->flags);
1772                         device->start_resync_timer.expires = jiffies + HZ/5;
1773                         add_timer(&device->start_resync_timer);
1774                         return;
1775                 }
1776         } else {
1777                 mutex_lock(device->state_mutex);
1778         }
1779
1780         lock_all_resources();
1781         clear_bit(B_RS_H_DONE, &device->flags);
1782         /* Did some connection breakage or IO error race with us? */
1783         if (device->state.conn < C_CONNECTED
1784         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1785                 unlock_all_resources();
1786                 goto out;
1787         }
1788
1789         ns = drbd_read_state(device);
1790
1791         ns.aftr_isp = !_drbd_may_sync_now(device);
1792
1793         ns.conn = side;
1794
1795         if (side == C_SYNC_TARGET)
1796                 ns.disk = D_INCONSISTENT;
1797         else /* side == C_SYNC_SOURCE */
1798                 ns.pdsk = D_INCONSISTENT;
1799
1800         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1801         ns = drbd_read_state(device);
1802
1803         if (ns.conn < C_CONNECTED)
1804                 r = SS_UNKNOWN_ERROR;
1805
1806         if (r == SS_SUCCESS) {
1807                 unsigned long tw = drbd_bm_total_weight(device);
1808                 unsigned long now = jiffies;
1809                 int i;
1810
1811                 device->rs_failed    = 0;
1812                 device->rs_paused    = 0;
1813                 device->rs_same_csum = 0;
1814                 device->rs_last_sect_ev = 0;
1815                 device->rs_total     = tw;
1816                 device->rs_start     = now;
1817                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1818                         device->rs_mark_left[i] = tw;
1819                         device->rs_mark_time[i] = now;
1820                 }
1821                 drbd_pause_after(device);
1822                 /* Forget potentially stale cached per resync extent bit-counts.
1823                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1824                  * disabled, and know the disk state is ok. */
1825                 spin_lock(&device->al_lock);
1826                 lc_reset(device->resync);
1827                 device->resync_locked = 0;
1828                 device->resync_wenr = LC_FREE;
1829                 spin_unlock(&device->al_lock);
1830         }
1831         unlock_all_resources();
1832
1833         if (r == SS_SUCCESS) {
1834                 wake_up(&device->al_wait); /* for lc_reset() above */
1835                 /* reset rs_last_bcast when a resync or verify is started,
1836                  * to deal with potential jiffies wrap. */
1837                 device->rs_last_bcast = jiffies - HZ;
1838
1839                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1840                      drbd_conn_str(ns.conn),
1841                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1842                      (unsigned long) device->rs_total);
1843                 if (side == C_SYNC_TARGET) {
1844                         device->bm_resync_fo = 0;
1845                         device->use_csums = use_checksum_based_resync(connection, device);
1846                 } else {
1847                         device->use_csums = false;
1848                 }
1849
1850                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1851                  * with w_send_oos, or the sync target will get confused as to
1852                  * how much bits to resync.  We cannot do that always, because for an
1853                  * empty resync and protocol < 95, we need to do it here, as we call
1854                  * drbd_resync_finished from here in that case.
1855                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1856                  * and from after_state_ch otherwise. */
1857                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1858                         drbd_gen_and_send_sync_uuid(peer_device);
1859
1860                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1861                         /* This still has a race (about when exactly the peers
1862                          * detect connection loss) that can lead to a full sync
1863                          * on next handshake. In 8.3.9 we fixed this with explicit
1864                          * resync-finished notifications, but the fix
1865                          * introduces a protocol change.  Sleeping for some
1866                          * time longer than the ping interval + timeout on the
1867                          * SyncSource, to give the SyncTarget the chance to
1868                          * detect connection loss, then waiting for a ping
1869                          * response (implicit in drbd_resync_finished) reduces
1870                          * the race considerably, but does not solve it. */
1871                         if (side == C_SYNC_SOURCE) {
1872                                 struct net_conf *nc;
1873                                 int timeo;
1874
1875                                 rcu_read_lock();
1876                                 nc = rcu_dereference(connection->net_conf);
1877                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1878                                 rcu_read_unlock();
1879                                 schedule_timeout_interruptible(timeo);
1880                         }
1881                         drbd_resync_finished(device);
1882                 }
1883
1884                 drbd_rs_controller_reset(device);
1885                 /* ns.conn may already be != device->state.conn,
1886                  * we may have been paused in between, or become paused until
1887                  * the timer triggers.
1888                  * No matter, that is handled in resync_timer_fn() */
1889                 if (ns.conn == C_SYNC_TARGET)
1890                         mod_timer(&device->resync_timer, jiffies);
1891
1892                 drbd_md_sync(device);
1893         }
1894         put_ldev(device);
1895 out:
1896         mutex_unlock(device->state_mutex);
1897 }
1898
1899 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1900 {
1901         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1902         device->rs_last_bcast = jiffies;
1903
1904         if (!get_ldev(device))
1905                 return;
1906
1907         drbd_bm_write_lazy(device, 0);
1908         if (resync_done && is_sync_state(device->state.conn))
1909                 drbd_resync_finished(device);
1910
1911         drbd_bcast_event(device, &sib);
1912         /* update timestamp, in case it took a while to write out stuff */
1913         device->rs_last_bcast = jiffies;
1914         put_ldev(device);
1915 }
1916
1917 static void drbd_ldev_destroy(struct drbd_device *device)
1918 {
1919         lc_destroy(device->resync);
1920         device->resync = NULL;
1921         lc_destroy(device->act_log);
1922         device->act_log = NULL;
1923
1924         __acquire(local);
1925         drbd_backing_dev_free(device, device->ldev);
1926         device->ldev = NULL;
1927         __release(local);
1928
1929         clear_bit(GOING_DISKLESS, &device->flags);
1930         wake_up(&device->misc_wait);
1931 }
1932
1933 static void go_diskless(struct drbd_device *device)
1934 {
1935         D_ASSERT(device, device->state.disk == D_FAILED);
1936         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1937          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1938          * the protected members anymore, though, so once put_ldev reaches zero
1939          * again, it will be safe to free them. */
1940
1941         /* Try to write changed bitmap pages, read errors may have just
1942          * set some bits outside the area covered by the activity log.
1943          *
1944          * If we have an IO error during the bitmap writeout,
1945          * we will want a full sync next time, just in case.
1946          * (Do we want a specific meta data flag for this?)
1947          *
1948          * If that does not make it to stable storage either,
1949          * we cannot do anything about that anymore.
1950          *
1951          * We still need to check if both bitmap and ldev are present, we may
1952          * end up here after a failed attach, before ldev was even assigned.
1953          */
1954         if (device->bitmap && device->ldev) {
1955                 /* An interrupted resync or similar is allowed to recounts bits
1956                  * while we detach.
1957                  * Any modifications would not be expected anymore, though.
1958                  */
1959                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1960                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1961                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1962                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1963                                 drbd_md_sync(device);
1964                         }
1965                 }
1966         }
1967
1968         drbd_force_state(device, NS(disk, D_DISKLESS));
1969 }
1970
/*
 * Handle MD_SYNC device work: log a warning that the md_sync_timer expired
 * and call drbd_md_sync() from the worker.  Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");

	drbd_md_sync(device);

	return 0;
}
1977
1978 /* only called from drbd_worker thread, no locking */
1979 void __update_timing_details(
1980                 struct drbd_thread_timing_details *tdp,
1981                 unsigned int *cb_nr,
1982                 void *cb,
1983                 const char *fn, const unsigned int line)
1984 {
1985         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1986         struct drbd_thread_timing_details *td = tdp + i;
1987
1988         td->start_jif = jiffies;
1989         td->cb_addr = cb;
1990         td->caller_fn = fn;
1991         td->line = line;
1992         td->cb_nr = *cb_nr;
1993
1994         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1995         td = tdp + i;
1996         memset(td, 0, sizeof(*td));
1997
1998         ++(*cb_nr);
1999 }
2000
2001 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2002 {
2003         if (test_bit(MD_SYNC, &todo))
2004                 do_md_sync(device);
2005         if (test_bit(RS_DONE, &todo) ||
2006             test_bit(RS_PROGRESS, &todo))
2007                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2008         if (test_bit(GO_DISKLESS, &todo))
2009                 go_diskless(device);
2010         if (test_bit(DESTROY_DISK, &todo))
2011                 drbd_ldev_destroy(device);
2012         if (test_bit(RS_START, &todo))
2013                 do_start_resync(device);
2014 }
2015
2016 #define DRBD_DEVICE_WORK_MASK   \
2017         ((1UL << GO_DISKLESS)   \
2018         |(1UL << DESTROY_DISK)  \
2019         |(1UL << MD_SYNC)       \
2020         |(1UL << RS_START)      \
2021         |(1UL << RS_PROGRESS)   \
2022         |(1UL << RS_DONE)       \
2023         )
2024
2025 static unsigned long get_work_bits(unsigned long *flags)
2026 {
2027         unsigned long old, new;
2028         do {
2029                 old = *flags;
2030                 new = old & ~DRBD_DEVICE_WORK_MASK;
2031         } while (cmpxchg(flags, old, new) != old);
2032         return old & DRBD_DEVICE_WORK_MASK;
2033 }
2034
2035 static void do_unqueued_work(struct drbd_connection *connection)
2036 {
2037         struct drbd_peer_device *peer_device;
2038         int vnr;
2039
2040         rcu_read_lock();
2041         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2042                 struct drbd_device *device = peer_device->device;
2043                 unsigned long todo = get_work_bits(&device->flags);
2044                 if (!todo)
2045                         continue;
2046
2047                 kref_get(&device->kref);
2048                 rcu_read_unlock();
2049                 do_device_work(device, todo);
2050                 kref_put(&device->kref, drbd_destroy_device);
2051                 rcu_read_lock();
2052         }
2053         rcu_read_unlock();
2054 }
2055
2056 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2057 {
2058         spin_lock_irq(&queue->q_lock);
2059         list_splice_tail_init(&queue->q, work_list);
2060         spin_unlock_irq(&queue->q_lock);
2061         return !list_empty(work_list);
2062 }
2063
2064 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2065 {
2066         DEFINE_WAIT(wait);
2067         struct net_conf *nc;
2068         int uncork, cork;
2069
2070         dequeue_work_batch(&connection->sender_work, work_list);
2071         if (!list_empty(work_list))
2072                 return;
2073
2074         /* Still nothing to do?
2075          * Maybe we still need to close the current epoch,
2076          * even if no new requests are queued yet.
2077          *
2078          * Also, poke TCP, just in case.
2079          * Then wait for new work (or signal). */
2080         rcu_read_lock();
2081         nc = rcu_dereference(connection->net_conf);
2082         uncork = nc ? nc->tcp_cork : 0;
2083         rcu_read_unlock();
2084         if (uncork) {
2085                 mutex_lock(&connection->data.mutex);
2086                 if (connection->data.socket)
2087                         drbd_tcp_uncork(connection->data.socket);
2088                 mutex_unlock(&connection->data.mutex);
2089         }
2090
2091         for (;;) {
2092                 int send_barrier;
2093                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2094                 spin_lock_irq(&connection->resource->req_lock);
2095                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2096                 if (!list_empty(&connection->sender_work.q))
2097                         list_splice_tail_init(&connection->sender_work.q, work_list);
2098                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2099                 if (!list_empty(work_list) || signal_pending(current)) {
2100                         spin_unlock_irq(&connection->resource->req_lock);
2101                         break;
2102                 }
2103
2104                 /* We found nothing new to do, no to-be-communicated request,
2105                  * no other work item.  We may still need to close the last
2106                  * epoch.  Next incoming request epoch will be connection ->
2107                  * current transfer log epoch number.  If that is different
2108                  * from the epoch of the last request we communicated, it is
2109                  * safe to send the epoch separating barrier now.
2110                  */
2111                 send_barrier =
2112                         atomic_read(&connection->current_tle_nr) !=
2113                         connection->send.current_epoch_nr;
2114                 spin_unlock_irq(&connection->resource->req_lock);
2115
2116                 if (send_barrier)
2117                         maybe_send_barrier(connection,
2118                                         connection->send.current_epoch_nr + 1);
2119
2120                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2121                         break;
2122
2123                 /* drbd_send() may have called flush_signals() */
2124                 if (get_t_state(&connection->worker) != RUNNING)
2125                         break;
2126
2127                 schedule();
2128                 /* may be woken up for other things but new work, too,
2129                  * e.g. if the current epoch got closed.
2130                  * In which case we send the barrier above. */
2131         }
2132         finish_wait(&connection->sender_work.q_wait, &wait);
2133
2134         /* someone may have changed the config while we have been waiting above. */
2135         rcu_read_lock();
2136         nc = rcu_dereference(connection->net_conf);
2137         cork = nc ? nc->tcp_cork : 0;
2138         rcu_read_unlock();
2139         mutex_lock(&connection->data.mutex);
2140         if (connection->data.socket) {
2141                 if (cork)
2142                         drbd_tcp_cork(connection->data.socket);
2143                 else if (!uncork)
2144                         drbd_tcp_uncork(connection->data.socket);
2145         }
2146         mutex_unlock(&connection->data.mutex);
2147 }
2148
2149 int drbd_worker(struct drbd_thread *thi)
2150 {
2151         struct drbd_connection *connection = thi->connection;
2152         struct drbd_work *w = NULL;
2153         struct drbd_peer_device *peer_device;
2154         LIST_HEAD(work_list);
2155         int vnr;
2156
2157         while (get_t_state(thi) == RUNNING) {
2158                 drbd_thread_current_set_cpu(thi);
2159
2160                 if (list_empty(&work_list)) {
2161                         update_worker_timing_details(connection, wait_for_work);
2162                         wait_for_work(connection, &work_list);
2163                 }
2164
2165                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2166                         update_worker_timing_details(connection, do_unqueued_work);
2167                         do_unqueued_work(connection);
2168                 }
2169
2170                 if (signal_pending(current)) {
2171                         flush_signals(current);
2172                         if (get_t_state(thi) == RUNNING) {
2173                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2174                                 continue;
2175                         }
2176                         break;
2177                 }
2178
2179                 if (get_t_state(thi) != RUNNING)
2180                         break;
2181
2182                 if (!list_empty(&work_list)) {
2183                         w = list_first_entry(&work_list, struct drbd_work, list);
2184                         list_del_init(&w->list);
2185                         update_worker_timing_details(connection, w->cb);
2186                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2187                                 continue;
2188                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2189                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2190                 }
2191         }
2192
2193         do {
2194                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2195                         update_worker_timing_details(connection, do_unqueued_work);
2196                         do_unqueued_work(connection);
2197                 }
2198                 if (!list_empty(&work_list)) {
2199                         w = list_first_entry(&work_list, struct drbd_work, list);
2200                         list_del_init(&w->list);
2201                         update_worker_timing_details(connection, w->cb);
2202                         w->cb(w, 1);
2203                 } else
2204                         dequeue_work_batch(&connection->sender_work, &work_list);
2205         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2206
2207         rcu_read_lock();
2208         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2209                 struct drbd_device *device = peer_device->device;
2210                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2211                 kref_get(&device->kref);
2212                 rcu_read_unlock();
2213                 drbd_device_cleanup(device);
2214                 kref_put(&device->kref, drbd_destroy_device);
2215                 rcu_read_lock();
2216         }
2217         rcu_read_unlock();
2218
2219         return 0;
2220 }