]> git.karo-electronics.de Git - linux-beck.git/blobdiff - fs/ocfs2/dlm/dlmdomain.c
Merge branches 'x86-fixes-for-linus', 'perf-fixes-for-linus' and 'sched-fixes-for...
[linux-beck.git] / fs / ocfs2 / dlm / dlmdomain.c
index 11a5c87fd7f7c00de41c61c00fae625e46675973..cc2aaa96cfe5c58540c23f863701102a0c8096a8 100644 (file)
@@ -128,10 +128,14 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
  * will have a negotiated version with the same major number and a minor
  * number equal or smaller.  The dlm_ctxt->dlm_locking_proto field should
  * be used to determine what a running domain is actually using.
+ *
+ * New in version 1.1:
+ *     - Message DLM_QUERY_REGION added to support global heartbeat
+ *     - Message DLM_QUERY_NODEINFO added to allow online node removes
  */
 static const struct dlm_protocol_version dlm_protocol = {
        .pv_major = 1,
-       .pv_minor = 0,
+       .pv_minor = 1,  /* 1.1 adds DLM_QUERY_REGION and DLM_QUERY_NODEINFO */
 };
 
 #define DLM_DOMAIN_BACKOFF_MS 200
@@ -142,6 +146,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
                                     void **ret_data);
 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
                                   void **ret_data);
+static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
+                                   void *data, void **ret_data);
 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
                                   void **ret_data);
 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
@@ -921,6 +927,370 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
        return 0;
 }
 
+/*
+ * dlm_match_regions() - check a joining node's heartbeat regions against ours
+ * @dlm: local domain; dlm->node_num is only used in the error messages
+ * @qr:  DLM_QUERY_REGION payload sent by the joining node
+ *
+ * The two nodes must agree on whether global heartbeat is active and, if
+ * it is, every region known to one side must also be known to the other.
+ *
+ * Returns 0 when the region sets agree, -EINVAL on any mismatch or when the
+ * advertised region count exceeds O2NM_MAX_REGIONS, and -ENOMEM if the
+ * scratch buffer for the local region list cannot be allocated.
+ */
+static int dlm_match_regions(struct dlm_ctxt *dlm,
+                            struct dlm_query_region *qr)
+{
+       char *local = NULL, *remote = qr->qr_regions;
+       char *l, *r;
+       int localnr, i, j, foundit;
+       int status = 0;
+
+       if (!o2hb_global_heartbeat_active()) {
+               if (qr->qr_numregions) {
+                       mlog(ML_ERROR, "Domain %s: Joining node %d has global "
+                            "heartbeat enabled but local node %d does not\n",
+                            qr->qr_domain, qr->qr_node, dlm->node_num);
+                       status = -EINVAL;
+               }
+               goto bail;
+       }
+
+       /* Global heartbeat is active locally from here on. */
+       if (!qr->qr_numregions) {
+               mlog(ML_ERROR, "Domain %s: Local node %d has global "
+                    "heartbeat enabled but joining node %d does not\n",
+                    qr->qr_domain, dlm->node_num, qr->qr_node);
+               status = -EINVAL;
+               goto bail;
+       }
+
+       /*
+        * qr_numregions comes straight off the wire.  Reject a count larger
+        * than the region array can hold so the loops below never walk past
+        * the end of qr_regions.
+        */
+       if (qr->qr_numregions > O2NM_MAX_REGIONS) {
+               mlog(ML_ERROR, "Domain %s: Joining node %d sent %d heartbeat "
+                    "regions, more than the maximum %d\n",
+                    qr->qr_domain, qr->qr_node, qr->qr_numregions,
+                    O2NM_MAX_REGIONS);
+               status = -EINVAL;
+               goto bail;
+       }
+
+       r = remote;
+       for (i = 0; i < qr->qr_numregions; ++i) {
+               mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
+               r += O2HB_MAX_REGION_NAME_LEN;
+       }
+
+       /* GFP_ATOMIC: the query handler calls us with spinlocks held. */
+       local = kmalloc(sizeof(qr->qr_regions), GFP_ATOMIC);
+       if (!local) {
+               status = -ENOMEM;
+               goto bail;
+       }
+
+       localnr = o2hb_get_all_regions(local, O2NM_MAX_REGIONS);
+
+       /* compare local regions with remote */
+       l = local;
+       for (i = 0; i < localnr; ++i) {
+               foundit = 0;
+               r = remote;
+               /* strictly '<': only qr_numregions entries were sent */
+               for (j = 0; j < qr->qr_numregions; ++j) {
+                       if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
+                               foundit = 1;
+                               break;
+                       }
+                       r += O2HB_MAX_REGION_NAME_LEN;
+               }
+               if (!foundit) {
+                       status = -EINVAL;
+                       mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
+                            "in local node %d but not in joining node %d\n",
+                            qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
+                            dlm->node_num, qr->qr_node);
+                       goto bail;
+               }
+               l += O2HB_MAX_REGION_NAME_LEN;
+       }
+
+       /* compare remote with local regions */
+       r = remote;
+       for (i = 0; i < qr->qr_numregions; ++i) {
+               foundit = 0;
+               l = local;
+               for (j = 0; j < localnr; ++j) {
+                       if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
+                               foundit = 1;
+                               break;
+                       }
+                       l += O2HB_MAX_REGION_NAME_LEN;
+               }
+               if (!foundit) {
+                       status = -EINVAL;
+                       mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
+                            "in joining node %d but not in local node %d\n",
+                            qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
+                            qr->qr_node, dlm->node_num);
+                       goto bail;
+               }
+               r += O2HB_MAX_REGION_NAME_LEN;
+       }
+
+bail:
+       /* kfree(NULL) is a no-op, so the early exits are safe. */
+       kfree(local);
+
+       return status;
+}
+
+/*
+ * dlm_send_regions() - send our heartbeat region list to the given nodes
+ * @dlm:      local domain; supplies the node number and domain name
+ * @node_map: bitmap of nodes to query; the local node's bit is skipped
+ *
+ * Builds one DLM_QUERY_REGION message and sends it to each node in turn,
+ * stopping at the first node that cannot be reached or reports non-zero.
+ *
+ * Returns 0 if the map is empty or every queried node answered 0, -ENOMEM
+ * if the message cannot be allocated, otherwise the negative result of
+ * o2net_send_message() or the non-zero status it handed back.
+ */
+static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
+{
+       struct dlm_query_region *qr;
+       int remote_status, ret = 0, node, n;
+       char *name;
+
+       /* Nothing to do when no bit is set in the map. */
+       if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
+               return 0;
+
+       qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
+       if (!qr) {
+               ret = -ENOMEM;
+               mlog_errno(ret);
+               return ret;
+       }
+
+       qr->qr_node = dlm->node_num;
+       qr->qr_namelen = strlen(dlm->name);
+       memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
+
+       /* With local heartbeat the zeroed message keeps numregions at 0. */
+       if (o2hb_global_heartbeat_active())
+               qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
+                                                        O2NM_MAX_REGIONS);
+
+       name = qr->qr_regions;
+       for (n = 0; n < qr->qr_numregions; ++n) {
+               mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, name);
+               name += O2HB_MAX_REGION_NAME_LEN;
+       }
+
+       for (node = find_next_bit(node_map, O2NM_MAX_NODES, 0);
+            node < O2NM_MAX_NODES;
+            node = find_next_bit(node_map, O2NM_MAX_NODES, node + 1)) {
+               if (node == dlm->node_num)
+                       continue;
+
+               mlog(0, "Sending regions to node %d\n", node);
+
+               ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
+                                        sizeof(struct dlm_query_region),
+                                        node, &remote_status);
+               /* A successful send yields the remote handler's verdict. */
+               if (ret >= 0)
+                       ret = remote_status;
+               if (ret) {
+                       mlog(ML_ERROR, "Region mismatch %d, node %d\n",
+                            ret, node);
+                       break;
+               }
+       }
+
+       kfree(qr);
+       return ret;
+}
+
+/*
+ * dlm_query_region_handler() - network handler for DLM_QUERY_REGION
+ * @msg: incoming message; msg->buf holds a struct dlm_query_region
+ * @len, @data, @ret_data: not used by this handler
+ *
+ * The query is accepted only if the named domain exists, the sender is the
+ * node currently recorded as joining, and the active protocol is not 1.0.
+ * The region comparison runs with dlm_domain_lock and dlm->spinlock held.
+ *
+ * Returns -EINVAL if any precondition fails, otherwise the result of
+ * dlm_match_regions().
+ */
+static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
+                                   void *data, void **ret_data)
+{
+       struct dlm_query_region *qr = (struct dlm_query_region *) msg->buf;
+       struct dlm_ctxt *dlm;
+       int status = -EINVAL;
+
+       mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
+            qr->qr_domain);
+
+       spin_lock(&dlm_domain_lock);
+       dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
+       if (!dlm) {
+               mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
+                    "before join domain\n", qr->qr_node, qr->qr_domain);
+               goto out_domain;
+       }
+
+       spin_lock(&dlm->spinlock);
+       if (dlm->joining_node != qr->qr_node) {
+               mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
+                    "but joining node is %d\n", qr->qr_node, qr->qr_domain,
+                    dlm->joining_node);
+               goto out_dlm;
+       }
+
+       /* Support for global heartbeat was added in 1.1 */
+       if (dlm->dlm_locking_proto.pv_major == 1 &&
+           dlm->dlm_locking_proto.pv_minor == 0) {
+               mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
+                    "but active dlm protocol is %d.%d\n", qr->qr_node,
+                    qr->qr_domain, dlm->dlm_locking_proto.pv_major,
+                    dlm->dlm_locking_proto.pv_minor);
+               goto out_dlm;
+       }
+
+       status = dlm_match_regions(dlm, qr);
+
+out_dlm:
+       spin_unlock(&dlm->spinlock);
+out_domain:
+       spin_unlock(&dlm_domain_lock);
+
+       return status;
+}
+
+/*
+ * dlm_match_nodes() - compare a joining node's node table with ours
+ * @dlm: local domain; dlm->node_num is only used in the error messages
+ * @qn:  DLM_QUERY_NODEINFO payload sent by the joining node
+ *
+ * For every node number below O2NM_MAX_NODES, the node must be either
+ * unknown to both sides or known to both with the same number, IPv4 port
+ * and IPv4 address.  The scan stops at the first disagreement.
+ *
+ * Returns 0 if the tables agree, -EINVAL otherwise.
+ */
+static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
+{
+       struct o2nm_node *ours;
+       struct dlm_node_info *theirs;
+       int nodenum, idx;
+       int status = 0;
+
+       for (idx = 0; idx < qn->qn_numnodes; ++idx)
+               mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[idx].ni_nodenum,
+                    &(qn->qn_nodes[idx].ni_ipv4_address),
+                    ntohs(qn->qn_nodes[idx].ni_ipv4_port));
+
+       for (nodenum = 0; nodenum < O2NM_MAX_NODES && !status; ++nodenum) {
+               ours = o2nm_get_node_by_num(nodenum);
+
+               /* Look for the same node number in the remote table. */
+               theirs = NULL;
+               for (idx = 0; idx < qn->qn_numnodes; ++idx) {
+                       if (qn->qn_nodes[idx].ni_nodenum != nodenum)
+                               continue;
+                       theirs = &(qn->qn_nodes[idx]);
+                       break;
+               }
+
+               /* Unknown on both sides: nothing to compare. */
+               if (!ours && !theirs)
+                       continue;
+
+               if (!ours) {
+                       status = -EINVAL;
+                       mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
+                            "registered in joining node %d but not in "
+                            "local node %d\n", qn->qn_domain,
+                            theirs->ni_nodenum,
+                            &(theirs->ni_ipv4_address),
+                            ntohs(theirs->ni_ipv4_port),
+                            qn->qn_nodenum, dlm->node_num);
+               } else if (!theirs) {
+                       status = -EINVAL;
+                       mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
+                            "registered in local node %d but not in "
+                            "joining node %d\n", qn->qn_domain,
+                            ours->nd_num, &(ours->nd_ipv4_address),
+                            ntohs(ours->nd_ipv4_port),
+                            dlm->node_num, qn->qn_nodenum);
+               } else if (theirs->ni_nodenum != ours->nd_num ||
+                          theirs->ni_ipv4_port != ours->nd_ipv4_port ||
+                          theirs->ni_ipv4_address != ours->nd_ipv4_address) {
+                       /* Known to both, but the details differ. */
+                       status = -EINVAL;
+               }
+
+               /* Release the node obtained from o2nm_get_node_by_num(). */
+               if (ours)
+                       o2nm_node_put(ours);
+       }
+
+       return status;
+}
+
+/*
+ * dlm_send_nodeinfo() - send our node table to the given nodes
+ * @dlm:      local domain; supplies the node number and domain name
+ * @node_map: bitmap of nodes to query; the local node's bit is skipped
+ *
+ * Collects every node returned by o2nm_get_node_by_num() into a single
+ * DLM_QUERY_NODEINFO message and sends it to each node in turn, stopping
+ * at the first node that cannot be reached or reports non-zero.
+ *
+ * Returns 0 if the map is empty or every queried node answered 0, -ENOMEM
+ * if the message cannot be allocated, otherwise the negative result of
+ * o2net_send_message() or the non-zero status it handed back.
+ */
+static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
+{
+       struct dlm_query_nodeinfo *qn;
+       struct dlm_node_info *slot;
+       struct o2nm_node *node;
+       int ret = 0, remote_status, filled = 0, num, target;
+
+       /* Nothing to do when no bit is set in the map. */
+       if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
+               return 0;
+
+       qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL);
+       if (!qn) {
+               ret = -ENOMEM;
+               mlog_errno(ret);
+               return ret;
+       }
+
+       for (num = 0; num < O2NM_MAX_NODES; ++num) {
+               node = o2nm_get_node_by_num(num);
+               if (node) {
+                       slot = &qn->qn_nodes[filled];
+                       slot->ni_nodenum = node->nd_num;
+                       slot->ni_ipv4_port = node->nd_ipv4_port;
+                       slot->ni_ipv4_address = node->nd_ipv4_address;
+                       mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
+                            &(node->nd_ipv4_address),
+                            ntohs(node->nd_ipv4_port));
+                       ++filled;
+                       o2nm_node_put(node);
+               }
+       }
+
+       qn->qn_nodenum = dlm->node_num;
+       qn->qn_numnodes = filled;
+       qn->qn_namelen = strlen(dlm->name);
+       memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
+
+       for (target = find_next_bit(node_map, O2NM_MAX_NODES, 0);
+            target < O2NM_MAX_NODES;
+            target = find_next_bit(node_map, O2NM_MAX_NODES, target + 1)) {
+               if (target == dlm->node_num)
+                       continue;
+
+               mlog(0, "Sending nodeinfo to node %d\n", target);
+
+               ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
+                                        qn, sizeof(struct dlm_query_nodeinfo),
+                                        target, &remote_status);
+               /* A successful send yields the remote handler's verdict. */
+               if (ret >= 0)
+                       ret = remote_status;
+               if (ret) {
+                       mlog(ML_ERROR, "node mismatch %d, node %d\n", ret,
+                            target);
+                       break;
+               }
+       }
+
+       kfree(qn);
+       return ret;
+}
+
+/*
+ * dlm_query_nodeinfo_handler() - network handler for DLM_QUERY_NODEINFO
+ * @msg: incoming message; msg->buf holds a struct dlm_query_nodeinfo
+ * @len, @data, @ret_data: not used by this handler
+ *
+ * The query is accepted only if the named domain exists, the sender is the
+ * node currently recorded as joining, and the active protocol is not 1.0.
+ * The node comparison runs with dlm_domain_lock and dlm->spinlock held.
+ *
+ * Returns -EINVAL if any precondition fails, otherwise the result of
+ * dlm_match_nodes().
+ */
+static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
+                                     void *data, void **ret_data)
+{
+       struct dlm_query_nodeinfo *qn = (struct dlm_query_nodeinfo *) msg->buf;
+       struct dlm_ctxt *dlm;
+       int status = -EINVAL;
+
+       mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
+            qn->qn_domain);
+
+       spin_lock(&dlm_domain_lock);
+       dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
+       if (!dlm) {
+               mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
+                    "join domain\n", qn->qn_nodenum, qn->qn_domain);
+               goto out_domain;
+       }
+
+       spin_lock(&dlm->spinlock);
+       if (dlm->joining_node != qn->qn_nodenum) {
+               mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
+                    "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
+                    dlm->joining_node);
+               goto out_dlm;
+       }
+
+       /* Support for node query was added in 1.1 */
+       if (dlm->dlm_locking_proto.pv_major == 1 &&
+           dlm->dlm_locking_proto.pv_minor == 0) {
+               mlog(ML_ERROR, "Node %d queried nodes on domain %s "
+                    "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
+                    qn->qn_domain, dlm->dlm_locking_proto.pv_major,
+                    dlm->dlm_locking_proto.pv_minor);
+               goto out_dlm;
+       }
+
+       status = dlm_match_nodes(dlm, qn);
+
+out_dlm:
+       spin_unlock(&dlm->spinlock);
+out_domain:
+       spin_unlock(&dlm_domain_lock);
+
+       return status;
+}
+
 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
                                   void **ret_data)
 {
@@ -1241,6 +1611,20 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
        set_bit(dlm->node_num, dlm->domain_map);
        spin_unlock(&dlm->spinlock);
 
+       /* Support for global heartbeat and node info was added in 1.1 */
+       if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) {
+               status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
+               if (status) {
+                       mlog_errno(status);
+                       goto bail;
+               }
+               status = dlm_send_regions(dlm, ctxt->yes_resp_map);
+               if (status) {
+                       mlog_errno(status);
+                       goto bail;
+               }
+       }
+
        dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
 
        /* Joined state *must* be set before the joining node
@@ -1807,7 +2191,21 @@ static int dlm_register_net_handlers(void)
                                        sizeof(struct dlm_cancel_join),
                                        dlm_cancel_join_handler,
                                        NULL, NULL, &dlm_join_handlers);
+       if (status)
+               goto bail;
+
+       status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
+                                       sizeof(struct dlm_query_region),
+                                       dlm_query_region_handler,
+                                       NULL, NULL, &dlm_join_handlers);
 
+       if (status)
+               goto bail;
+
+       status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
+                                       sizeof(struct dlm_query_nodeinfo),
+                                       dlm_query_nodeinfo_handler,
+                                       NULL, NULL, &dlm_join_handlers);
 bail:
        if (status < 0)
                dlm_unregister_net_handlers();