]> git.karo-electronics.de Git - karo-tx-linux.git/commitdiff
ocfs2: use the new DLM operation callbacks while requesting new lockspace
authorGoldwyn Rodrigues <rgoldwyn@suse.de>
Tue, 5 Nov 2013 05:55:11 +0000 (16:55 +1100)
committerStephen Rothwell <sfr@canb.auug.org.au>
Tue, 5 Nov 2013 05:55:11 +0000 (16:55 +1100)
Attempt to use the new DLM operations.  If it is not supported, use the
traditional ocfs2_controld.

To exchange ocfs2 versioning, we use the LVB of the version dlm lock.  It
first attempts to take the lock in EX mode (non-blocking).  If successful
(which means it is the first mount), it writes the version number and
downconverts to PR lock.  If it is unsuccessful, it reads the version from
the lock.

If this becomes the standard (with o2cb as well), it could simplify
userspace tools to check if the filesystem is mounted on other nodes.

Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.com>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Mark Fasheh <mfasheh@suse.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
fs/ocfs2/stack_user.c

index 9e5c46615b7e6a6ffcdf6ed1468f162d5a0c0157..4599de7fe9178dd46fe8a07d3365f2a8cc0e170e 100644 (file)
@@ -23,6 +23,7 @@
 #include <linux/mutex.h>
 #include <linux/slab.h>
 #include <linux/reboot.h>
+#include <linux/sched.h>
 #include <asm/uaccess.h>
 
 #include "stackglue.h"
@@ -121,6 +122,7 @@ struct ocfs2_live_connection {
        struct dlm_lksb                 oc_version_lksb;
        char                            oc_lvb[DLM_LVB_LEN];
        struct completion               oc_sync_wait;
+       wait_queue_head_t               oc_wait;
 };
 
 struct ocfs2_control_private {
@@ -217,7 +219,7 @@ static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn,
        mutex_lock(&ocfs2_control_lock);
        c->oc_conn = conn;
 
-       if (atomic_read(&ocfs2_control_opened))
+       if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
                list_add(&c->oc_list, &ocfs2_live_connection_list);
        else {
                printk(KERN_ERR
@@ -926,6 +928,7 @@ static void user_recover_done(void *arg, struct dlm_slot *slots,
                }
 
        lc->oc_our_slot = our_slot;
+       wake_up(&lc->oc_wait);
 }
 
 const struct dlm_lockspace_ops ocfs2_ls_ops = {
@@ -934,11 +937,21 @@ const struct dlm_lockspace_ops ocfs2_ls_ops = {
        .recover_done = user_recover_done,
 };
 
+static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
+{
+       version_unlock(conn);
+       dlm_release_lockspace(conn->cc_lockspace, 2);
+       conn->cc_lockspace = NULL;
+       ocfs2_live_connection_drop(conn->cc_private);
+       conn->cc_private = NULL;
+       return 0;
+}
+
 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
 {
        dlm_lockspace_t *fsdlm;
        struct ocfs2_live_connection *lc;
-       int rc;
+       int rc, ops_rv;
 
        BUG_ON(conn == NULL);
 
@@ -948,12 +961,65 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
                goto out;
        }
 
-       lc->oc_type = WITH_CONTROLD;
+       init_waitqueue_head(&lc->oc_wait);
+       init_completion(&lc->oc_sync_wait);
+       atomic_set(&lc->oc_this_node, 0);
+       conn->cc_private = lc;
+       lc->oc_type = NO_CONTROLD;
+
+       rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
+                              DLM_LSFL_FS, DLM_LVB_LEN,
+                              &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
+       if (rc)
+               goto out;
+
+       if (ops_rv == -EOPNOTSUPP) {
+               lc->oc_type = WITH_CONTROLD;
+               printk(KERN_NOTICE "ocfs2: You seem to be using an older "
+                               "version of dlm_controld and/or ocfs2-tools."
+                               " Please consider upgrading.\n");
+       } else if (ops_rv) {
+               rc = ops_rv;
+               goto out;
+       }
+       conn->cc_lockspace = fsdlm;
 
        rc = ocfs2_live_connection_new(conn, lc);
        if (rc)
                goto out;
 
+       if (lc->oc_type == NO_CONTROLD) {
+               int ret;
+               lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
+               ret = version_lock(conn, DLM_LOCK_EX,
+                               DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
+               running_proto.pv_major =
+                       ocfs2_user_plugin.sp_max_proto.pv_major;
+               running_proto.pv_minor =
+                       ocfs2_user_plugin.sp_max_proto.pv_minor;
+               if (!ret) {
+                       conn->cc_version.pv_major = running_proto.pv_major;
+                       conn->cc_version.pv_minor = running_proto.pv_minor;
+                       version_to_lvb(&running_proto, lc->oc_lvb);
+                       version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
+               } else if (ret == -EAGAIN) {
+                       version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
+                       ret = lvb_to_version(lc->oc_lvb, &conn->cc_version);
+                       if (ret) {
+                               rc = ret;
+                               user_cluster_disconnect(conn);
+                               goto out;
+                       }
+               } else {
+                       printk(KERN_ERR "ocfs2: Could not determine"
+                                       " locking version\n");
+                       rc = ret;
+                       user_cluster_disconnect(conn);
+                       goto out;
+               }
+               wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
+       }
+
        /*
         * running_proto must have been set before we allowed any mounts
         * to proceed.
@@ -961,40 +1027,20 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
        if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
                printk(KERN_ERR
                       "Unable to mount with fs locking protocol version "
-                      "%u.%u because the userspace control daemon has "
-                      "negotiated %u.%u\n",
+                      "%u.%u because negotiated protocol is %u.%u\n",
                       conn->cc_version.pv_major, conn->cc_version.pv_minor,
                       running_proto.pv_major, running_proto.pv_minor);
                rc = -EPROTO;
                ocfs2_live_connection_drop(lc);
                lc = NULL;
-               goto out;
-       }
-
-       rc = dlm_new_lockspace(conn->cc_name, NULL, DLM_LSFL_FS, DLM_LVB_LEN,
-                              NULL, NULL, NULL, &fsdlm);
-       if (rc) {
-               ocfs2_live_connection_drop(lc);
-               lc = NULL;
-               goto out;
        }
 
-       conn->cc_private = lc;
-       conn->cc_lockspace = fsdlm;
 out:
        if (rc && lc)
                kfree(lc);
        return rc;
 }
 
-static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
-{
-       dlm_release_lockspace(conn->cc_lockspace, 2);
-       conn->cc_lockspace = NULL;
-       ocfs2_live_connection_drop(conn->cc_private);
-       conn->cc_private = NULL;
-       return 0;
-}
 
 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
                                  unsigned int *this_node)
@@ -1004,8 +1050,11 @@ static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
 
        if (lc->oc_type == WITH_CONTROLD)
                rc = ocfs2_control_get_this_node();
+       else if (lc->oc_type == NO_CONTROLD)
+               rc = atomic_read(&lc->oc_this_node);
        else
                rc = -EINVAL;
+
        if (rc < 0)
                return rc;