]> git.karo-electronics.de Git - linux-beck.git/commitdiff
lustre/recovery: free open/close request promptly
authorHongchao Zhang <hongchao.zhang@intel.com>
Sat, 1 Mar 2014 02:16:37 +0000 (21:16 -0500)
committerGreg Kroah-Hartman <gregkh@linuxfoundation.org>
Sat, 1 Mar 2014 03:11:20 +0000 (19:11 -0800)
- For the non-create open or committed open, the open request
  should be freed along with the close request as soon as the
  close done, despite that the transno of open/close is
  greater than the last committed transno known by client or not.

- Move the committed open request into another dedicated list,
  that will avoid scanning a huge replay list on receiving each
  reply (when there are many open files).

Signed-off-by: Niu Yawei <yawei.niu@intel.com>
Signed-off-by: Hongchao Zhang <hongchao.zhang@intel.com>
Reviewed-on: http://review.whamcloud.com/6665
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-2613
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Signed-off-by: Oleg Drokin <oleg.drokin@intel.com>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
18 files changed:
drivers/staging/lustre/lustre/include/lustre/lustre_idl.h
drivers/staging/lustre/lustre/include/lustre_export.h
drivers/staging/lustre/lustre/include/lustre_import.h
drivers/staging/lustre/lustre/include/lustre_net.h
drivers/staging/lustre/lustre/include/obd.h
drivers/staging/lustre/lustre/include/obd_class.h
drivers/staging/lustre/lustre/llite/file.c
drivers/staging/lustre/lustre/llite/llite_lib.c
drivers/staging/lustre/lustre/lmv/lmv_obd.c
drivers/staging/lustre/lustre/mdc/mdc_internal.h
drivers/staging/lustre/lustre/mdc/mdc_locks.c
drivers/staging/lustre/lustre/mdc/mdc_reint.c
drivers/staging/lustre/lustre/mdc/mdc_request.c
drivers/staging/lustre/lustre/obdclass/genops.c
drivers/staging/lustre/lustre/obdclass/lprocfs_status.c
drivers/staging/lustre/lustre/ptlrpc/client.c
drivers/staging/lustre/lustre/ptlrpc/import.c
drivers/staging/lustre/lustre/ptlrpc/recover.c

index 4c70c060b15870c15b0cbf436bc7aedcc2239dc4..a55eebfda165abebb6530e688bc285910a617e4e 100644 (file)
@@ -1305,6 +1305,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
 #define OBD_CONNECT_SHORTIO     0x2000000000000ULL/* short io */
 #define OBD_CONNECT_PINGLESS   0x4000000000000ULL/* pings not required */
 #define OBD_CONNECT_FLOCK_DEAD 0x8000000000000ULL/* flock deadlock detection */
+#define OBD_CONNECT_DISP_STRIPE 0x10000000000000ULL/*create stripe disposition*/
 
 /* XXX README XXX:
  * Please DO NOT add flag values here before first ensuring that this same
@@ -1344,7 +1345,9 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_UMASK | \
                                OBD_CONNECT_LVB_TYPE | OBD_CONNECT_LAYOUTLOCK |\
                                OBD_CONNECT_PINGLESS | OBD_CONNECT_MAX_EASIZE |\
-                               OBD_CONNECT_FLOCK_DEAD)
+                               OBD_CONNECT_FLOCK_DEAD | \
+                               OBD_CONNECT_DISP_STRIPE)
+
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
                                OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
@@ -2114,6 +2117,7 @@ extern void lustre_swab_generic_32s (__u32 *val);
 #define DISP_ENQ_CREATE_REF  0x01000000
 #define DISP_OPEN_LOCK       0x02000000
 #define DISP_OPEN_LEASE      0x04000000
+#define DISP_OPEN_STRIPE     0x08000000
 
 /* INODE LOCK PARTS */
 #define MDS_INODELOCK_LOOKUP 0x000001  /* For namespace, dentry etc, and also
index 82a230be732d10889f483700c3094c0ff3438a8a..6f7f48c7f2f10c290947958dd0314b2b2c117672 100644 (file)
@@ -388,6 +388,15 @@ static inline __u64 exp_connect_ibits(struct obd_export *exp)
        return ocd->ocd_ibits_known;
 }
 
+static inline bool imp_connect_disp_stripe(struct obd_import *imp)
+{
+       struct obd_connect_data *ocd;
+
+       LASSERT(imp != NULL);
+       ocd = &imp->imp_connect_data;
+       return ocd->ocd_connect_flags & OBD_CONNECT_DISP_STRIPE;
+}
+
 extern struct obd_export *class_conn2export(struct lustre_handle *conn);
 extern struct obd_device *class_conn2obd(struct lustre_handle *conn);
 
index 67259eb43cde1aa12208eab996481d3cf9789b8d..e9833ae8b171378f3effd46534936c641001fb5a 100644 (file)
@@ -180,6 +180,17 @@ struct obd_import {
        struct list_head                imp_delayed_list;
        /** @} */
 
+       /**
+        * List of requests that are retained for committed open replay. Once
+        * open is committed, open replay request will be moved from the
+        * imp_replay_list into the imp_committed_list.
+        * The imp_replay_cursor is for accelerating searching during replay.
+        * @{
+        */
+       struct list_head                imp_committed_list;
+       struct list_head               *imp_replay_cursor;
+       /** @} */
+
        /** obd device for this import */
        struct obd_device       *imp_obd;
 
index d8d08803542877f504947be3ceaa1c3cbfd55178..11382abf92910224a4a8fd85189523c329a98b62 100644 (file)
@@ -2621,6 +2621,8 @@ int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd);
  * request queues, request management, etc.
  * @{
  */
+void ptlrpc_request_committed(struct ptlrpc_request *req, int force);
+
 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
                        struct ptlrpc_client *);
 void ptlrpc_cleanup_client(struct obd_import *imp);
index c3470ce62cff31aebf7d5b0310ba309b6507c750..1b386955adda7315be7beaebdf485fe151692a38 100644 (file)
@@ -1323,7 +1323,8 @@ struct md_open_data {
        struct obd_client_handle *mod_och;
        struct ptlrpc_request    *mod_open_req;
        struct ptlrpc_request    *mod_close_req;
-       atomic_t              mod_refcount;
+       atomic_t                  mod_refcount;
+       bool                      mod_is_create;
 };
 
 struct lookup_intent;
@@ -1392,7 +1393,7 @@ struct md_ops {
 
        int (*m_set_open_replay_data)(struct obd_export *,
                                      struct obd_client_handle *,
-                                     struct ptlrpc_request *);
+                                     struct lookup_intent *);
        int (*m_clear_open_replay_data)(struct obd_export *,
                                        struct obd_client_handle *);
        int (*m_set_lock_data)(struct obd_export *, __u64 *, void *, __u64 *);
index 1c2ba19bd987234a610225c1ffe1484ecd25d1db..0a188207014ceedb5de5174cfb419bcb806e0e69 100644 (file)
@@ -2001,11 +2001,11 @@ static inline int md_getxattr(struct obd_export *exp,
 
 static inline int md_set_open_replay_data(struct obd_export *exp,
                                          struct obd_client_handle *och,
-                                         struct ptlrpc_request *open_req)
+                                         struct lookup_intent *it)
 {
        EXP_CHECK_MD_OP(exp, set_open_replay_data);
        EXP_MD_COUNTER_INCREMENT(exp, set_open_replay_data);
-       return MDP(exp->exp_obd, set_open_replay_data)(exp, och, open_req);
+       return MDP(exp->exp_obd, set_open_replay_data)(exp, och, it);
 }
 
 static inline int md_clear_open_replay_data(struct obd_export *exp,
index 362f5ecb94a04947c8cabdaeb9d7b354206873e9..7ceec740ece51716f78bdd274f442e51a0ff4582 100644 (file)
@@ -480,7 +480,7 @@ static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it,
        och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
        och->och_flags = it->it_flags;
 
-       return md_set_open_replay_data(md_exp, och, req);
+       return md_set_open_replay_data(md_exp, och, it);
 }
 
 int ll_local_open(struct file *file, struct lookup_intent *it,
index 85c01e155680acb875ff8fd49a88850be56c88c4..7427f69e33b067ec39d39a3ee35161f9427580e3 100644 (file)
@@ -208,7 +208,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                  OBD_CONNECT_LAYOUTLOCK |
                                  OBD_CONNECT_PINGLESS |
                                  OBD_CONNECT_MAX_EASIZE |
-                                 OBD_CONNECT_FLOCK_DEAD;
+                                 OBD_CONNECT_FLOCK_DEAD |
+                                 OBD_CONNECT_DISP_STRIPE;
 
        if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
                data->ocd_connect_flags |= OBD_CONNECT_SOM;
index 1bddd8f62fbfbcf6681599b3cca8be893df49bc2..40fbd44bdda1d6062886bcc8ad740d1fc0a6ea33 100644 (file)
@@ -2593,7 +2593,7 @@ int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
 
 int lmv_set_open_replay_data(struct obd_export *exp,
                             struct obd_client_handle *och,
-                            struct ptlrpc_request *open_req)
+                            struct lookup_intent *it)
 {
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd    *lmv = &obd->u.lmv;
@@ -2603,7 +2603,7 @@ int lmv_set_open_replay_data(struct obd_export *exp,
        if (IS_ERR(tgt))
                return PTR_ERR(tgt);
 
-       return md_set_open_replay_data(tgt->ltd_exp, och, open_req);
+       return md_set_open_replay_data(tgt->ltd_exp, och, it);
 }
 
 int lmv_clear_open_replay_data(struct obd_export *exp,
index fc21777b53c622dce5abff995d1fb40d11d4967e..c78bf003c2c524e8e8104e9ed0c23c38f318c002 100644 (file)
@@ -122,7 +122,7 @@ int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md);
 
 int mdc_set_open_replay_data(struct obd_export *exp,
                             struct obd_client_handle *och,
-                            struct ptlrpc_request *open_req);
+                            struct lookup_intent *it);
 
 int mdc_clear_open_replay_data(struct obd_export *exp,
                               struct obd_client_handle *och);
index 61109430f333eb59671a9e7d7f8233a034312f10..20706e788de909e1a44c9af2e39c5fba1cff5179 100644 (file)
@@ -641,7 +641,7 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                         * happens immediately after swabbing below, new reply
                         * is swabbed by that handler correctly.
                         */
-                       mdc_set_open_replay_data(NULL, NULL, req);
+                       mdc_set_open_replay_data(NULL, NULL, it);
                }
 
                if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
index 1aea154e122be7fc47a5ca731d6db9e5b632652b..d79aa1641feffa4161f6edc533aa300d18a5fb93 100644 (file)
@@ -165,6 +165,7 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
                        req->rq_cb_data = *mod;
                        (*mod)->mod_open_req = req;
                        req->rq_commit_cb = mdc_commit_open;
+                       (*mod)->mod_is_create = true;
                        /**
                         * Take an extra reference on \var mod, it protects \var
                         * mod from being freed on eviction (commit callback is
index 17c8e1486daa7ec36e702356c674fb73bed98f73..d9ddb393491964163d33b338d96b45866fab1d89 100644 (file)
@@ -722,11 +722,12 @@ void mdc_commit_open(struct ptlrpc_request *req)
 
 int mdc_set_open_replay_data(struct obd_export *exp,
                             struct obd_client_handle *och,
-                            struct ptlrpc_request *open_req)
+                            struct lookup_intent *it)
 {
        struct md_open_data   *mod;
        struct mdt_rec_create *rec;
        struct mdt_body       *body;
+       struct ptlrpc_request *open_req = it->d.lustre.it_data;
        struct obd_import     *imp = open_req->rq_import;
 
        if (!open_req->rq_replay)
@@ -760,6 +761,8 @@ int mdc_set_open_replay_data(struct obd_export *exp,
                spin_lock(&open_req->rq_lock);
                och->och_mod = mod;
                mod->mod_och = och;
+               mod->mod_is_create = it_disposition(it, DISP_OPEN_CREATE) ||
+                                    it_disposition(it, DISP_OPEN_STRIPE);
                mod->mod_open_req = open_req;
                open_req->rq_cb_data = mod;
                open_req->rq_commit_cb = mdc_commit_open;
@@ -780,6 +783,23 @@ int mdc_set_open_replay_data(struct obd_export *exp,
        return 0;
 }
 
+static void mdc_free_open(struct md_open_data *mod)
+{
+       int committed = 0;
+
+       if (mod->mod_is_create == 0 &&
+           imp_connect_disp_stripe(mod->mod_open_req->rq_import))
+               committed = 1;
+
+       LASSERT(mod->mod_open_req->rq_replay == 0);
+
+       DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "free open request\n");
+
+       ptlrpc_request_committed(mod->mod_open_req, committed);
+       if (mod->mod_close_req)
+               ptlrpc_request_committed(mod->mod_close_req, committed);
+}
+
 int mdc_clear_open_replay_data(struct obd_export *exp,
                               struct obd_client_handle *och)
 {
@@ -793,6 +813,8 @@ int mdc_clear_open_replay_data(struct obd_export *exp,
                return 0;
 
        LASSERT(mod != LP_POISON);
+       LASSERT(mod->mod_open_req != NULL);
+       mdc_free_open(mod);
 
        mod->mod_och = NULL;
        och->och_mod = NULL;
@@ -991,6 +1013,9 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
        if (mod) {
                if (rc != 0)
                        mod->mod_close_req = NULL;
+               LASSERT(mod->mod_open_req != NULL);
+               mdc_free_open(mod);
+
                /* Since now, mod is accessed through setattr req only,
                 * thus DW req does not keep a reference on mod anymore. */
                obd_mod_put(mod);
index d27f0411d355e913f9e873f136545c4d75414fc6..169c9ed56521abdbf32d0c8902bcee8ae98d7f9c 100644 (file)
@@ -1010,6 +1010,8 @@ struct obd_import *class_new_import(struct obd_device *obd)
        INIT_LIST_HEAD(&imp->imp_replay_list);
        INIT_LIST_HEAD(&imp->imp_sending_list);
        INIT_LIST_HEAD(&imp->imp_delayed_list);
+       INIT_LIST_HEAD(&imp->imp_committed_list);
+       imp->imp_replay_cursor = &imp->imp_committed_list;
        spin_lock_init(&imp->imp_lock);
        imp->imp_last_success_conn = 0;
        imp->imp_state = LUSTRE_IMP_NEW;
index 6e7d2e5610674964de9cc6d0b7a8a9128c2e04ed..1432dd74fe95099504426d035b7d9b421e3a3fc6 100644 (file)
@@ -99,6 +99,7 @@ static const char * const obd_connect_names[] = {
        "short_io",
        "pingless",
        "flock_deadlock",
+       "disp_stripe",
        "unknown",
        NULL
 };
index eb33bb7c86ae8bd61c1196dbcd4e30c5c43abb62..a32b72235a00f4f4e7403b2abfb49994652cae82 100644 (file)
@@ -2360,6 +2360,39 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 }
 EXPORT_SYMBOL(ptlrpc_unregister_reply);
 
+static void ptlrpc_free_request(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_lock);
+       req->rq_replay = 0;
+       spin_unlock(&req->rq_lock);
+
+       if (req->rq_commit_cb != NULL)
+               req->rq_commit_cb(req);
+       list_del_init(&req->rq_replay_list);
+
+       __ptlrpc_req_finished(req, 1);
+}
+
+/**
+ * the request is committed and dropped from the replay list of its import
+ */
+void ptlrpc_request_committed(struct ptlrpc_request *req, int force)
+{
+       struct obd_import       *imp = req->rq_import;
+
+       spin_lock(&imp->imp_lock);
+       if (list_empty(&req->rq_replay_list)) {
+               spin_unlock(&imp->imp_lock);
+               return;
+       }
+
+       if (force || req->rq_transno <= imp->imp_peer_committed_transno)
+               ptlrpc_free_request(req);
+
+       spin_unlock(&imp->imp_lock);
+}
+EXPORT_SYMBOL(ptlrpc_request_committed);
+
 /**
  * Iterates through replay_list on import and prunes
  * all requests have transno smaller than last_committed for the
@@ -2370,9 +2403,9 @@ EXPORT_SYMBOL(ptlrpc_unregister_reply);
  */
 void ptlrpc_free_committed(struct obd_import *imp)
 {
-       struct list_head *tmp, *saved;
-       struct ptlrpc_request *req;
+       struct ptlrpc_request *req, *saved;
        struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
+       bool                   skip_committed_list = true;
 
        LASSERT(imp != NULL);
 
@@ -2388,13 +2421,15 @@ void ptlrpc_free_committed(struct obd_import *imp)
        CDEBUG(D_RPCTRACE, "%s: committing for last_committed "LPU64" gen %d\n",
               imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
               imp->imp_generation);
+
+       if (imp->imp_generation != imp->imp_last_generation_checked)
+               skip_committed_list = false;
+
        imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
        imp->imp_last_generation_checked = imp->imp_generation;
 
-       list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
-               req = list_entry(tmp, struct ptlrpc_request,
-                                    rq_replay_list);
-
+       list_for_each_entry_safe(req, saved, &imp->imp_replay_list,
+                                rq_replay_list) {
                /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
                LASSERT(req != last_req);
                last_req = req;
@@ -2408,27 +2443,34 @@ void ptlrpc_free_committed(struct obd_import *imp)
                        GOTO(free_req, 0);
                }
 
-               if (req->rq_replay) {
-                       DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
-                       continue;
-               }
-
                /* not yet committed */
                if (req->rq_transno > imp->imp_peer_committed_transno) {
                        DEBUG_REQ(D_RPCTRACE, req, "stopping search");
                        break;
                }
 
+               if (req->rq_replay) {
+                       DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
+                       list_move_tail(&req->rq_replay_list,
+                                      &imp->imp_committed_list);
+                       continue;
+               }
+
                DEBUG_REQ(D_INFO, req, "commit (last_committed "LPU64")",
                          imp->imp_peer_committed_transno);
 free_req:
-               spin_lock(&req->rq_lock);
-               req->rq_replay = 0;
-               spin_unlock(&req->rq_lock);
-               if (req->rq_commit_cb != NULL)
-                       req->rq_commit_cb(req);
-               list_del_init(&req->rq_replay_list);
-               __ptlrpc_req_finished(req, 1);
+               ptlrpc_free_request(req);
+       }
+       if (skip_committed_list)
+               return;
+
+       list_for_each_entry_safe(req, saved, &imp->imp_committed_list,
+                                rq_replay_list) {
+               LASSERT(req->rq_transno != 0);
+               if (req->rq_import_generation < imp->imp_generation) {
+                       DEBUG_REQ(D_RPCTRACE, req, "free stale open request");
+                       ptlrpc_free_request(req);
+               }
        }
 }
 
index 82db0ed606529f57d1540d02baf6934f78ca5b42..537aa6204a516d0e3f199011bfd257339c4dee0f 100644 (file)
@@ -560,17 +560,30 @@ static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
        struct ptlrpc_request *req;
        struct list_head *tmp;
 
-       if (list_empty(&imp->imp_replay_list))
-               return 0;
-       tmp = imp->imp_replay_list.next;
-       req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
-       *transno = req->rq_transno;
-       if (req->rq_transno == 0) {
-               DEBUG_REQ(D_ERROR, req, "zero transno in replay");
-               LBUG();
+       /* The requests in committed_list always have smaller transnos than
+        * the requests in replay_list */
+       if (!list_empty(&imp->imp_committed_list)) {
+               tmp = imp->imp_committed_list.next;
+               req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
+               *transno = req->rq_transno;
+               if (req->rq_transno == 0) {
+                       DEBUG_REQ(D_ERROR, req,
+                                 "zero transno in committed_list");
+                       LBUG();
+               }
+               return 1;
        }
-
-       return 1;
+       if (!list_empty(&imp->imp_replay_list)) {
+               tmp = imp->imp_replay_list.next;
+               req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
+               *transno = req->rq_transno;
+               if (req->rq_transno == 0) {
+                       DEBUG_REQ(D_ERROR, req, "zero transno in replay_list");
+                       LBUG();
+               }
+               return 1;
+       }
+       return 0;
 }
 
 /**
index 84c39e083ea4bef97acfdabb2d2424cd76540b58..48ae328ce24ec138063647ab68b5e824c4a6b8a4 100644 (file)
@@ -105,24 +105,59 @@ int ptlrpc_replay_next(struct obd_import *imp, int *inflight)
         * imp_lock is being held by ptlrpc_replay, but it's not. it's
         * just a little race...
         */
-       list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
+
+       /* Replay all the committed open requests on committed_list first */
+       if (!list_empty(&imp->imp_committed_list)) {
+               tmp = imp->imp_committed_list.prev;
                req = list_entry(tmp, struct ptlrpc_request,
                                     rq_replay_list);
 
-               /* If need to resend the last sent transno (because a
-                  reconnect has occurred), then stop on the matching
-                  req and send it again. If, however, the last sent
-                  transno has been committed then we continue replay
-                  from the next request. */
+               /* The last request on committed_list hasn't been replayed */
                if (req->rq_transno > last_transno) {
-                       if (imp->imp_resend_replay)
-                               lustre_msg_add_flags(req->rq_reqmsg,
-                                                    MSG_RESENT);
-                       break;
+                       /* Since the imp_committed_list is immutable before
+                        * all of it's requests being replayed, it's safe to
+                        * use a cursor to accelerate the search */
+                       imp->imp_replay_cursor = imp->imp_replay_cursor->next;
+
+                       while (imp->imp_replay_cursor !=
+                              &imp->imp_committed_list) {
+                               req = list_entry(imp->imp_replay_cursor,
+                                                struct ptlrpc_request,
+                                                rq_replay_list);
+                               if (req->rq_transno > last_transno)
+                                       break;
+
+                               req = NULL;
+                               imp->imp_replay_cursor =
+                                       imp->imp_replay_cursor->next;
+                       }
+               } else {
+                       /* All requests on committed_list have been replayed */
+                       imp->imp_replay_cursor = &imp->imp_committed_list;
+                       req = NULL;
+               }
+       }
+
+       /* All the requests in committed list have been replayed, let's replay
+        * the imp_replay_list */
+       if (req == NULL) {
+               list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
+                       req = list_entry(tmp, struct ptlrpc_request,
+                                        rq_replay_list);
+
+                       if (req->rq_transno > last_transno)
+                               break;
+                       req = NULL;
                }
-               req = NULL;
        }
 
+       /* If need to resend the last sent transno (because a reconnect
+        * has occurred), then stop on the matching req and send it again.
+        * If, however, the last sent transno has been committed then we
+        * continue replay from the next request. */
+       if (req != NULL && imp->imp_resend_replay)
+               lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
+
        spin_lock(&imp->imp_lock);
        imp->imp_resend_replay = 0;
        spin_unlock(&imp->imp_lock);