Merge branch 'master' into tk71
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index ca4a16cea2d8959a39bdffa66146a81672c76d6c..34f224b018b37b1e1781134fedf67179799ab20e 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -26,7 +26,6 @@
 #include <linux/module.h>
 #include <linux/drbd.h>
 #include <linux/sched.h>
-#include <linux/smp_lock.h>
 #include <linux/wait.h>
 #include <linux/mm.h>
 #include <linux/memcontrol.h>
@@ -39,8 +38,6 @@
 #include "drbd_int.h"
 #include "drbd_req.h"
 
-#define SLEEP_TIME (HZ/10)
-
 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
 
 
@@ -104,12 +101,6 @@ void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
        put_ldev(mdev);
 }
 
-static int is_failed_barrier(int ee_flags)
-{
-       return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
-                       == (EE_IS_BARRIER|EE_WAS_ERROR);
-}
-
 /* writes on behalf of the partner, or resync writes,
  * "submitted" by the receiver, final stage.  */
 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
@@ -121,21 +112,6 @@ static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
        int is_syncer_req;
        int do_al_complete_io;
 
-       /* if this is a failed barrier request, disable use of barriers,
-        * and schedule for resubmission */
-       if (is_failed_barrier(e->flags)) {
-               drbd_bump_write_ordering(mdev, WO_bdev_flush);
-               spin_lock_irqsave(&mdev->req_lock, flags);
-               list_del(&e->w.list);
-               e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
-               e->w.cb = w_e_reissue;
-               /* put_ldev actually happens below, once we come here again. */
-               __release(local);
-               spin_unlock_irqrestore(&mdev->req_lock, flags);
-               drbd_queue_work(&mdev->data.work, &e->w);
-               return;
-       }
-
        D_ASSERT(e->block_id != ID_VACANT);
 
        /* after we moved e to done_ee,
@@ -246,6 +222,7 @@ void drbd_endio_pri(struct bio *bio, int error)
        bio_put(req->private_bio);
        req->private_bio = ERR_PTR(error);
 
+       /* not req_mod(), we need irqsave here! */
        spin_lock_irqsave(&mdev->req_lock, flags);
        __req_mod(req, what, &m);
        spin_unlock_irqrestore(&mdev->req_lock, flags);
@@ -376,54 +353,145 @@ static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
        struct drbd_epoch_entry *e;
 
        if (!get_ldev(mdev))
-               return 0;
+               return -EIO;
+
+       if (drbd_rs_should_slow_down(mdev))
+               goto defer;
 
        /* GFP_TRY, because if there is no memory available right now, this may
         * be rescheduled for later. It is "only" background resync, after all. */
        e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
        if (!e)
-               goto fail;
+               goto defer;
 
+       e->w.cb = w_e_send_csum;
        spin_lock_irq(&mdev->req_lock);
        list_add(&e->w.list, &mdev->read_ee);
        spin_unlock_irq(&mdev->req_lock);
 
-       e->w.cb = w_e_send_csum;
+       atomic_add(size >> 9, &mdev->rs_sect_ev);
        if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
-               return 1;
+               return 0;
+
+       /* drbd_submit_ee currently fails for one reason only:
+        * not being able to allocate enough bios.
+        * Is dropping the connection going to help? */
+       spin_lock_irq(&mdev->req_lock);
+       list_del(&e->w.list);
+       spin_unlock_irq(&mdev->req_lock);
 
        drbd_free_ee(mdev, e);
-fail:
+defer:
        put_ldev(mdev);
-       return 2;
+       return -EAGAIN;
 }
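
Note that read_for_csum() now uses kernel-style error codes instead of the old 0/1/2 convention: -EIO when the local disk is unavailable, -EAGAIN when the request should simply be retried later (resync throttled, or no memory/bios right now), and 0 once the epoch entry has been submitted. The caller in w_make_resync_request() is switched over to exactly these values further down in this patch.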
 
 void resync_timer_fn(unsigned long data)
 {
-       unsigned long flags;
        struct drbd_conf *mdev = (struct drbd_conf *) data;
        int queue;
 
-       spin_lock_irqsave(&mdev->req_lock, flags);
-
-       if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
-               queue = 1;
-               if (mdev->state.conn == C_VERIFY_S)
-                       mdev->resync_work.cb = w_make_ov_request;
-               else
-                       mdev->resync_work.cb = w_make_resync_request;
-       } else {
+       queue = 1;
+       switch (mdev->state.conn) {
+       case C_VERIFY_S:
+               mdev->resync_work.cb = w_make_ov_request;
+               break;
+       case C_SYNC_TARGET:
+               mdev->resync_work.cb = w_make_resync_request;
+               break;
+       default:
                queue = 0;
                mdev->resync_work.cb = w_resync_inactive;
        }
 
-       spin_unlock_irqrestore(&mdev->req_lock, flags);
-
        /* harmless race: list_empty outside data.work.q_lock */
        if (list_empty(&mdev->resync_work.list) && queue)
                drbd_queue_work(&mdev->data.work, &mdev->resync_work);
 }
 
+static void fifo_set(struct fifo_buffer *fb, int value)
+{
+       int i;
+
+       for (i = 0; i < fb->size; i++)
+               fb->values[i] = value;
+}
+
+static int fifo_push(struct fifo_buffer *fb, int value)
+{
+       int ov;
+
+       ov = fb->values[fb->head_index];
+       fb->values[fb->head_index++] = value;
+
+       if (fb->head_index >= fb->size)
+               fb->head_index = 0;
+
+       return ov;
+}
+
+static void fifo_add_val(struct fifo_buffer *fb, int value)
+{
+       int i;
+
+       for (i = 0; i < fb->size; i++)
+               fb->values[i] += value;
+}
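
Taken together these helpers implement the plan-ahead ring used by the controller below: fifo_add_val() spreads a correction evenly over every step that is still planned, while fifo_push() stores a new value at the head and returns the one that was there; drbd_rs_controller() pushes 0, so the returned value is exactly the correction that had accumulated for the current step.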
+
+int drbd_rs_controller(struct drbd_conf *mdev)
+{
+       unsigned int sect_in;  /* Number of sectors that came in since the last turn */
+       unsigned int want;     /* The number of sectors we want in the proxy */
+       int req_sect; /* Number of sectors to request in this turn */
+       int correction; /* Number of additional sectors needed in the proxy */
+       int cps; /* correction per invocation of drbd_rs_controller() */
+       int steps; /* Number of time steps to plan ahead */
+       int curr_corr;
+       int max_sect;
+
+       sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
+       mdev->rs_in_flight -= sect_in;
+
+       spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
+
+       steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
+
+       if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
+               want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
+       } else { /* normal path */
+               want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
+                       sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
+       }
+
+       correction = want - mdev->rs_in_flight - mdev->rs_planed;
+
+       /* Plan ahead */
+       cps = correction / steps;
+       fifo_add_val(&mdev->rs_plan_s, cps);
+       mdev->rs_planed += cps * steps;
+
+       /* What we do in this step */
+       curr_corr = fifo_push(&mdev->rs_plan_s, 0);
+       spin_unlock(&mdev->peer_seq_lock);
+       mdev->rs_planed -= curr_corr;
+
+       req_sect = sect_in + curr_corr;
+       if (req_sect < 0)
+               req_sect = 0;
+
+       max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
+       if (req_sect > max_sect)
+               req_sect = max_sect;
+
+       /*
+       dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
+                sect_in, mdev->rs_in_flight, want, correction,
+                steps, cps, mdev->rs_planed, curr_corr, req_sect);
+       */
+
+       return req_sect;
+}
+
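
For readers following the new resync-rate controller, here is a minimal user-space sketch of one planning step (a hedged illustration, not DRBD code): it mirrors the fifo helpers and the proportional logic of drbd_rs_controller() above, but every input value (fill target, max rate, sectors in flight) is an invented example rather than a DRBD default, and SLEEP_TIME is assumed to still be HZ/10 (100 ms), the value of the define removed at the top of this patch.

#include <stdio.h>

#define STEPS    10   /* plan-ahead window: 10 steps of 100 ms = 1 s   */
#define SLEEP_MS 100  /* one controller step, i.e. SLEEP_TIME = HZ/10  */

struct fifo_buffer {
	int head_index;
	int size;
	int values[STEPS];
};

/* store a new value at the head, return the one that was planned there */
static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov = fb->values[fb->head_index];

	fb->values[fb->head_index++] = value;
	if (fb->head_index >= fb->size)
		fb->head_index = 0;
	return ov;
}

/* spread a correction evenly over all planned steps */
static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

int main(void)
{
	struct fifo_buffer plan = { .size = STEPS };
	int rs_in_flight  = 2000;  /* sectors requested but not yet acked    */
	int rs_planed     = 0;     /* sectors already queued in the plan     */
	int sect_in       = 800;   /* sectors that came back since last step */
	int c_fill_target = 4000;  /* desired number of sectors "in flight"  */
	int c_max_rate    = 10000; /* upper limit in KiB/s                   */
	int want, correction, cps, curr_corr, req_sect, max_sect;

	rs_in_flight -= sect_in;

	/* c_fill_target short-circuits the c_delay_target estimate,
	 * exactly as in drbd_rs_controller() */
	want = c_fill_target;

	/* proportional part: spread what is missing over the whole plan */
	correction = want - rs_in_flight - rs_planed;
	cps = correction / STEPS;
	fifo_add_val(&plan, cps);
	rs_planed += cps * STEPS;

	/* this step's share: whatever accumulated in the slot that is due now */
	curr_corr = fifo_push(&plan, 0);
	rs_planed -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	/* cap at c_max_rate: KiB/s * 2 sectors/KiB, scaled to one 100 ms step */
	max_sect = c_max_rate * 2 * SLEEP_MS / 1000;
	if (req_sect > max_sect)
		req_sect = max_sect;

	printf("correction=%d cps=%d curr_corr=%d -> request %d sectors\n",
	       correction, cps, curr_corr, req_sect);
	return 0;
}

With these made-up inputs it prints correction=2800 cps=280 curr_corr=280 and requests 1080 sectors (540 KiB) for the next 100 ms, well below the 2000-sector cap.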
 int w_make_resync_request(struct drbd_conf *mdev,
                struct drbd_work *w, int cancel)
 {
@@ -431,8 +499,9 @@ int w_make_resync_request(struct drbd_conf *mdev,
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        int max_segment_size;
-       int number, i, size, pe, mx;
+       int number, rollback_i, size, pe, mx;
        int align, queued, sndbuf;
+       int i = 0;
 
        if (unlikely(cancel))
                return 1;
@@ -446,6 +515,12 @@ int w_make_resync_request(struct drbd_conf *mdev,
                dev_err(DEV, "%s in w_make_resync_request\n",
                        drbd_conn_str(mdev->state.conn));
 
+       if (mdev->rs_total == 0) {
+               /* empty resync? */
+               drbd_resync_finished(mdev);
+               return 1;
+       }
+
        if (!get_ldev(mdev)) {
                /* Since we only need to access mdev->rsync a
                   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
@@ -458,11 +533,25 @@ int w_make_resync_request(struct drbd_conf *mdev,
 
        /* starting with drbd 8.3.8, we can handle multi-bio EEs,
         * if it should be necessary */
-       max_segment_size = mdev->agreed_pro_version < 94 ?
-               queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
+       max_segment_size =
+               mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
+               mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
 
-       number = SLEEP_TIME * mdev->sync_conf.rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
-       pe = atomic_read(&mdev->rs_pending_cnt);
+       if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
+               number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
+               mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
+       } else {
+               mdev->c_sync_rate = mdev->sync_conf.rate;
+               number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
+       }
+
+       /* Throttle resync on lower level disk activity, which may also be
+        * caused by application IO on Primary/SyncTarget.
+        * Keep this after the call to drbd_rs_controller, which assumes it
+        * is invoked as close to every SLEEP_TIME as possible and would be
+        * confused otherwise. */
+       if (drbd_rs_should_slow_down(mdev))
+               goto requeue;
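
A quick sanity check on the conversion above, assuming the usual 4 KiB BM_BLOCK_SIZE (BM_BLOCK_SHIFT of 12) and SLEEP_TIME of HZ/10 as in the define removed earlier: the controller returns 512-byte sectors, so shifting right by BM_BLOCK_SHIFT - 9 = 3 divides by 8 to get resync requests of one bitmap block each. If drbd_rs_controller() returned 800 sectors, number would be 100, and c_sync_rate would be reported as 100 * HZ * 4 / (HZ/10) = 4000 KiB/s, consistent with 400 KiB requested every 100 ms.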
 
        mutex_lock(&mdev->data.mutex);
        if (mdev->data.socket)
@@ -476,6 +565,7 @@ int w_make_resync_request(struct drbd_conf *mdev,
                mx = number;
 
        /* Limit the number of pending RS requests to no more than the peer's receive buffer */
+       pe = atomic_read(&mdev->rs_pending_cnt);
        if ((pe + number) > mx) {
                number = mx - pe;
        }
@@ -526,6 +616,7 @@ next_sector:
                 * be prepared for all stripe sizes of software RAIDs.
                 */
                align = 1;
+               rollback_i = i;
                for (;;) {
                        if (size + BM_BLOCK_SIZE > max_segment_size)
                                break;
@@ -561,14 +652,19 @@ next_sector:
                        size = (capacity-sector)<<9;
                if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
                        switch (read_for_csum(mdev, sector, size)) {
-                       case 0: /* Disk failure*/
+                       case -EIO: /* Disk failure */
                                put_ldev(mdev);
                                return 0;
-                       case 2: /* Allocation failed */
+                       case -EAGAIN: /* allocation failed, or ldev busy */
                                drbd_rs_complete_io(mdev, sector);
                                mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
+                               i = rollback_i;
                                goto requeue;
-                       /* case 1: everything ok */
+                       case 0:
+                               /* everything ok */
+                               break;
+                       default:
+                               BUG();
                        }
                } else {
                        inc_rs_pending(mdev);
@@ -595,6 +691,7 @@ next_sector:
        }
 
  requeue:
+       mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
        put_ldev(mdev);
        return 1;
@@ -670,6 +767,14 @@ static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
        return 1;
 }
 
+static void ping_peer(struct drbd_conf *mdev)
+{
+       clear_bit(GOT_PING_ACK, &mdev->flags);
+       request_ping(mdev);
+       wait_event(mdev->misc_wait,
+                  test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
+}
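
ping_peer() itself is unchanged; it only moves up in the file (its old definition is deleted further down) so that drbd_resync_finished() can call it before sampling the final resync state.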
+
 int drbd_resync_finished(struct drbd_conf *mdev)
 {
        unsigned long db, dt, dbdt;
@@ -709,6 +814,8 @@ int drbd_resync_finished(struct drbd_conf *mdev)
        if (!get_ldev(mdev))
                goto out;
 
+       ping_peer(mdev);
+
        spin_lock_irq(&mdev->req_lock);
        os = mdev->state;
 
@@ -801,8 +908,10 @@ out:
        mdev->rs_paused = 0;
        mdev->ov_start_sector = 0;
 
+       drbd_md_sync(mdev);
+
        if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
-               dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
+               dev_info(DEV, "Writing the whole bitmap\n");
                drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
        }
 
@@ -817,9 +926,13 @@ static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_ent
 {
        if (drbd_ee_has_active_page(e)) {
                /* This might happen if sendpage() has not finished */
+               int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
+               atomic_add(i, &mdev->pp_in_use_by_net);
+               atomic_sub(i, &mdev->pp_in_use);
                spin_lock_irq(&mdev->req_lock);
                list_add_tail(&e->w.list, &mdev->net_ee);
                spin_unlock_irq(&mdev->req_lock);
+               wake_up(&drbd_pp_wait);
        } else
                drbd_free_ee(mdev, e);
 }
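
The new bookkeeping rounds e->size up to whole pages (with 4 KiB pages, a 9 KiB entry counts as three) and moves that amount from pp_in_use to the new pp_in_use_by_net counter; the wake_up(&drbd_pp_wait) then gives allocators that were waiting on pp_in_use a chance to re-check it now that it has dropped.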
@@ -926,9 +1039,12 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
                return 1;
        }
 
-       drbd_rs_complete_io(mdev, e->sector);
+       if (get_ldev(mdev)) {
+               drbd_rs_complete_io(mdev, e->sector);
+               put_ldev(mdev);
+       }
 
-       di = (struct digest_info *)(unsigned long)e->block_id;
+       di = e->digest;
 
        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                /* quick hack to try to avoid a race against reconfiguration.
@@ -952,7 +1068,9 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
                        ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
                } else {
                        inc_rs_pending(mdev);
-                       e->block_id = ID_SYNCER;
+                       e->block_id = ID_SYNCER; /* By setting block_id, the digest pointer becomes invalid! */
+                       e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
+                       kfree(di);
                        ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
                }
        } else {
@@ -962,9 +1080,6 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
        }
 
        dec_unacked(mdev);
-
-       kfree(di);
-
        move_to_net_ee_or_free(mdev, e);
 
        if (unlikely(!ok))
@@ -1034,9 +1149,12 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 
        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
         * the resync lru has been cleaned up already */
-       drbd_rs_complete_io(mdev, e->sector);
+       if (get_ldev(mdev)) {
+               drbd_rs_complete_io(mdev, e->sector);
+               put_ldev(mdev);
+       }
 
-       di = (struct digest_info *)(unsigned long)e->block_id;
+       di = e->digest;
 
        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                digest_size = crypto_hash_digestsize(mdev->verify_tfm);
@@ -1055,9 +1173,6 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
        }
 
        dec_unacked(mdev);
-
-       kfree(di);
-
        if (!eq)
                drbd_ov_oos_found(mdev, e->sector, e->size);
        else
@@ -1108,7 +1223,7 @@ int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
         * dec_ap_pending will be done in got_BarrierAck
         * or (on connection loss) in w_clear_epoch.  */
        ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
-                               (struct p_header *)p, sizeof(*p), 0);
+                               (struct p_header80 *)p, sizeof(*p), 0);
        drbd_put_data_sock(mdev);
 
        return ok;
@@ -1173,6 +1288,24 @@ int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
        return ok;
 }
 
+int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
+{
+       struct drbd_request *req = container_of(w, struct drbd_request, w);
+
+       if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
+               drbd_al_begin_io(mdev, req->sector);
+       /* Calling drbd_al_begin_io() out of the worker might deadlock
+          in theory. In practice it cannot deadlock, since this is only
+          used when unfreezing IO, and all the extents of the requests
+          that made it into the TL are already active. */
+
+       drbd_req_make_private_bio(req, req->master_bio);
+       req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
+       generic_make_request(req->private_bio);
+
+       return 1;
+}
+
 static int _drbd_may_sync_now(struct drbd_conf *mdev)
 {
        struct drbd_conf *odev = mdev;
@@ -1298,14 +1431,6 @@ int drbd_alter_sa(struct drbd_conf *mdev, int na)
        return retcode;
 }
 
-static void ping_peer(struct drbd_conf *mdev)
-{
-       clear_bit(GOT_PING_ACK, &mdev->flags);
-       request_ping(mdev);
-       wait_event(mdev->misc_wait,
-                  test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
-}
-
 /**
  * drbd_start_resync() - Start the resync process
  * @mdev:      DRBD device.
@@ -1379,13 +1504,21 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
                r = SS_UNKNOWN_ERROR;
 
        if (r == SS_SUCCESS) {
-               mdev->rs_total     =
-               mdev->rs_mark_left = drbd_bm_total_weight(mdev);
+               unsigned long tw = drbd_bm_total_weight(mdev);
+               unsigned long now = jiffies;
+               int i;
+
                mdev->rs_failed    = 0;
                mdev->rs_paused    = 0;
-               mdev->rs_start     =
-               mdev->rs_mark_time = jiffies;
                mdev->rs_same_csum = 0;
+               mdev->rs_last_events = 0;
+               mdev->rs_last_sect_ev = 0;
+               mdev->rs_total     = tw;
+               mdev->rs_start     = now;
+               for (i = 0; i < DRBD_SYNC_MARKS; i++) {
+                       mdev->rs_mark_left[i] = tw;
+                       mdev->rs_mark_time[i] = now;
+               }
                _drbd_pause_after(mdev);
        }
        write_unlock_irq(&global_state_lock);
@@ -1397,12 +1530,31 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
                     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
                     (unsigned long) mdev->rs_total);
 
-               if (mdev->rs_total == 0) {
-                       /* Peer still reachable? Beware of failing before-resync-target handlers! */
-                       ping_peer(mdev);
+               if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
+                       /* This still has a race (about when exactly the peers
+                        * detect connection loss) that can lead to a full sync
+                        * on next handshake. In 8.3.9 we fixed this with explicit
+                        * resync-finished notifications, but the fix
+                        * introduces a protocol change.  Sleeping for some
+                        * time longer than the ping interval + timeout on the
+                        * SyncSource, to give the SyncTarget the chance to
+                        * detect connection loss, then waiting for a ping
+                        * response (implicit in drbd_resync_finished) reduces
+                        * the race considerably, but does not solve it. */
+                       if (side == C_SYNC_SOURCE)
+                               schedule_timeout_interruptible(
+                                       mdev->net_conf->ping_int * HZ +
+                                       mdev->net_conf->ping_timeo*HZ/9);
                        drbd_resync_finished(mdev);
                }
 
+               atomic_set(&mdev->rs_sect_in, 0);
+               atomic_set(&mdev->rs_sect_ev, 0);
+               mdev->rs_in_flight = 0;
+               mdev->rs_planed = 0;
+               spin_lock(&mdev->peer_seq_lock);
+               fifo_set(&mdev->rs_plan_s, 0);
+               spin_unlock(&mdev->peer_seq_lock);
                /* ns.conn may already be != mdev->state.conn,
                 * we may have been paused in between, or become paused until
                 * the timer triggers.