return cap_str[i];
}
-/*
- * Cap reservations
- *
- * Maintain a global pool of preallocated struct ceph_caps, referenced
- * by struct ceph_caps_reservations. This ensures that we preallocate
- * memory needed to successfully process an MDS response. (If an MDS
- * sends us cap information and we fail to process it, we will have
- * problems due to the client and MDS being out of sync.)
- *
- * Reservations are 'owned' by a ceph_cap_reservation context.
- */
-static spinlock_t caps_list_lock;
-static struct list_head caps_list; /* unused (reserved or unreserved) */
-static int caps_total_count; /* total caps allocated */
-static int caps_use_count; /* in use */
-static int caps_reserve_count; /* unused, reserved */
-static int caps_avail_count; /* unused, unreserved */
-static int caps_min_count; /* keep at least this many (unreserved) */
-
-void __init ceph_caps_init(void)
+/*
+ * Set up the cap reservation pool of one mds client: an empty
+ * caps_list and the spinlock that guards the list and its counters.
+ *
+ * The caps_*_count fields are not written here.
+ * NOTE(review): presumably the enclosing mdsc is zero-initialized by
+ * whoever allocates it -- confirm against the caller.
+ */
+void ceph_caps_init(struct ceph_mds_client *mdsc)
{
- INIT_LIST_HEAD(&caps_list);
- spin_lock_init(&caps_list_lock);
+ INIT_LIST_HEAD(&mdsc->caps_list);
+ spin_lock_init(&mdsc->caps_list_lock);
}
-void ceph_caps_finalize(void)
+/*
+ * Tear down the cap pool of @mdsc: every cap still sitting on
+ * mdsc->caps_list is unlinked and returned to ceph_cap_cachep, then
+ * all five pool counters are reset to zero.  The whole operation runs
+ * under caps_list_lock.
+ *
+ * Only caps that are on the list are freed here; caps handed out by
+ * get_cap() have been removed from the list and are not touched.
+ */
+void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
struct ceph_cap *cap;
- spin_lock(&caps_list_lock);
- while (!list_empty(&caps_list)) {
- cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+ spin_lock(&mdsc->caps_list_lock);
+ while (!list_empty(&mdsc->caps_list)) {
+ cap = list_first_entry(&mdsc->caps_list,
+ struct ceph_cap, caps_item);
list_del(&cap->caps_item);
kmem_cache_free(ceph_cap_cachep, cap);
}
- caps_total_count = 0;
- caps_avail_count = 0;
- caps_use_count = 0;
- caps_reserve_count = 0;
- caps_min_count = 0;
- spin_unlock(&caps_list_lock);
+ mdsc->caps_total_count = 0;
+ mdsc->caps_avail_count = 0;
+ mdsc->caps_use_count = 0;
+ mdsc->caps_reserve_count = 0;
+ mdsc->caps_min_count = 0;
+ spin_unlock(&mdsc->caps_list_lock);
}
-void ceph_adjust_min_caps(int delta)
+/*
+ * Add @delta (which may be negative) to mdsc->caps_min_count, the
+ * threshold ceph_put_cap() consults when deciding whether a released
+ * cap is kept on the list or freed.  The update is done under
+ * caps_list_lock; driving the value below zero is a BUG.
+ */
+void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
{
- spin_lock(&caps_list_lock);
- caps_min_count += delta;
- BUG_ON(caps_min_count < 0);
- spin_unlock(&caps_list_lock);
+ spin_lock(&mdsc->caps_list_lock);
+ mdsc->caps_min_count += delta;
+ BUG_ON(mdsc->caps_min_count < 0);
+ spin_unlock(&mdsc->caps_list_lock);
}
-int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
+/*
+ * Reserve @need caps from the pool of @mdsc on behalf of @ctx.
+ *
+ * Caps that are already allocated and unreserved are claimed first by
+ * moving them from caps_avail_count to caps_reserve_count under
+ * caps_list_lock.  Any shortfall is then allocated from
+ * ceph_cap_cachep with GFP_NOFS while the lock is dropped, and the new
+ * caps are spliced onto mdsc->caps_list and accounted as reserved.
+ *
+ * On success ctx->count is set to @need and 0 is returned.  The
+ * allocation-failure path (label out_alloc_count) returns ret; its
+ * cleanup is not visible in this hunk.
+ */
+int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx, int need)
{
int i;
struct ceph_cap *cap;
dout("reserve caps ctx=%p need=%d\n", ctx, need);
/* first reserve any caps that are already allocated */
- spin_lock(&caps_list_lock);
- if (caps_avail_count >= need)
+ spin_lock(&mdsc->caps_list_lock);
+ if (mdsc->caps_avail_count >= need)
have = need;
else
- have = caps_avail_count;
- caps_avail_count -= have;
- caps_reserve_count += have;
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ have = mdsc->caps_avail_count;
+ mdsc->caps_avail_count -= have;
+ mdsc->caps_reserve_count += have;
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count +
+ mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
for (i = have; i < need; i++) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
}
BUG_ON(have + alloc != need);
- spin_lock(&caps_list_lock);
- caps_total_count += alloc;
- caps_reserve_count += alloc;
- list_splice(&newcaps, &caps_list);
+ spin_lock(&mdsc->caps_list_lock);
+ mdsc->caps_total_count += alloc;
+ mdsc->caps_reserve_count += alloc;
+ list_splice(&newcaps, &mdsc->caps_list);
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count +
+ mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
ctx->count = need;
dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
- ctx, caps_total_count, caps_use_count, caps_reserve_count,
- caps_avail_count);
+ ctx, mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
return 0;
out_alloc_count:
return ret;
}
-int ceph_unreserve_caps(struct ceph_cap_reservation *ctx)
+/*
+ * Give back whatever @ctx still has reserved.  Under caps_list_lock,
+ * ctx->count is moved from caps_reserve_count to caps_avail_count and
+ * ctx->count is cleared.  No cap is unlinked or freed here; only the
+ * accounting changes.  A ctx with count == 0 is a no-op.
+ *
+ * Always returns 0.
+ */
+int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx)
{
dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
if (ctx->count) {
- spin_lock(&caps_list_lock);
- BUG_ON(caps_reserve_count < ctx->count);
- caps_reserve_count -= ctx->count;
- caps_avail_count += ctx->count;
+ spin_lock(&mdsc->caps_list_lock);
+ BUG_ON(mdsc->caps_reserve_count < ctx->count);
+ mdsc->caps_reserve_count -= ctx->count;
+ mdsc->caps_avail_count += ctx->count;
ctx->count = 0;
dout("unreserve caps %d = %d used + %d resv + %d avail\n",
- caps_total_count, caps_use_count, caps_reserve_count,
- caps_avail_count);
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count +
+ mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
}
return 0;
}
-static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
+/*
+ * Hand out one cap from the pool of @mdsc.
+ *
+ * With @ctx == NULL a fresh cap is allocated from ceph_cap_cachep
+ * (GFP_NOFS) and accounted as in use; NULL is returned if the
+ * allocation fails.
+ *
+ * With a reservation context, the first cap on mdsc->caps_list is
+ * unlinked and one unit of @ctx's reservation is converted from
+ * "reserved" to "in use".  An exhausted reservation or an empty list
+ * is a BUG.
+ */
+static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
+ struct ceph_cap_reservation *ctx)
{
struct ceph_cap *cap = NULL;
if (!ctx) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
if (cap) {
- caps_use_count++;
- caps_total_count++;
+ /*
+ * The counters are shared with every other pool
+ * operation, all of which update them under
+ * caps_list_lock; bump them under the same lock so
+ * the total == use + reserve + avail invariant
+ * checked elsewhere cannot be observed half-updated.
+ */
+ spin_lock(&mdsc->caps_list_lock);
+ mdsc->caps_use_count++;
+ mdsc->caps_total_count++;
+ spin_unlock(&mdsc->caps_list_lock);
}
return cap;
}
- spin_lock(&caps_list_lock);
+ spin_lock(&mdsc->caps_list_lock);
dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
- ctx, ctx->count, caps_total_count, caps_use_count,
- caps_reserve_count, caps_avail_count);
+ ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
BUG_ON(!ctx->count);
- BUG_ON(ctx->count > caps_reserve_count);
- BUG_ON(list_empty(&caps_list));
+ BUG_ON(ctx->count > mdsc->caps_reserve_count);
+ BUG_ON(list_empty(&mdsc->caps_list));
ctx->count--;
- caps_reserve_count--;
- caps_use_count++;
+ mdsc->caps_reserve_count--;
+ mdsc->caps_use_count++;
- cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+ cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
list_del(&cap->caps_item);
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count + mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
return cap;
}
-void ceph_put_cap(struct ceph_cap *cap)
+/*
+ * Release an in-use cap back to the pool of @mdsc.
+ *
+ * Under caps_list_lock, caps_use_count is decremented and then one of
+ * two things happens: if the pool already holds at least
+ * caps_reserve_count + caps_min_count available caps, @cap is freed to
+ * ceph_cap_cachep and caps_total_count drops; otherwise @cap is put on
+ * mdsc->caps_list and counted as available.
+ */
+void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
- spin_lock(&caps_list_lock);
+ spin_lock(&mdsc->caps_list_lock);
dout("put_cap %p %d = %d used + %d resv + %d avail\n",
- cap, caps_total_count, caps_use_count,
- caps_reserve_count, caps_avail_count);
- caps_use_count--;
+ cap, mdsc->caps_total_count, mdsc->caps_use_count,
+ mdsc->caps_reserve_count, mdsc->caps_avail_count);
+ mdsc->caps_use_count--;
/*
* Keep some preallocated caps around (ceph_min_count), to
* avoid lots of free/alloc churn.
*/
- if (caps_avail_count >= caps_reserve_count + caps_min_count) {
- caps_total_count--;
+ if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
+ mdsc->caps_min_count) {
+ mdsc->caps_total_count--;
kmem_cache_free(ceph_cap_cachep, cap);
} else {
- caps_avail_count++;
- list_add(&cap->caps_item, &caps_list);
+ mdsc->caps_avail_count++;
+ list_add(&cap->caps_item, &mdsc->caps_list);
}
- BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
- caps_avail_count);
- spin_unlock(&caps_list_lock);
+ BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+ mdsc->caps_reserve_count + mdsc->caps_avail_count);
+ spin_unlock(&mdsc->caps_list_lock);
}
+/*
+ * Report the cap pool counters of @client's mds client.  Each output
+ * pointer is optional; a NULL pointer skips that value.
+ *
+ * The counters are read under caps_list_lock so that the values
+ * returned together form one consistent snapshot (total == used +
+ * reserved + avail) rather than a mix taken across a concurrent
+ * reserve/get/put.
+ */
void ceph_reservation_status(struct ceph_client *client,
int *total, int *avail, int *used, int *reserved,
int *min)
{
+ struct ceph_mds_client *mdsc = &client->mdsc;
+
+ spin_lock(&mdsc->caps_list_lock);
if (total)
- *total = caps_total_count;
+ *total = mdsc->caps_total_count;
if (avail)
- *avail = caps_avail_count;
+ *avail = mdsc->caps_avail_count;
if (used)
- *used = caps_use_count;
+ *used = mdsc->caps_use_count;
if (reserved)
- *reserved = caps_reserve_count;
+ *reserved = mdsc->caps_reserve_count;
if (min)
- *min = caps_min_count;
+ *min = mdsc->caps_min_count;
+ spin_unlock(&mdsc->caps_list_lock);
}
/*
return NULL;
}
+/*
+ * Locked wrapper around __get_cap_for_mds(): look up the cap that
+ * inode @ci holds from @mds while holding the inode's i_lock, and
+ * return whatever the lookup returned.
+ *
+ * NOTE(review): i_lock is dropped before returning, so the pointer is
+ * no longer protected by it -- confirm how callers keep it valid.
+ */
+struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
+{
+ struct inode *inode = &ci->vfs_inode;
+ struct ceph_cap *found;
+
+ spin_lock(&inode->i_lock);
+ found = __get_cap_for_mds(ci, mds);
+ spin_unlock(&inode->i_lock);
+
+ return found;
+}
+
/*
* Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
*/
new_cap = NULL;
} else {
spin_unlock(&inode->i_lock);
- new_cap = get_cap(caps_reservation);
+ new_cap = get_cap(mdsc, caps_reservation);
if (new_cap == NULL)
return -ENOMEM;
goto retry;
} else {
pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
realmino);
+ WARN_ON(!realm);
}
}
ci->i_auth_cap = NULL;
if (removed)
- ceph_put_cap(cap);
+ ceph_put_cap(mdsc, cap);
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
struct ceph_snap_realm *realm = ci->i_snap_realm;
gid_t gid;
struct ceph_mds_session *session;
u64 xattr_version = 0;
+ struct ceph_buffer *xattr_blob = NULL;
int delayed = 0;
u64 flush_tid = 0;
int i;
gid = inode->i_gid;
mode = inode->i_mode;
- if (dropping & CEPH_CAP_XATTR_EXCL) {
+ if (flushing & CEPH_CAP_XATTR_EXCL) {
__ceph_build_xattrs_blob(ci);
- xattr_version = ci->i_xattrs.version + 1;
+ xattr_blob = ci->i_xattrs.blob;
+ xattr_version = ci->i_xattrs.version;
}
spin_unlock(&inode->i_lock);
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq,
- uid, gid, mode,
- xattr_version,
- (flushing & CEPH_CAP_XATTR_EXCL) ? ci->i_xattrs.blob : NULL,
+ uid, gid, mode, xattr_version, xattr_blob,
follows);
if (ret < 0) {
dout("error sending cap msg, must requeue %p\n", inode);
&capsnap->mtime, &capsnap->atime,
capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
- 0, NULL,
+ capsnap->xattr_version, capsnap->xattr_blob,
capsnap->follows);
next_follows = capsnap->follows + 1;
ceph_cap_string(cap->issued),
ceph_cap_string(newcaps),
ceph_cap_string(revoking));
- if (revoking & CEPH_CAP_FILE_BUFFER)
+ if (revoking & used & CEPH_CAP_FILE_BUFFER)
writeback = 1; /* initiate writeback; will delay ack */
else if (revoking == CEPH_CAP_FILE_CACHE &&
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
* caller holds s_mutex
*/
static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
- struct ceph_mds_session *session)
+ struct ceph_mds_session *session,
+ int *open_target_sessions)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
ci->i_cap_exporting_mds = mds;
ci->i_cap_exporting_mseq = mseq;
ci->i_cap_exporting_issued = cap->issued;
+
+ /*
+ * make sure we have open sessions with all possible
+ * export targets, so that we get the matching IMPORT
+ */
+ *open_target_sessions = 1;
}
__ceph_remove_cap(cap);
}
u64 size, max_size;
u64 tid;
void *snaptrace;
+ size_t snaptrace_len;
+ void *flock;
+ u32 flock_len;
+ int open_target_sessions = 0;
dout("handle_caps from mds%d\n", mds);
if (msg->front.iov_len < sizeof(*h))
goto bad;
h = msg->front.iov_base;
- snaptrace = h + 1;
op = le32_to_cpu(h->op);
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
size = le64_to_cpu(h->size);
max_size = le64_to_cpu(h->max_size);
+ snaptrace = h + 1;
+ snaptrace_len = le32_to_cpu(h->snap_trace_len);
+
+ if (le16_to_cpu(msg->hdr.version) >= 2) {
+ void *p, *end;
+
+ p = snaptrace + snaptrace_len;
+ end = msg->front.iov_base + msg->front.iov_len;
+ ceph_decode_32_safe(&p, end, flock_len, bad);
+ flock = p;
+ } else {
+ flock = NULL;
+ flock_len = 0;
+ }
+
mutex_lock(&session->s_mutex);
session->s_seq++;
dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
goto done;
case CEPH_CAP_OP_EXPORT:
- handle_cap_export(inode, h, session);
+ handle_cap_export(inode, h, session, &open_target_sessions);
goto done;
case CEPH_CAP_OP_IMPORT:
handle_cap_import(mdsc, inode, h, session,
- snaptrace, le32_to_cpu(h->snap_trace_len));
+ snaptrace, snaptrace_len);
ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
session);
goto done_unlocked;
done_unlocked:
if (inode)
iput(inode);
+ if (open_target_sessions)
+ ceph_mdsc_open_export_target_sessions(mdsc, session);
return;
bad: