git.karo-electronics.de Git - karo-tx-linux.git/blobdiff - fs/ocfs2/dlm/dlmmaster.c
ocfs2/dlm: add DEREF_DONE message
[karo-tx-linux.git] / fs / ocfs2 / dlm / dlmmaster.c
index 84f2f8079466a7fc07c8a4fda50d3513a02b68f9..8913e7d443da7db48575f5f7219d8e22204aedf8 100644 (file)
@@ -2375,6 +2375,122 @@ done:
        return ret;
 }
 
+/*
+ * Handler for the DLM_DEREF_LOCKRES_DONE message: the remote node (see
+ * dlm_drop_lockres_ref_done()) confirms that the deref we sent earlier has
+ * been processed, so the local lockres can now leave the purge list, be
+ * unhashed and released.
+ *
+ * Returns 0 on success (also when dlm_grab() fails, i.e. the domain is
+ * shutting down) and -EINVAL for a malformed message or unknown lockres.
+ */
+int dlm_deref_lockres_done_handler(struct o2net_msg *msg, u32 len, void *data,
+                             void **ret_data)
+{
+       struct dlm_ctxt *dlm = data;
+       struct dlm_deref_lockres_done *deref
+                       = (struct dlm_deref_lockres_done *)msg->buf;
+       struct dlm_lock_resource *res = NULL;
+       char *name;
+       unsigned int namelen;
+       int ret = -EINVAL;
+       u8 node;
+       unsigned int hash;
+
+       if (!dlm_grab(dlm))
+               return 0;
+
+       name = deref->name;
+       namelen = deref->namelen;
+       node = deref->node_idx;
+
+       /* validate the wire message before touching any state */
+       if (namelen > DLM_LOCKID_NAME_MAX) {
+               mlog(ML_ERROR, "Invalid name length!");
+               goto done;
+       }
+       if (deref->node_idx >= O2NM_MAX_NODES) {
+               mlog(ML_ERROR, "Invalid node number: %u\n", node);
+               goto done;
+       }
+
+       hash = dlm_lockid_hash(name, namelen);
+
+       spin_lock(&dlm->spinlock);
+       res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
+       if (!res) {
+               spin_unlock(&dlm->spinlock);
+               mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
+                    dlm->name, namelen, name);
+               goto done;
+       }
+
+       spin_lock(&res->spinlock);
+       /* DEREF_DONE is only expected while we are dropping our ref */
+       BUG_ON(!(res->state & DLM_LOCK_RES_DROPPING_REF));
+       if (!list_empty(&res->purge)) {
+               mlog(0, "%s: Removing res %.*s from purgelist\n",
+                       dlm->name, res->lockname.len, res->lockname.name);
+               list_del_init(&res->purge);
+               dlm_lockres_put(res);
+               dlm->purge_count--;
+       }
+
+       if (!__dlm_lockres_unused(res)) {
+               mlog(ML_ERROR, "%s: res %.*s in use after deref\n",
+                       dlm->name, res->lockname.len, res->lockname.name);
+               __dlm_print_one_lock_resource(res);
+               BUG();
+       }
+
+       __dlm_unhash_lockres(dlm, res);
+
+       spin_lock(&dlm->track_lock);
+       if (!list_empty(&res->tracking))
+               list_del_init(&res->tracking);
+       else {
+               mlog(ML_ERROR, "%s: Resource %.*s not on the Tracking list\n",
+                    dlm->name, res->lockname.len, res->lockname.name);
+               __dlm_print_one_lock_resource(res);
+       }
+       spin_unlock(&dlm->track_lock);
+
+       /* lockres is not in the hash now. drop the flag and wake up
+        * any processes waiting in dlm_get_lock_resource.
+        */
+       res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+       spin_unlock(&res->spinlock);
+       wake_up(&res->wq);
+
+       /* drop the lookup reference taken by __dlm_lookup_lockres_full() */
+       dlm_lockres_put(res);
+
+       spin_unlock(&dlm->spinlock);
+
+       /* FIX: report success; without this the handler fell through with
+        * the initial -EINVAL and the sender saw every DEREF_DONE fail. */
+       ret = 0;
+done:
+       dlm_put(dlm);
+       return ret;
+}
+
+/*
+ * Send a DLM_DEREF_LOCKRES_DONE message to @node, telling it that the
+ * deref of @res has been fully processed on this node.  Best effort:
+ * send or status failures are logged but not propagated to the caller.
+ */
+static void dlm_drop_lockres_ref_done(struct dlm_ctxt *dlm,
+               struct dlm_lock_resource *res, u8 node)
+{
+       struct dlm_deref_lockres_done deref;
+       int ret = 0, r;
+       const char *lockname;
+       unsigned int namelen;
+
+       lockname = res->lockname.name;
+       namelen = res->lockname.len;
+       BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+
+       memset(&deref, 0, sizeof(deref));
+       deref.node_idx = dlm->node_num;
+       deref.namelen = namelen;
+       memcpy(deref.name, lockname, namelen);
+
+       /* r receives the remote handler's status code */
+       ret = o2net_send_message(DLM_DEREF_LOCKRES_DONE, dlm->key,
+                                &deref, sizeof(deref), node, &r);
+       if (ret < 0) {
+               /* FIX: drop the doubled space produced by the old
+                * "DEREF DONE " " to node" literal concatenation */
+               mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF DONE "
+                               "to node %u\n", dlm->name, namelen,
+                               lockname, ret, node);
+       } else if (r < 0) {
+               /* ignore the error; FIX: log DEREF DONE, not DEREF --
+                * that is the message this function actually sends */
+               mlog(ML_ERROR, "%s: res %.*s, DEREF DONE to node %u got %d\n",
+                    dlm->name, namelen, lockname, node, r);
+               dlm_print_one_lock_resource(res);
+       }
+}
+
 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
 {
        struct dlm_ctxt *dlm;
@@ -2388,8 +2504,8 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
 
        spin_lock(&res->spinlock);
        BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+       __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
        if (test_bit(node, res->refmap)) {
-               __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
                dlm_lockres_clear_refmap_bit(dlm, res, node);
                cleared = 1;
        }
@@ -2519,6 +2635,11 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
        spin_lock(&dlm->master_lock);
        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
                                    namelen, target, dlm->node_num);
+       /* get an extra reference on the mle.
+        * otherwise the assert_master from the new
+        * master will destroy this.
+        */
+       dlm_get_mle_inuse(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
 
@@ -2544,7 +2665,7 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
        }
 
 fail:
-       if (oldmle) {
+       if (ret != -EEXIST && oldmle) {
                /* master is known, detach if not already detached */
                dlm_mle_detach_hb_events(dlm, oldmle);
                dlm_put_mle(oldmle);
@@ -2554,6 +2675,7 @@ fail:
                if (mle_added) {
                        dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
+                       dlm_put_mle_inuse(mle);
                } else if (mle) {
                        kmem_cache_free(dlm_mle_cache, mle);
                        mle = NULL;
@@ -2571,17 +2693,6 @@ fail:
         * ensure that all assert_master work is flushed. */
        flush_workqueue(dlm->dlm_worker);
 
-       /* get an extra reference on the mle.
-        * otherwise the assert_master from the new
-        * master will destroy this.
-        * also, make sure that all callers of dlm_get_mle
-        * take both dlm->spinlock and dlm->master_lock */
-       spin_lock(&dlm->spinlock);
-       spin_lock(&dlm->master_lock);
-       dlm_get_mle_inuse(mle);
-       spin_unlock(&dlm->master_lock);
-       spin_unlock(&dlm->spinlock);
-
        /* notify new node and send all lock state */
        /* call send_one_lockres with migration flag.
         * this serves as notice to the target node that a
@@ -3050,7 +3161,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
        int ret = 0;
 
        if (!dlm_grab(dlm))
-               return -EINVAL;
+               return 0;
 
        name = migrate->name;
        namelen = migrate->namelen;
@@ -3141,7 +3252,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
                                mlog(0, "tried to migrate %.*s, but some "
                                     "process beat me to it\n",
                                     namelen, name);
-                               ret = -EEXIST;
+                               spin_unlock(&tmp->spinlock);
+                               return -EEXIST;
                        } else {
                                /* bad.  2 NODES are trying to migrate! */
                                mlog(ML_ERROR, "migration error  mle: "
@@ -3312,6 +3424,15 @@ top:
                            mle->new_master != dead_node)
                                continue;
 
+                       if (mle->new_master == dead_node && mle->inuse) {
+                               mlog(ML_NOTICE, "%s: target %u died during "
+                                               "migration from %u, the MLE is "
+                                               "still keep used, ignore it!\n",
+                                               dlm->name, dead_node,
+                                               mle->master);
+                               continue;
+                       }
+
                        /* If we have reached this point, this mle needs to be
                         * removed from the list and freed. */
                        dlm_clean_migration_mle(dlm, mle);