]> git.karo-electronics.de Git - karo-tx-linux.git/blobdiff - drivers/md/md-cluster.c
Merge branch 'work.iov_iter' of git://git.kernel.org/pub/scm/linux/kernel/git/viro/vfs
[karo-tx-linux.git] / drivers / md / md-cluster.c
index dd97d42458226b4b284b488983e218e01c814889..41573f1f626f4dd43dd97d5e6372e38e6987ae5d 100644 (file)
@@ -61,6 +61,10 @@ struct resync_info {
  * the lock.
  */
 #define                MD_CLUSTER_SEND_LOCKED_ALREADY          5
+/* We should only receive messages after the node has joined the cluster
+ * and set up all the related info, such as the bitmap and personality */
+#define                MD_CLUSTER_ALREADY_IN_CLUSTER           6
+#define                MD_CLUSTER_PENDING_RECV_EVENT           7
 
 
 struct md_cluster_info {
@@ -85,6 +89,9 @@ struct md_cluster_info {
        struct completion newdisk_completion;
        wait_queue_head_t wait;
        unsigned long state;
+       /* record the region in RESYNCING message */
+       sector_t sync_low;
+       sector_t sync_hi;
 };
 
 enum msg_type {
@@ -284,11 +291,14 @@ static void recover_bitmaps(struct md_thread *thread)
                        goto dlm_unlock;
                }
                if (hi > 0) {
-                       /* TODO:Wait for current resync to get over */
-                       set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
                        if (lo < mddev->recovery_cp)
                                mddev->recovery_cp = lo;
-                       md_check_recovery(mddev);
+                       /* wake up thread to continue resync in case resync
+                        * is not finished */
+                       if (mddev->recovery_cp != MaxSector) {
+                           set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
+                           md_wakeup_thread(mddev->thread);
+                       }
                }
 dlm_unlock:
                dlm_unlock_sync(bm_lockres);
@@ -370,8 +380,12 @@ static void ack_bast(void *arg, int mode)
        struct dlm_lock_resource *res = arg;
        struct md_cluster_info *cinfo = res->mddev->cluster_info;
 
-       if (mode == DLM_LOCK_EX)
-               md_wakeup_thread(cinfo->recv_thread);
+       if (mode == DLM_LOCK_EX) {
+               if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
+                       md_wakeup_thread(cinfo->recv_thread);
+               else
+                       set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
+       }
 }
 
 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
@@ -408,6 +422,30 @@ static void process_suspend_info(struct mddev *mddev,
                md_wakeup_thread(mddev->thread);
                return;
        }
+
+       /*
+        * The bitmaps are not the same on different nodes.
+        * If RESYNCING is happening on one node, then
+        * the node which received the RESYNCING message
+        * will probably perform resync of the region
+        * [lo, hi] again, so we could reduce resync time
+        * a lot if we can ensure that the bitmaps on
+        * different nodes match up well.
+        *
+        * sync_low/hi is used to record the region which
+        * arrived in the previous RESYNCING message.
+        *
+        * Call bitmap_sync_with_cluster to clear
+        * NEEDED_MASK and set RESYNC_MASK since the
+        * resync thread is running on another node,
+        * so we don't need to do the resync again
+        * for the same section */
+       bitmap_sync_with_cluster(mddev, cinfo->sync_low,
+                                       cinfo->sync_hi,
+                                       lo, hi);
+       cinfo->sync_low = lo;
+       cinfo->sync_hi = hi;
+
        s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
        if (!s)
                return;
@@ -482,11 +520,13 @@ static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
                        __func__, __LINE__, le32_to_cpu(msg->raid_slot));
 }
 
-static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
+static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
 {
+       int ret = 0;
+
        if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
                "node %d received it's own msg\n", le32_to_cpu(msg->slot)))
-               return;
+               return -1;
        switch (le32_to_cpu(msg->type)) {
        case METADATA_UPDATED:
                process_metadata_update(mddev, msg);
@@ -509,9 +549,11 @@ static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
                __recover_slot(mddev, le32_to_cpu(msg->slot));
                break;
        default:
+               ret = -1;
                pr_warn("%s:%d Received unknown message from %d\n",
                        __func__, __LINE__, msg->slot);
        }
+       return ret;
 }
 
 /*
@@ -535,7 +577,9 @@ static void recv_daemon(struct md_thread *thread)
 
        /* read lvb and wake up thread to process this message_lockres */
        memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
-       process_recvd_msg(thread->mddev, &msg);
+       ret = process_recvd_msg(thread->mddev, &msg);
+       if (ret)
+               goto out;
 
        /*release CR on ack_lockres*/
        ret = dlm_unlock_sync(ack_lockres);
@@ -549,6 +593,7 @@ static void recv_daemon(struct md_thread *thread)
        ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
        if (unlikely(ret != 0))
                pr_info("lock CR on ack failed return %d\n", ret);
+out:
        /*release CR on message_lockres*/
        ret = dlm_unlock_sync(message_lockres);
        if (unlikely(ret != 0))
@@ -778,17 +823,24 @@ static int join(struct mddev *mddev, int nodes)
        cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
        if (!cinfo->token_lockres)
                goto err;
-       cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
-       if (!cinfo->ack_lockres)
-               goto err;
        cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
        if (!cinfo->no_new_dev_lockres)
                goto err;
 
+       ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
+       if (ret) {
+               ret = -EAGAIN;
+               pr_err("md-cluster: can't join cluster to avoid lock issue\n");
+               goto err;
+       }
+       cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
+       if (!cinfo->ack_lockres)
+               goto err;
        /* get sync CR lock on ACK. */
        if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
                pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
                                ret);
+       dlm_unlock_sync(cinfo->token_lockres);
        /* get sync CR lock on no-new-dev. */
        if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
                pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
@@ -809,12 +861,10 @@ static int join(struct mddev *mddev, int nodes)
        if (!cinfo->resync_lockres)
                goto err;
 
-       ret = gather_all_resync_info(mddev, nodes);
-       if (ret)
-               goto err;
-
        return 0;
 err:
+       md_unregister_thread(&cinfo->recovery_thread);
+       md_unregister_thread(&cinfo->recv_thread);
        lockres_free(cinfo->message_lockres);
        lockres_free(cinfo->token_lockres);
        lockres_free(cinfo->ack_lockres);
@@ -828,6 +878,19 @@ err:
        return ret;
 }
 
+static void load_bitmaps(struct mddev *mddev, int total_slots)
+{
+       struct md_cluster_info *cinfo = mddev->cluster_info;
+
+       /* load every node's bitmap info for resync */
+       if (gather_all_resync_info(mddev, total_slots))
+               pr_err("md-cluster: failed to gather all resyn infos\n");
+       set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
+       /* wake up the recv thread in case something needs to be handled */
+       if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
+               md_wakeup_thread(cinfo->recv_thread);
+}
+
 static void resync_bitmap(struct mddev *mddev)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
@@ -937,7 +1000,6 @@ static void metadata_update_cancel(struct mddev *mddev)
 static int resync_start(struct mddev *mddev)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
-       cinfo->resync_lockres->flags |= DLM_LKF_NOQUEUE;
        return dlm_lock_sync(cinfo->resync_lockres, DLM_LOCK_EX);
 }
 
@@ -967,7 +1029,6 @@ static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
 static int resync_finish(struct mddev *mddev)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
-       cinfo->resync_lockres->flags &= ~DLM_LKF_NOQUEUE;
        dlm_unlock_sync(cinfo->resync_lockres);
        return resync_info_update(mddev, 0, 0);
 }
@@ -1171,6 +1232,7 @@ static struct md_cluster_operations cluster_ops = {
        .add_new_disk_cancel = add_new_disk_cancel,
        .new_disk_ack = new_disk_ack,
        .remove_disk = remove_disk,
+       .load_bitmaps = load_bitmaps,
        .gather_bitmaps = gather_bitmaps,
        .lock_all_bitmaps = lock_all_bitmaps,
        .unlock_all_bitmaps = unlock_all_bitmaps,