struct rds_message *rm, *tmp;
unsigned long flags;
+ spin_lock_irqsave(&conn->c_send_lock, flags);
if (conn->c_xmit_rm) {
+ rm = conn->c_xmit_rm;
+ conn->c_xmit_rm = NULL;
/* Tell the user the RDMA op is no longer mapped by the
* transport. This isn't entirely true (it's flushed out
* independently) but as the connection is down, there's
* no ongoing RDMA to/from that memory */
- rds_message_unmapped(conn->c_xmit_rm);
- rds_message_put(conn->c_xmit_rm);
- conn->c_xmit_rm = NULL;
+ rds_message_unmapped(rm);
+ spin_unlock_irqrestore(&conn->c_send_lock, flags);
+
+ rds_message_put(rm);
+ } else {
+ spin_unlock_irqrestore(&conn->c_send_lock, flags);
}
+
conn->c_xmit_sg = 0;
conn->c_xmit_hdr_off = 0;
conn->c_xmit_data_off = 0;
- conn->c_xmit_rdma_sent = 0;
conn->c_xmit_atomic_sent = 0;
+ conn->c_xmit_rdma_sent = 0;
+ conn->c_xmit_data_sent = 0;
conn->c_map_queued = 0;
struct rds_message *rm;
unsigned long flags;
unsigned int tmp;
- unsigned int send_quota = send_batch_count;
struct scatterlist *sg;
int ret = 0;
- int was_empty = 0;
+ int gen = 0;
LIST_HEAD(to_be_dropped);
+restart:
+ if (!rds_conn_up(conn))
+ goto out;
+
/*
* sendmsg calls here after having queued its message on the send
* queue. We only have one task feeding the connection at a time. If
* another thread is already feeding the queue then we back off. This
* avoids blocking the caller and trading per-connection data between
* caches per message.
- *
- * The sem holder will issue a retry if they notice that someone queued
- * a message after they stopped walking the send queue but before they
- * dropped the sem.
*/
- if (!mutex_trylock(&conn->c_send_lock)) {
- rds_stats_inc(s_send_sem_contention);
+ if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+ rds_stats_inc(s_send_lock_contention);
ret = -ENOMEM;
goto out;
}
+ atomic_inc(&conn->c_senders);
if (conn->c_trans->xmit_prepare)
conn->c_trans->xmit_prepare(conn);
+ gen = atomic_inc_return(&conn->c_send_generation);
+
/*
* spin trying to push headers and data down the connection until
- * the connection doens't make forward progress.
+ * the connection doesn't make forward progress.
*/
- while (--send_quota) {
- /*
- * See if need to send a congestion map update if we're
- * between sending messages. The send_sem protects our sole
- * use of c_map_offset and _bytes.
- * Note this is used only by transports that define a special
- * xmit_cong_map function. For all others, we create allocate
- * a cong_map message and treat it just like any other send.
- */
- if (conn->c_map_bytes) {
- ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
- conn->c_map_offset);
- if (ret <= 0)
- break;
-
- conn->c_map_offset += ret;
- conn->c_map_bytes -= ret;
- if (conn->c_map_bytes)
- continue;
- }
+ while (1) {
- /* If we're done sending the current message, clear the
- * offset and S/G temporaries.
- */
rm = conn->c_xmit_rm;
- if (rm &&
- conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
- conn->c_xmit_sg == rm->data.op_nents) {
- conn->c_xmit_rm = NULL;
- conn->c_xmit_sg = 0;
- conn->c_xmit_hdr_off = 0;
- conn->c_xmit_data_off = 0;
- conn->c_xmit_rdma_sent = 0;
- conn->c_xmit_atomic_sent = 0;
-
- /* Release the reference to the previous message. */
- rds_message_put(rm);
- rm = NULL;
- }
- /* If we're asked to send a cong map update, do so.
+ /*
+ * If between sending messages, we can send a pending congestion
+ * map update.
*/
if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
- if (conn->c_trans->xmit_cong_map) {
- conn->c_map_offset = 0;
- conn->c_map_bytes = sizeof(struct rds_header) +
- RDS_CONG_MAP_BYTES;
- continue;
- }
-
rm = rds_cong_update_alloc(conn);
if (IS_ERR(rm)) {
ret = PTR_ERR(rm);
break;
}
+ rm->data.op_active = 1;
conn->c_xmit_rm = rm;
}
/*
- * Grab the next message from the send queue, if there is one.
+ * If not already working on one, grab the next message.
*
* c_xmit_rm holds a ref while we're sending this message down
* the connction. We can use this ref while holding the
if (!rm) {
unsigned int len;
- spin_lock_irqsave(&conn->c_lock, flags);
+ spin_lock(&conn->c_lock);
if (!list_empty(&conn->c_send_queue)) {
rm = list_entry(conn->c_send_queue.next,
list_move_tail(&rm->m_conn_item, &conn->c_retrans);
}
- spin_unlock_irqrestore(&conn->c_lock, flags);
+ spin_unlock(&conn->c_lock);
- if (!rm) {
- was_empty = 1;
+ if (!rm)
break;
- }
/* Unfortunately, the way Infiniband deals with
* RDMA to a bad MR key is by moving the entire
*/
if (rm->rdma.op_active &&
test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
- spin_lock_irqsave(&conn->c_lock, flags);
+ spin_lock(&conn->c_lock);
if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
list_move(&rm->m_conn_item, &to_be_dropped);
- spin_unlock_irqrestore(&conn->c_lock, flags);
- rds_message_put(rm);
+ spin_unlock(&conn->c_lock);
continue;
}
conn->c_xmit_rm = rm;
}
-
- if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
- ret = conn->c_trans->xmit_atomic(conn, rm);
+ /* The transport either sends the whole rdma or none of it */
+ if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+ rm->m_final_op = &rm->rdma;
+ ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
if (ret)
break;
- conn->c_xmit_atomic_sent = 1;
+ conn->c_xmit_rdma_sent = 1;
+
/* The transport owns the mapped memory for now.
* You can't unmap it while it's on the send queue */
set_bit(RDS_MSG_MAPPED, &rm->m_flags);
}
- /*
- * Try and send an rdma message. Let's see if we can
- * keep this simple and require that the transport either
- * send the whole rdma or none of it.
- */
- if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
- ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
+ if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+ rm->m_final_op = &rm->atomic;
+ ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
if (ret)
break;
- conn->c_xmit_rdma_sent = 1;
-
- /* rdmas need data sent, even if just the header */
- rm->data.op_active = 1;
+ conn->c_xmit_atomic_sent = 1;
/* The transport owns the mapped memory for now.
* You can't unmap it while it's on the send queue */
set_bit(RDS_MSG_MAPPED, &rm->m_flags);
}
- if (rm->data.op_active
- && (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
- conn->c_xmit_sg < rm->data.op_nents)) {
+ /*
+ * A number of cases require an RDS header to be sent
+ * even if there is no data.
+ * We permit 0-byte sends; rds-ping depends on this.
+ * However, if the only ops attached are silent ones,
+ * we skip the hdr/data send, to enable silent operation.
+ */
+ if (rm->data.op_nents == 0) {
+ int ops_present;
+ int all_ops_are_silent = 1;
+
+ ops_present = (rm->atomic.op_active || rm->rdma.op_active);
+ if (rm->atomic.op_active && !rm->atomic.op_silent)
+ all_ops_are_silent = 0;
+ if (rm->rdma.op_active && !rm->rdma.op_silent)
+ all_ops_are_silent = 0;
+
+ if (ops_present && all_ops_are_silent
+ && !rm->m_rdma_cookie)
+ rm->data.op_active = 0;
+ }
+
+ if (rm->data.op_active && !conn->c_xmit_data_sent) {
+ rm->m_final_op = &rm->data;
ret = conn->c_trans->xmit(conn, rm,
conn->c_xmit_hdr_off,
conn->c_xmit_sg,
conn->c_xmit_sg == rm->data.op_nents);
}
}
+
+ if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
+ (conn->c_xmit_sg == rm->data.op_nents))
+ conn->c_xmit_data_sent = 1;
}
- }
- /* Nuke any messages we decided not to retransmit. */
- if (!list_empty(&to_be_dropped))
- rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
+ /*
+ * An rm will only take multiple passes through this loop
+ * if there is a data op. Thus, if the data is sent (or there was
+ * none), then we're done with the rm.
+ */
+ if (!rm->data.op_active || conn->c_xmit_data_sent) {
+ conn->c_xmit_rm = NULL;
+ conn->c_xmit_sg = 0;
+ conn->c_xmit_hdr_off = 0;
+ conn->c_xmit_data_off = 0;
+ conn->c_xmit_rdma_sent = 0;
+ conn->c_xmit_atomic_sent = 0;
+ conn->c_xmit_data_sent = 0;
+
+ rds_message_put(rm);
+ }
+ }
if (conn->c_trans->xmit_complete)
conn->c_trans->xmit_complete(conn);
* stop processing the loop when the transport hasn't taken
* responsibility for forward progress.
*/
- mutex_unlock(&conn->c_send_lock);
+ spin_unlock_irqrestore(&conn->c_send_lock, flags);
- if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
- /* We exhausted the send quota, but there's work left to
- * do. Return and (re-)schedule the send worker.
- */
- ret = -EAGAIN;
+ /* Nuke any messages we decided not to retransmit. */
+ if (!list_empty(&to_be_dropped)) {
+ /* irqs on here, so we can put(), unlike above */
+ list_for_each_entry(rm, &to_be_dropped, m_conn_item)
+ rds_message_put(rm);
+ rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
}
- if (ret == 0 && was_empty) {
- /* A simple bit test would be way faster than taking the
- * spin lock */
- spin_lock_irqsave(&conn->c_lock, flags);
+ atomic_dec(&conn->c_senders);
+
+ /*
+ * Other senders will see we have c_send_lock and exit. We
+ * need to recheck the send queue and race again for c_send_lock
+ * to make sure messages don't just sit on the send queue, if
+ * somebody hasn't already beaten us into the loop.
+ *
+ * If the transport cannot continue (i.e. ret != 0), then it must
+ * call us when more room is available, such as from the tx
+ * completion handler.
+ */
+ if (ret == 0) {
+ smp_mb();
if (!list_empty(&conn->c_send_queue)) {
- rds_stats_inc(s_send_sem_queue_raced);
- ret = -EAGAIN;
+ rds_stats_inc(s_send_lock_queue_raced);
+ if (gen == atomic_read(&conn->c_send_generation)) {
+ goto restart;
+ }
}
- spin_unlock_irqrestore(&conn->c_lock, flags);
}
out:
return ret;
struct rds_sock *rs = NULL;
struct rm_atomic_op *ao;
struct rds_notifier *notifier;
+ unsigned long flags;
- spin_lock(&rm->m_rs_lock);
+ spin_lock_irqsave(&rm->m_rs_lock, flags);
ao = &rm->atomic;
if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
ao->op_notifier = NULL;
}
- spin_unlock(&rm->m_rs_lock);
+ spin_unlock_irqrestore(&rm->m_rs_lock, flags);
if (rs) {
rds_wake_sk_sleep(rs);
* socket, socket lock) and can just move the notifier.
*/
static inline void
-__rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
+__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
{
struct rm_rdma_op *ro;
+ struct rm_atomic_op *ao;
ro = &rm->rdma;
if (ro->op_active && ro->op_notify && ro->op_notifier) {
ro->op_notifier = NULL;
}
+ ao = &rm->atomic;
+ if (ao->op_active && ao->op_notify && ao->op_notifier) {
+ ao->op_notifier->n_status = status;
+ list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
+ ao->op_notifier = NULL;
+ }
+
/* No need to wake the app - caller does this */
}
spin_lock_irqsave(&rm->m_rs_lock, flags);
spin_lock(&rs->rs_lock);
- __rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
+ __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
spin_unlock(&rs->rs_lock);
rm->m_rs = NULL;
{
struct cmsghdr *cmsg;
int size = 0;
+ int cmsg_groups = 0;
int retval;
for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
switch (cmsg->cmsg_type) {
case RDS_CMSG_RDMA_ARGS:
+ cmsg_groups |= 1;
retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
if (retval < 0)
return retval;
size += retval;
+
break;
case RDS_CMSG_RDMA_DEST:
case RDS_CMSG_RDMA_MAP:
+ cmsg_groups |= 2;
/* these are valid but do no add any size */
break;
case RDS_CMSG_ATOMIC_CSWP:
case RDS_CMSG_ATOMIC_FADD:
+ cmsg_groups |= 1;
size += sizeof(struct scatterlist);
break;
size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
+ /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
+ if (cmsg_groups == 3)
+ return -EINVAL;
+
return size;
}
goto out;
}
- rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
- /* XXX fix this to not allocate memory */
- ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len);
- if (ret)
- goto out;
+ /* Attach data to the rm */
+ if (payload_len) {
+ rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
+ ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len);
+ if (ret)
+ goto out;
+ }
+ rm->data.op_active = 1;
rm->m_daddr = daddr;
if (ret)
goto out;
- if ((rm->m_rdma_cookie || rm->rdma.op_active) &&
- !conn->c_trans->xmit_rdma) {
+ if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
if (printk_ratelimit())
printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
&rm->rdma, conn->c_trans->xmit_rdma);
rds_stats_inc(s_send_queued);
if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
- rds_send_worker(&conn->c_send_w.work);
+ rds_send_xmit(conn);
rds_message_put(rm);
return payload_len;
}
rm->m_daddr = conn->c_faddr;
+ rm->data.op_active = 1;
/* If the connection is down, trigger a connect. We may
* have scheduled a delayed reconnect however - in this case
rds_stats_inc(s_send_queued);
rds_stats_inc(s_send_pong);
- queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+ if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
+ rds_send_xmit(conn);
+
rds_message_put(rm);
return 0;