If a node is removed from a lockspace, and then added back before the
dlm is notified of the removal, the dlm will not detect the removal
and won't clear the old state from the node. This is fixed by using a
list of added nodes so the membership recovery can detect when a newly
added node is already in the member list.
Signed-off-by: David Teigland <teigland@redhat.com>
struct list_head list; /* space->members */
int nodeid;
int weight;
struct list_head list; /* space->members */
int nodeid;
int weight;
};
static struct configfs_group_operations clusters_ops = {
};
static struct configfs_group_operations clusters_ops = {
config_item_init_type_name(&nd->item, name, &node_type);
nd->nodeid = -1;
nd->weight = 1; /* default weight of 1 if none is set */
config_item_init_type_name(&nd->item, name, &node_type);
nd->nodeid = -1;
nd->weight = 1; /* default weight of 1 if none is set */
+ nd->new = 1; /* set to 0 once it's been read by dlm_nodeid_list() */
mutex_lock(&sp->members_lock);
list_add(&nd->list, &sp->members);
mutex_lock(&sp->members_lock);
list_add(&nd->list, &sp->members);
}
/* caller must free mem */
}
/* caller must free mem */
-int dlm_nodeid_list(char *lsname, int **ids_out)
+int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out,
+ int **new_out, int *new_count_out)
{
struct space *sp;
struct node *nd;
{
struct space *sp;
struct node *nd;
- int i = 0, rv = 0;
- int *ids;
+ int i = 0, rv = 0, ids_count = 0, new_count = 0;
+ int *ids, *new;
sp = get_space(lsname);
if (!sp)
sp = get_space(lsname);
if (!sp)
mutex_lock(&sp->members_lock);
if (!sp->members_count) {
mutex_lock(&sp->members_lock);
if (!sp->members_count) {
+ rv = -EINVAL;
+ printk(KERN_ERR "dlm: zero members_count\n");
- ids = kcalloc(sp->members_count, sizeof(int), GFP_KERNEL);
+ ids_count = sp->members_count;
+
+ ids = kcalloc(ids_count, sizeof(int), GFP_KERNEL);
if (!ids) {
rv = -ENOMEM;
goto out;
}
if (!ids) {
rv = -ENOMEM;
goto out;
}
- rv = sp->members_count;
- list_for_each_entry(nd, &sp->members, list)
+ list_for_each_entry(nd, &sp->members, list) {
+ if (nd->new)
+ new_count++;
+ }
+
+ if (ids_count != i)
+ printk(KERN_ERR "dlm: bad nodeid count %d %d\n", ids_count, i);
+
+ if (!new_count)
+ goto out_ids;
+
+ new = kcalloc(new_count, sizeof(int), GFP_KERNEL);
+ if (!new) {
+ kfree(ids);
+ rv = -ENOMEM;
+ goto out;
+ }
- if (rv != i)
- printk("bad nodeid count %d %d\n", rv, i);
+ i = 0;
+ list_for_each_entry(nd, &sp->members, list) {
+ if (nd->new) {
+ new[i++] = nd->nodeid;
+ nd->new = 0;
+ }
+ }
+ *new_count_out = new_count;
+ *new_out = new;
+ out_ids:
+ *ids_count_out = ids_count;
*ids_out = ids;
out:
mutex_unlock(&sp->members_lock);
*ids_out = ids;
out:
mutex_unlock(&sp->members_lock);
int dlm_config_init(void);
void dlm_config_exit(void);
int dlm_node_weight(char *lsname, int nodeid);
int dlm_config_init(void);
void dlm_config_exit(void);
int dlm_node_weight(char *lsname, int nodeid);
-int dlm_nodeid_list(char *lsname, int **ids_out);
+int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out,
+ int **new_out, int *new_count_out);
int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr);
int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid);
int dlm_our_nodeid(void);
int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr);
int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid);
int dlm_our_nodeid(void);
struct dlm_recover {
struct list_head list;
struct dlm_recover {
struct list_head list;
+ int *nodeids; /* nodeids of all members */
+ int *new; /* nodeids of new members */
+ int new_count;
+ /* Add an entry to ls_nodes_gone for members that were removed and
+ then added again, so that previous state for these nodes will be
+ cleared during recovery. */
+
+ for (i = 0; i < rv->new_count; i++) {
+ if (!dlm_is_member(ls, rv->new[i]))
+ continue;
+ log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);
+
+ memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL);
+ if (!memb)
+ return -ENOMEM;
+ memb->nodeid = rv->new[i];
+ list_add_tail(&memb->list, &ls->ls_nodes_gone);
+ neg++;
+ }
+
/* add new members to ls_nodes */
for (i = 0; i < rv->node_count; i++) {
/* add new members to ls_nodes */
for (i = 0; i < rv->node_count; i++) {
int dlm_ls_start(struct dlm_ls *ls)
{
struct dlm_recover *rv = NULL, *rv_old;
int dlm_ls_start(struct dlm_ls *ls)
{
struct dlm_recover *rv = NULL, *rv_old;
- int *ids = NULL;
- int error, count;
+ int *ids = NULL, *new = NULL;
+ int error, ids_count = 0, new_count = 0;
rv = kzalloc(sizeof(struct dlm_recover), GFP_KERNEL);
if (!rv)
return -ENOMEM;
rv = kzalloc(sizeof(struct dlm_recover), GFP_KERNEL);
if (!rv)
return -ENOMEM;
- error = count = dlm_nodeid_list(ls->ls_name, &ids);
- if (error <= 0)
+ error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count,
+ &new, &new_count);
+ if (error < 0)
goto fail;
spin_lock(&ls->ls_recover_lock);
goto fail;
spin_lock(&ls->ls_recover_lock);
- rv->node_count = count;
+ rv->node_count = ids_count;
+ rv->new = new;
+ rv->new_count = new_count;
rv->seq = ++ls->ls_recover_seq;
rv_old = ls->ls_recover_args;
ls->ls_recover_args = rv;
spin_unlock(&ls->ls_recover_lock);
if (rv_old) {
rv->seq = ++ls->ls_recover_seq;
rv_old = ls->ls_recover_args;
ls->ls_recover_args = rv;
spin_unlock(&ls->ls_recover_lock);
if (rv_old) {
+ log_error(ls, "unused recovery %llx %d",
+ (unsigned long long)rv_old->seq, rv_old->node_count);
fail:
kfree(rv);
kfree(ids);
fail:
kfree(rv);
kfree(ids);
if (rv) {
ls_recover(ls, rv);
kfree(rv->nodeids);
if (rv) {
ls_recover(ls, rv);
kfree(rv->nodeids);