struct rds_message *rm, *tmp;
unsigned long flags;
+ spin_lock_irqsave(&conn->c_send_lock, flags);
if (conn->c_xmit_rm) {
+ rm = conn->c_xmit_rm;
+ conn->c_xmit_rm = NULL;
/* Tell the user the RDMA op is no longer mapped by the
* transport. This isn't entirely true (it's flushed out
* independently) but as the connection is down, there's
* no ongoing RDMA to/from that memory */
- rds_message_unmapped(conn->c_xmit_rm);
- rds_message_put(conn->c_xmit_rm);
- conn->c_xmit_rm = NULL;
+ rds_message_unmapped(rm);
+ spin_unlock_irqrestore(&conn->c_send_lock, flags);
+
+ rds_message_put(rm);
+ } else {
+ spin_unlock_irqrestore(&conn->c_send_lock, flags);
}
+
conn->c_xmit_sg = 0;
conn->c_xmit_hdr_off = 0;
conn->c_xmit_data_off = 0;
struct rds_message *rm;
unsigned long flags;
unsigned int tmp;
- unsigned int send_quota = send_batch_count;
struct scatterlist *sg;
int ret = 0;
- int was_empty = 0;
+ int gen = 0;
LIST_HEAD(to_be_dropped);
+restart:
if (!rds_conn_up(conn))
goto out;
ret = -ENOMEM;
goto out;
}
+ atomic_inc(&conn->c_senders);
if (conn->c_trans->xmit_prepare)
conn->c_trans->xmit_prepare(conn);
+ gen = atomic_inc_return(&conn->c_send_generation);
+
/*
* spin trying to push headers and data down the connection until
* the connection doesn't make forward progress.
*/
- while (--send_quota) {
+ while (1) {
rm = conn->c_xmit_rm;
spin_unlock(&conn->c_lock);
- if (!rm) {
- was_empty = 1;
+ if (!rm)
break;
- }
/* Unfortunately, the way Infiniband deals with
* RDMA to a bad MR key is by moving the entire
rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
}
- if (send_quota == 0 && !was_empty) {
- /* We exhausted the send quota, but there's work left to
- * do. Return and (re-)schedule the send worker.
- */
- ret = -EAGAIN;
- }
+ atomic_dec(&conn->c_senders);
- if (ret == 0 && was_empty) {
- /* A simple bit test would be way faster than taking the
- * spin lock */
- spin_lock_irqsave(&conn->c_lock, flags);
+ /*
+ * Other senders will see we have c_send_lock and exit. We
+ * need to recheck the send queue and race again for c_send_lock
+ * to make sure messages don't just sit on the send queue, if
+ * somebody hasn't already beat us into the loop.
+ *
+ * If the transport cannot continue (i.e ret != 0), then it must
+ * call us when more room is available, such as from the tx
+ * completion handler.
+ */
+ if (ret == 0) {
+ smp_mb();
if (!list_empty(&conn->c_send_queue)) {
rds_stats_inc(s_send_lock_queue_raced);
- ret = -EAGAIN;
+ if (gen == atomic_read(&conn->c_send_generation)) {
+ goto restart;
+ }
}
- spin_unlock_irqrestore(&conn->c_lock, flags);
}
out:
return ret;
struct rds_sock *rs = NULL;
struct rm_atomic_op *ao;
struct rds_notifier *notifier;
+ unsigned long flags;
- spin_lock(&rm->m_rs_lock);
+ spin_lock_irqsave(&rm->m_rs_lock, flags);
ao = &rm->atomic;
if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
ao->op_notifier = NULL;
}
- spin_unlock(&rm->m_rs_lock);
+ spin_unlock_irqrestore(&rm->m_rs_lock, flags);
if (rs) {
rds_wake_sk_sleep(rs);
}
rm->m_daddr = conn->c_faddr;
+ rm->data.op_active = 1;
/* If the connection is down, trigger a connect. We may
* have scheduled a delayed reconnect however - in this case
rds_stats_inc(s_send_queued);
rds_stats_inc(s_send_pong);
- queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+ if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
+ rds_send_xmit(conn);
+
rds_message_put(rm);
return 0;