From: Gregoire Pichon Date: Thu, 10 Nov 2016 15:51:13 +0000 (-0500) Subject: staging: lustre: mdc: manage number of modify RPCs in flight X-Git-Url: https://git.karo-electronics.de/?a=commitdiff_plain;h=7cb15d044867c83b27ed434fe7002797846002c0;p=linux-beck.git staging: lustre: mdc: manage number of modify RPCs in flight This patch is the main client part of a new feature that supports multiple modify metadata RPCs in parallel. Its goal is to improve metadata operations performance of a single client, while maintening the consistency of MDT reply reconstruction and MDT recovery mechanisms. It allows to manage the number of modify RPCs in flight within the client obd structure and to assign a virtual index (the tag) to each modify RPC to help server side cleaning of reply data. The mdc component uses this feature to send multiple modify RPCs in parallel. Signed-off-by: Gregoire Pichon Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-5319 Reviewed-on: http://review.whamcloud.com/14374 Reviewed-by: Andreas Dilger Reviewed-by: Alex Zhuravlev Reviewed-by: Oleg Drokin Signed-off-by: James Simmons Signed-off-by: Greg Kroah-Hartman --- diff --git a/drivers/staging/lustre/lustre/fid/fid_request.c b/drivers/staging/lustre/lustre/fid/fid_request.c index 1148b9ab5e11..999f250ceed0 100644 --- a/drivers/staging/lustre/lustre/fid/fid_request.c +++ b/drivers/staging/lustre/lustre/fid/fid_request.c @@ -112,11 +112,7 @@ static int seq_client_rpc(struct lu_client_seq *seq, ptlrpc_at_set_req_timeout(req); - if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA) - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); rc = ptlrpc_queue_wait(req); - if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA) - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); if (rc) goto out_req; diff --git a/drivers/staging/lustre/lustre/include/lustre_mdc.h b/drivers/staging/lustre/lustre/include/lustre_mdc.h index 92a5c0fe2b67..198ceb0c66f9 100644 --- a/drivers/staging/lustre/lustre/include/lustre_mdc.h +++ b/drivers/staging/lustre/lustre/include/lustre_mdc.h @@ -156,6 +156,30 @@ static inline void mdc_put_rpc_lock(struct mdc_rpc_lock *lck, mutex_unlock(&lck->rpcl_mutex); } +static inline void mdc_get_mod_rpc_slot(struct ptlrpc_request *req, + struct lookup_intent *it) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + u32 opc; + u16 tag; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + tag = obd_get_mod_rpc_slot(cli, opc, it); + lustre_msg_set_tag(req->rq_reqmsg, tag); +} + +static inline void mdc_put_mod_rpc_slot(struct ptlrpc_request *req, + struct lookup_intent *it) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + u32 opc; + u16 tag; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + tag = lustre_msg_get_tag(req->rq_reqmsg); + obd_put_mod_rpc_slot(cli, opc, it, tag); +} + /** * Update the maximum possible easize. * diff --git a/drivers/staging/lustre/lustre/include/obd.h b/drivers/staging/lustre/lustre/include/obd.h index 0f6e8f66e6cd..60efaaa726b1 100644 --- a/drivers/staging/lustre/lustre/include/obd.h +++ b/drivers/staging/lustre/lustre/include/obd.h @@ -263,14 +263,17 @@ struct client_obd { wait_queue_head_t cl_destroy_waitq; struct mdc_rpc_lock *cl_rpc_lock; - struct mdc_rpc_lock *cl_close_lock; /* modify rpcs in flight * currently used for metadata only */ spinlock_t cl_mod_rpcs_lock; u16 cl_max_mod_rpcs_in_flight; - + u16 cl_mod_rpcs_in_flight; + u16 cl_close_rpcs_in_flight; + wait_queue_head_t cl_mod_rpcs_waitq; + unsigned long *cl_mod_tag_bitmap; + struct obd_histogram cl_mod_rpcs_hist; /* mgc datastruct */ atomic_t cl_mgc_refcount; diff --git a/drivers/staging/lustre/lustre/include/obd_class.h b/drivers/staging/lustre/lustre/include/obd_class.h index 01cd4893001f..9099b513f036 100644 --- a/drivers/staging/lustre/lustre/include/obd_class.h +++ b/drivers/staging/lustre/lustre/include/obd_class.h @@ -101,6 +101,12 @@ void obd_put_request_slot(struct client_obd *cli); __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli); int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max); int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, u16 max); +int obd_mod_rpc_stats_seq_show(struct client_obd *cli, struct seq_file *seq); + +u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc, + struct lookup_intent *it); +void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc, + struct lookup_intent *it, u16 tag); struct llog_handle; struct llog_rec_hdr; diff --git a/drivers/staging/lustre/lustre/ldlm/ldlm_lib.c b/drivers/staging/lustre/lustre/ldlm/ldlm_lib.c index 06d3cc683917..9be01426c955 100644 --- a/drivers/staging/lustre/lustre/ldlm/ldlm_lib.c +++ b/drivers/staging/lustre/lustre/ldlm/ldlm_lib.c @@ -375,6 +375,25 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) } else { cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_DEFAULT; } + + spin_lock_init(&cli->cl_mod_rpcs_lock); + spin_lock_init(&cli->cl_mod_rpcs_hist.oh_lock); + cli->cl_max_mod_rpcs_in_flight = 0; + cli->cl_mod_rpcs_in_flight = 0; + cli->cl_close_rpcs_in_flight = 0; + init_waitqueue_head(&cli->cl_mod_rpcs_waitq); + cli->cl_mod_tag_bitmap = NULL; + + if (connect_op == MDS_CONNECT) { + cli->cl_max_mod_rpcs_in_flight = cli->cl_max_rpcs_in_flight - 1; + cli->cl_mod_tag_bitmap = kcalloc(BITS_TO_LONGS(OBD_MAX_RIF_MAX), + sizeof(long), GFP_NOFS); + if (!cli->cl_mod_tag_bitmap) { + rc = -ENOMEM; + goto err; + } + } + rc = ldlm_get_ref(); if (rc) { CERROR("ldlm_get_ref failed: %d\n", rc); @@ -434,12 +453,16 @@ err_import: err_ldlm: ldlm_put_ref(); err: + kfree(cli->cl_mod_tag_bitmap); + cli->cl_mod_tag_bitmap = NULL; return rc; } EXPORT_SYMBOL(client_obd_setup); int client_obd_cleanup(struct obd_device *obddev) { + struct client_obd *cli = &obddev->u.cli; + ldlm_namespace_free_post(obddev->obd_namespace); obddev->obd_namespace = NULL; @@ -447,6 +470,10 @@ int client_obd_cleanup(struct obd_device *obddev) LASSERT(!obddev->u.cli.cl_import); ldlm_put_ref(); + + kfree(cli->cl_mod_tag_bitmap); + cli->cl_mod_tag_bitmap = NULL; + return 0; } EXPORT_SYMBOL(client_obd_cleanup); @@ -461,6 +488,7 @@ int client_connect_import(const struct lu_env *env, struct obd_import *imp = cli->cl_import; struct obd_connect_data *ocd; struct lustre_handle conn = { 0 }; + bool is_mdc = false; int rc; *exp = NULL; @@ -487,6 +515,10 @@ int client_connect_import(const struct lu_env *env, ocd = &imp->imp_connect_data; if (data) { *ocd = *data; + is_mdc = !strncmp(imp->imp_obd->obd_type->typ_name, + LUSTRE_MDC_NAME, 3); + if (is_mdc) + data->ocd_connect_flags |= OBD_CONNECT_MULTIMODRPCS; imp->imp_connect_flags_orig = data->ocd_connect_flags; } @@ -502,6 +534,11 @@ int client_connect_import(const struct lu_env *env, ocd->ocd_connect_flags, "old %#llx, new %#llx\n", data->ocd_connect_flags, ocd->ocd_connect_flags); data->ocd_connect_flags = ocd->ocd_connect_flags; + /* clear the flag as it was not set and is not known + * by upper layers + */ + if (is_mdc) + data->ocd_connect_flags &= ~OBD_CONNECT_MULTIMODRPCS; } ptlrpc_pinger_add_import(imp); diff --git a/drivers/staging/lustre/lustre/mdc/lproc_mdc.c b/drivers/staging/lustre/lustre/mdc/lproc_mdc.c index 76b9afc9425b..9021c465c044 100644 --- a/drivers/staging/lustre/lustre/mdc/lproc_mdc.c +++ b/drivers/staging/lustre/lustre/mdc/lproc_mdc.c @@ -146,6 +146,27 @@ static ssize_t max_mod_rpcs_in_flight_store(struct kobject *kobj, } LUSTRE_RW_ATTR(max_mod_rpcs_in_flight); +static int mdc_rpc_stats_seq_show(struct seq_file *seq, void *v) +{ + struct obd_device *dev = seq->private; + + return obd_mod_rpc_stats_seq_show(&dev->u.cli, seq); +} + +static ssize_t mdc_rpc_stats_seq_write(struct file *file, + const char __user *buf, + size_t len, loff_t *off) +{ + struct seq_file *seq = file->private_data; + struct obd_device *dev = seq->private; + struct client_obd *cli = &dev->u.cli; + + lprocfs_oh_clear(&cli->cl_mod_rpcs_hist); + + return len; +} +LPROC_SEQ_FOPS(mdc_rpc_stats); + LPROC_SEQ_FOPS_WR_ONLY(mdc, ping); LPROC_SEQ_FOPS_RO_TYPE(mdc, connect_flags); @@ -185,6 +206,8 @@ static struct lprocfs_vars lprocfs_mdc_obd_vars[] = { { "import", &mdc_import_fops, NULL, 0 }, { "state", &mdc_state_fops, NULL, 0 }, { "pinger_recov", &mdc_pinger_recov_fops, NULL, 0 }, + { .name = "rpc_stats", + .fops = &mdc_rpc_stats_fops }, { NULL } }; diff --git a/drivers/staging/lustre/lustre/mdc/mdc_locks.c b/drivers/staging/lustre/lustre/mdc/mdc_locks.c index b9ca14005800..42a128faf03c 100644 --- a/drivers/staging/lustre/lustre/mdc/mdc_locks.c +++ b/drivers/staging/lustre/lustre/mdc/mdc_locks.c @@ -766,15 +766,16 @@ resend: req->rq_sent = ktime_get_real_seconds() + resends; } - /* It is important to obtain rpc_lock first (if applicable), so that - * threads that are serialised with rpc_lock are not polluting our - * rpcs in flight counter. We do not do flock request limiting, though + /* It is important to obtain modify RPC slot first (if applicable), so + * that threads that are waiting for a modify RPC slot are not polluting + * our rpcs in flight counter. + * We do not do flock request limiting, though */ if (it) { - mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + mdc_get_mod_rpc_slot(req, it); rc = obd_get_request_slot(&obddev->u.cli); if (rc != 0) { - mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + mdc_put_mod_rpc_slot(req, it); mdc_clear_replay_flag(req, 0); ptlrpc_req_finished(req); return rc; @@ -801,7 +802,7 @@ resend: } obd_put_request_slot(&obddev->u.cli); - mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + mdc_put_mod_rpc_slot(req, it); if (rc < 0) { CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n", diff --git a/drivers/staging/lustre/lustre/mdc/mdc_reint.c b/drivers/staging/lustre/lustre/mdc/mdc_reint.c index 1847e5a47022..b551c5711db9 100644 --- a/drivers/staging/lustre/lustre/mdc/mdc_reint.c +++ b/drivers/staging/lustre/lustre/mdc/mdc_reint.c @@ -40,17 +40,15 @@ #include "../include/lustre_fid.h" /* mdc_setattr does its own semaphore handling */ -static int mdc_reint(struct ptlrpc_request *request, - struct mdc_rpc_lock *rpc_lock, - int level) +static int mdc_reint(struct ptlrpc_request *request, int level) { int rc; request->rq_send_state = level; - mdc_get_rpc_lock(rpc_lock, NULL); + mdc_get_mod_rpc_slot(request, NULL); rc = ptlrpc_queue_wait(request); - mdc_put_rpc_lock(rpc_lock, NULL); + mdc_put_mod_rpc_slot(request, NULL); if (rc) CDEBUG(D_INFO, "error in handling %d\n", rc); else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) @@ -103,8 +101,6 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, { LIST_HEAD(cancels); struct ptlrpc_request *req; - struct mdc_rpc_lock *rpc_lock; - struct obd_device *obd = exp->exp_obd; int count = 0, rc; __u64 bits; @@ -131,8 +127,6 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, return rc; } - rpc_lock = obd->u.cli.cl_rpc_lock; - if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME)) CDEBUG(D_INODE, "setting mtime %ld, ctime %ld\n", LTIME_S(op_data->op_attr.ia_mtime), @@ -141,7 +135,7 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, ptlrpc_request_set_replen(req); - rc = mdc_reint(req, rpc_lock, LUSTRE_IMP_FULL); + rc = mdc_reint(req, LUSTRE_IMP_FULL); if (rc == -ERESTARTSYS) rc = 0; @@ -220,7 +214,7 @@ rebuild: } level = LUSTRE_IMP_FULL; resend: - rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level); + rc = mdc_reint(req, level); /* Resend if we were told to. */ if (rc == -ERESTARTSYS) { @@ -292,7 +286,7 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data, *request = req; - rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL); + rc = mdc_reint(req, LUSTRE_IMP_FULL); if (rc == -ERESTARTSYS) rc = 0; return rc; @@ -302,7 +296,6 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { LIST_HEAD(cancels); - struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req; int count = 0, rc; @@ -334,7 +327,7 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data, mdc_link_pack(req, op_data); ptlrpc_request_set_replen(req); - rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL); + rc = mdc_reint(req, LUSTRE_IMP_FULL); *request = req; if (rc == -ERESTARTSYS) rc = 0; @@ -398,7 +391,7 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data, obd->u.cli.cl_default_mds_easize); ptlrpc_request_set_replen(req); - rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL); + rc = mdc_reint(req, LUSTRE_IMP_FULL); *request = req; if (rc == -ERESTARTSYS) rc = 0; diff --git a/drivers/staging/lustre/lustre/mdc/mdc_request.c b/drivers/staging/lustre/lustre/mdc/mdc_request.c index 6bed5ec42fb6..f9755ddc8e0b 100644 --- a/drivers/staging/lustre/lustre/mdc/mdc_request.c +++ b/drivers/staging/lustre/lustre/mdc/mdc_request.c @@ -327,12 +327,12 @@ static int mdc_xattr_common(struct obd_export *exp, /* make rpc */ if (opcode == MDS_REINT) - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_get_mod_rpc_slot(req, NULL); rc = ptlrpc_queue_wait(req); if (opcode == MDS_REINT) - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_put_mod_rpc_slot(req, NULL); if (rc) ptlrpc_req_finished(req); @@ -777,9 +777,9 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, ptlrpc_request_set_replen(req); - mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL); + mdc_get_mod_rpc_slot(req, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL); + mdc_put_mod_rpc_slot(req, NULL); if (!req->rq_repmsg) { CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req, @@ -1495,9 +1495,9 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_get_mod_rpc_slot(req, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_put_mod_rpc_slot(req, NULL); out: ptlrpc_req_finished(req); return rc; @@ -1675,9 +1675,9 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_get_mod_rpc_slot(req, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_put_mod_rpc_slot(req, NULL); out: ptlrpc_req_finished(req); return rc; @@ -1740,9 +1740,9 @@ static int mdc_ioc_hsm_request(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_get_mod_rpc_slot(req, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_put_mod_rpc_slot(req, NULL); out: ptlrpc_req_finished(req); return rc; @@ -2587,29 +2587,17 @@ static void mdc_llog_finish(struct obd_device *obd) static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) { - struct client_obd *cli = &obd->u.cli; struct lprocfs_static_vars lvars = { NULL }; int rc; - cli->cl_rpc_lock = kzalloc(sizeof(*cli->cl_rpc_lock), GFP_NOFS); - if (!cli->cl_rpc_lock) - return -ENOMEM; - mdc_init_rpc_lock(cli->cl_rpc_lock); - rc = ptlrpcd_addref(); if (rc < 0) - goto err_rpc_lock; - - cli->cl_close_lock = kzalloc(sizeof(*cli->cl_close_lock), GFP_NOFS); - if (!cli->cl_close_lock) { - rc = -ENOMEM; - goto err_ptlrpcd_decref; - } - mdc_init_rpc_lock(cli->cl_close_lock); + return rc; rc = client_obd_setup(obd, cfg); if (rc) - goto err_close_lock; + goto err_ptlrpcd_decref; + lprocfs_mdc_init_vars(&lvars); lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars); sptlrpc_lprocfs_cliobd_attach(obd); @@ -2626,17 +2614,10 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) return rc; } - spin_lock_init(&cli->cl_mod_rpcs_lock); - cli->cl_max_mod_rpcs_in_flight = OBD_MAX_RIF_DEFAULT - 1; - return rc; -err_close_lock: - kfree(cli->cl_close_lock); err_ptlrpcd_decref: ptlrpcd_decref(); -err_rpc_lock: - kfree(cli->cl_rpc_lock); return rc; } @@ -2677,11 +2658,6 @@ static int mdc_precleanup(struct obd_device *obd) static int mdc_cleanup(struct obd_device *obd) { - struct client_obd *cli = &obd->u.cli; - - kfree(cli->cl_rpc_lock); - kfree(cli->cl_close_lock); - ptlrpcd_decref(); return client_obd_cleanup(obd); diff --git a/drivers/staging/lustre/lustre/obdclass/genops.c b/drivers/staging/lustre/lustre/obdclass/genops.c index 62e66361584a..438d619059a9 100644 --- a/drivers/staging/lustre/lustre/obdclass/genops.c +++ b/drivers/staging/lustre/lustre/obdclass/genops.c @@ -1461,6 +1461,7 @@ int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max) { struct obd_connect_data *ocd; u16 maxmodrpcs; + u16 prev; if (max > OBD_MAX_RIF_MAX || max < 1) return -ERANGE; @@ -1486,10 +1487,178 @@ int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max) return -ERANGE; } + spin_lock(&cli->cl_mod_rpcs_lock); + + prev = cli->cl_max_mod_rpcs_in_flight; cli->cl_max_mod_rpcs_in_flight = max; - /* will have to wakeup waiters if max has been increased */ + /* wakeup waiters if limit has been increased */ + if (cli->cl_max_mod_rpcs_in_flight > prev) + wake_up(&cli->cl_mod_rpcs_waitq); + + spin_unlock(&cli->cl_mod_rpcs_lock); return 0; } EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight); + +#define pct(a, b) (b ? (a * 100) / b : 0) + +int obd_mod_rpc_stats_seq_show(struct client_obd *cli, struct seq_file *seq) +{ + unsigned long mod_tot = 0, mod_cum; + struct timespec64 now; + int i; + + ktime_get_real_ts64(&now); + + spin_lock(&cli->cl_mod_rpcs_lock); + + seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n", + (s64)now.tv_sec, (unsigned long)now.tv_nsec); + seq_printf(seq, "modify_RPCs_in_flight: %hu\n", + cli->cl_mod_rpcs_in_flight); + + seq_puts(seq, "\n\t\t\tmodify\n"); + seq_puts(seq, "rpcs in flight rpcs %% cum %%\n"); + + mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist); + + mod_cum = 0; + for (i = 0; i < OBD_HIST_MAX; i++) { + unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i]; + + mod_cum += mod; + seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n", + i, mod, pct(mod, mod_tot), + pct(mod_cum, mod_tot)); + if (mod_cum == mod_tot) + break; + } + + spin_unlock(&cli->cl_mod_rpcs_lock); + + return 0; +} +EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show); +#undef pct + +/* + * The number of modify RPCs sent in parallel is limited + * because the server has a finite number of slots per client to + * store request result and ensure reply reconstruction when needed. + * On the client, this limit is stored in cl_max_mod_rpcs_in_flight + * that takes into account server limit and cl_max_rpcs_in_flight + * value. + * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462), + * one close request is allowed above the maximum. + */ +static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli, + bool close_req) +{ + bool avail; + + /* A slot is available if + * - number of modify RPCs in flight is less than the max + * - it's a close RPC and no other close request is in flight + */ + avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight || + (close_req && !cli->cl_close_rpcs_in_flight); + + return avail; +} + +static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli, + bool close_req) +{ + bool avail; + + spin_lock(&cli->cl_mod_rpcs_lock); + avail = obd_mod_rpc_slot_avail_locked(cli, close_req); + spin_unlock(&cli->cl_mod_rpcs_lock); + return avail; +} + +/* Get a modify RPC slot from the obd client @cli according + * to the kind of operation @opc that is going to be sent + * and the intent @it of the operation if it applies. + * If the maximum number of modify RPCs in flight is reached + * the thread is put to sleep. + * Returns the tag to be set in the request message. Tag 0 + * is reserved for non-modifying requests. + */ +u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc, + struct lookup_intent *it) +{ + struct l_wait_info lwi = LWI_INTR(NULL, NULL); + bool close_req = false; + u16 i, max; + + /* read-only metadata RPCs don't consume a slot on MDT + * for reply reconstruction + */ + if (it && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP || + it->it_op == IT_LAYOUT || it->it_op == IT_READDIR)) + return 0; + + if (opc == MDS_CLOSE) + close_req = true; + + do { + spin_lock(&cli->cl_mod_rpcs_lock); + max = cli->cl_max_mod_rpcs_in_flight; + if (obd_mod_rpc_slot_avail_locked(cli, close_req)) { + /* there is a slot available */ + cli->cl_mod_rpcs_in_flight++; + if (close_req) + cli->cl_close_rpcs_in_flight++; + lprocfs_oh_tally(&cli->cl_mod_rpcs_hist, + cli->cl_mod_rpcs_in_flight); + /* find a free tag */ + i = find_first_zero_bit(cli->cl_mod_tag_bitmap, + max + 1); + LASSERT(i < OBD_MAX_RIF_MAX); + LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap)); + spin_unlock(&cli->cl_mod_rpcs_lock); + /* tag 0 is reserved for non-modify RPCs */ + return i + 1; + } + spin_unlock(&cli->cl_mod_rpcs_lock); + + CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot opc %u, max %hu\n", + cli->cl_import->imp_obd->obd_name, opc, max); + + l_wait_event(cli->cl_mod_rpcs_waitq, + obd_mod_rpc_slot_avail(cli, close_req), &lwi); + } while (true); +} +EXPORT_SYMBOL(obd_get_mod_rpc_slot); + +/* + * Put a modify RPC slot from the obd client @cli according + * to the kind of operation @opc that has been sent and the + * intent @it of the operation if it applies. + */ +void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc, + struct lookup_intent *it, u16 tag) +{ + bool close_req = false; + + if (it && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP || + it->it_op == IT_LAYOUT || it->it_op == IT_READDIR)) + return; + + if (opc == MDS_CLOSE) + close_req = true; + + spin_lock(&cli->cl_mod_rpcs_lock); + cli->cl_mod_rpcs_in_flight--; + if (close_req) + cli->cl_close_rpcs_in_flight--; + /* release the tag in the bitmap */ + LASSERT(tag - 1 < OBD_MAX_RIF_MAX); + LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0); + spin_unlock(&cli->cl_mod_rpcs_lock); + wake_up(&cli->cl_mod_rpcs_waitq); +} +EXPORT_SYMBOL(obd_put_mod_rpc_slot);