void tipc_bclink_wakeup_users(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
- struct sk_buff *skb;
- while ((skb = skb_dequeue(&tn->bclink->link.waiting_sks)))
- tipc_sk_rcv(net, skb);
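+ /* Deliver all queued wakeup messages in a single call */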
+ tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
}
/**
tipc_link_push_packets(tn->bcl);
bclink_set_last_sent(net);
}
- if (unlikely(released && !skb_queue_empty(&tn->bcl->waiting_sks)))
+ if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
-
exit:
tipc_bclink_unlock(net);
}
u32 next_in;
u32 seqno;
int deferred = 0;
+ int pos = 0;
+ struct sk_buff *iskb;
+ struct sk_buff_head msgs;
/* Screen out unwanted broadcast messages */
if (msg_mc_netid(msg) != tn->net_id)
bcl->stats.recv_bundled += msg_msgcnt(msg);
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- tipc_link_bundle_rcv(net, buf);
+ while (tipc_msg_extract(buf, &iskb, &pos))
+ tipc_sk_mcast_rcv(net, iskb);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
tipc_buf_append(&node->bclink.reasm_buf, &buf);
if (unlikely(!buf && !node->bclink.reasm_buf))
bclink_accept_pkt(node, seqno);
tipc_bclink_unlock(net);
tipc_node_unlock(node);
- tipc_named_rcv(net, buf);
+ skb_queue_head_init(&msgs);
+ skb_queue_tail(&msgs, buf);
+ tipc_named_rcv(net, &msgs);
} else {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
spin_lock_init(&bclink->lock);
__skb_queue_head_init(&bcl->outqueue);
__skb_queue_head_init(&bcl->deferred_queue);
- skb_queue_head_init(&bcl->waiting_sks);
+ skb_queue_head_init(&bcl->wakeupq);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
- __skb_queue_head_init(&bclink->node.waiting_sks);
bcl->owner = &bclink->node;
bcl->owner->net = net;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
static void link_print(struct tipc_link *l_ptr, const char *str);
static void tipc_link_sync_xmit(struct tipc_link *l);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
-static int tipc_link_input(struct net *net, struct tipc_link *l,
- struct sk_buff *buf);
-static int tipc_link_prepare_input(struct net *net, struct tipc_link *l,
- struct sk_buff **buf);
+static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
+static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
/*
* Simple link routines
l_ptr->next_out_no = 1;
__skb_queue_head_init(&l_ptr->outqueue);
__skb_queue_head_init(&l_ptr->deferred_queue);
- skb_queue_head_init(&l_ptr->waiting_sks);
-
+ skb_queue_head_init(&l_ptr->wakeupq);
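+ /* inputq and namedq are drained outside the node lock, hence locked init */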
+ skb_queue_head_init(&l_ptr->inputq);
+ skb_queue_head_init(&l_ptr->namedq);
link_reset_statistics(l_ptr);
tipc_node_attach_link(n_ptr, l_ptr);
setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
return false;
TIPC_SKB_CB(buf)->chain_sz = chain_sz;
TIPC_SKB_CB(buf)->chain_imp = imp;
- skb_queue_tail(&link->waiting_sks, buf);
+ skb_queue_tail(&link->wakeupq, buf);
link->stats.link_congs++;
return true;
}
* Move a number of waiting users, as permitted by available space in
- * the send queue, from link wait queue to node wait queue for wakeup
+ * the send queue, from link wait queue to link input queue for wakeup
*/
-static void link_prepare_wakeup(struct tipc_link *link)
+void link_prepare_wakeup(struct tipc_link *link)
{
uint pend_qsz = skb_queue_len(&link->outqueue);
struct sk_buff *skb, *tmp;
- skb_queue_walk_safe(&link->waiting_sks, skb, tmp) {
+ skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
break;
pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
- skb_unlink(skb, &link->waiting_sks);
- skb_queue_tail(&link->owner->waiting_sks, skb);
+ skb_unlink(skb, &link->wakeupq);
+ skb_queue_tail(&link->inputq, skb);
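+ /* Let the node know where to fetch the messages, and flag the event */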
+ link->owner->inputq = &link->inputq;
+ link->owner->action_flags |= TIPC_MSG_EVT;
}
}
l_ptr->exp_msg_count = START_CHANGEOVER;
}
- /* Clean up all queues: */
+ /* Clean up all queues, except inputq: */
__skb_queue_purge(&l_ptr->outqueue);
__skb_queue_purge(&l_ptr->deferred_queue);
- if (!skb_queue_empty(&l_ptr->waiting_sks)) {
- skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
- owner->action_flags |= TIPC_WAKEUP_USERS;
- }
+ skb_queue_splice_init(&l_ptr->wakeupq, &l_ptr->inputq);
+ if (!skb_queue_empty(&l_ptr->inputq))
+ owner->action_flags |= TIPC_MSG_EVT;
+ owner->inputq = &l_ptr->inputq;
l_ptr->next_out = NULL;
l_ptr->unacked_window = 0;
l_ptr->checkpoint = 1;
static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
- __skb_queue_head_init(list);
+ skb_queue_head_init(list);
__skb_queue_tail(list, skb);
}
rc = __tipc_link_xmit(net, link, list);
tipc_node_unlock(node);
}
-
if (link)
return rc;
- if (likely(in_own_node(net, dnode))) {
- /* As a node local message chain never contains more than one
- * buffer, we just need to dequeue one SKB buffer from the
- * head list.
- */
- return tipc_sk_rcv(net, __skb_dequeue(list));
- }
- __skb_queue_purge(list);
+ if (likely(in_own_node(net, dnode)))
+ return tipc_sk_rcv(net, list);
+ __skb_queue_purge(list);
return rc;
}
/* Locate unicast link endpoint that should handle message */
l_ptr = n_ptr->links[b_ptr->identity];
if (unlikely(!l_ptr))
- goto unlock_discard;
+ goto unlock;
/* Verify that communication with node is currently allowed */
if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
if (tipc_node_blocked(n_ptr))
- goto unlock_discard;
+ goto unlock;
/* Validate message sequence number info */
seq_no = msg_seqno(msg);
if (unlikely(l_ptr->next_out))
tipc_link_push_packets(l_ptr);
- if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
+ if (released && !skb_queue_empty(&l_ptr->wakeupq))
link_prepare_wakeup(l_ptr);
- l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
- }
/* Process the incoming packet */
if (unlikely(!link_working_working(l_ptr))) {
if (msg_user(msg) == LINK_PROTOCOL) {
tipc_link_proto_rcv(l_ptr, skb);
link_retrieve_defq(l_ptr, &head);
- tipc_node_unlock(n_ptr);
- continue;
+ skb = NULL;
+ goto unlock;
}
/* Traffic message. Conditionally activate link */
if (link_working_working(l_ptr)) {
/* Re-insert buffer in front of queue */
__skb_queue_head(&head, skb);
- tipc_node_unlock(n_ptr);
- continue;
+ skb = NULL;
+ goto unlock;
}
- goto unlock_discard;
+ goto unlock;
}
/* Link is now in state WORKING_WORKING */
if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
link_handle_out_of_seq_msg(l_ptr, skb);
link_retrieve_defq(l_ptr, &head);
- tipc_node_unlock(n_ptr);
- continue;
+ skb = NULL;
+ goto unlock;
}
l_ptr->next_in_no++;
if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
l_ptr->stats.sent_acks++;
tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
}
-
- if (tipc_link_prepare_input(net, l_ptr, &skb)) {
- tipc_node_unlock(n_ptr);
- continue;
- }
- tipc_node_unlock(n_ptr);
-
- if (tipc_link_input(net, l_ptr, skb) != 0)
- goto discard;
- continue;
-unlock_discard:
+ tipc_link_input(l_ptr, skb);
+ skb = NULL;
+unlock:
tipc_node_unlock(n_ptr);
discard:
- kfree_skb(skb);
+ if (unlikely(skb))
+ kfree_skb(skb);
}
}
-/**
- * tipc_link_prepare_input - process TIPC link messages
- *
- * returns nonzero if the message was consumed
+/* tipc_data_input - deliver data and name distr msgs to upper layer
*
+ * Consumes buffer if message is of right type
* Node lock must be held
*/
-static int tipc_link_prepare_input(struct net *net, struct tipc_link *l,
- struct sk_buff **buf)
+static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
{
- struct tipc_node *n;
- struct tipc_msg *msg;
- int res = -EINVAL;
+ struct tipc_node *node = link->owner;
+ struct tipc_msg *msg = buf_msg(skb);
+ u32 dport = msg_destport(msg);
- n = l->owner;
- msg = buf_msg(*buf);
switch (msg_user(msg)) {
- case CHANGEOVER_PROTOCOL:
- if (tipc_link_tunnel_rcv(n, buf))
- res = 0;
- break;
- case MSG_FRAGMENTER:
- l->stats.recv_fragments++;
- if (tipc_buf_append(&l->reasm_buf, buf)) {
- l->stats.recv_fragmented++;
- res = 0;
- } else if (!l->reasm_buf) {
- tipc_link_reset(l);
+ case TIPC_LOW_IMPORTANCE:
+ case TIPC_MEDIUM_IMPORTANCE:
+ case TIPC_HIGH_IMPORTANCE:
+ case TIPC_CRITICAL_IMPORTANCE:
+ case CONN_MANAGER:
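+ /* tipc_skb_queue_tail() tells us when a new msg event must be raised */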
+ if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
+ node->inputq = &link->inputq;
+ node->action_flags |= TIPC_MSG_EVT;
}
- break;
- case MSG_BUNDLER:
- l->stats.recv_bundles++;
- l->stats.recv_bundled += msg_msgcnt(msg);
- res = 0;
- break;
+ return true;
case NAME_DISTRIBUTOR:
- n->bclink.recv_permitted = true;
- res = 0;
- break;
+ node->bclink.recv_permitted = true;
+ node->namedq = &link->namedq;
+ skb_queue_tail(&link->namedq, skb);
+ if (skb_queue_len(&link->namedq) == 1)
+ node->action_flags |= TIPC_NAMED_MSG_EVT;
+ return true;
+ case MSG_BUNDLER:
+ case CHANGEOVER_PROTOCOL:
+ case MSG_FRAGMENTER:
case BCAST_PROTOCOL:
- tipc_link_sync_rcv(n, *buf);
- break;
+ return false;
default:
- res = 0;
- }
- return res;
+ pr_warn("Dropping received illegal msg type\n");
+ kfree_skb(skb);
+ return false;
+ }
}
-/**
- * tipc_link_input - Deliver message too higher layers
+
+/* tipc_link_input - process packet that has passed link protocol check
+ *
+ * Consumes buffer
+ * Node lock must be held
*/
-static int tipc_link_input(struct net *net, struct tipc_link *l,
- struct sk_buff *buf)
+static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
{
- struct tipc_msg *msg = buf_msg(buf);
- int res = 0;
+ struct tipc_node *node = link->owner;
+ struct tipc_msg *msg = buf_msg(skb);
+ struct sk_buff *iskb;
+ int pos = 0;
+
+ if (likely(tipc_data_input(link, skb)))
+ return;
switch (msg_user(msg)) {
- case TIPC_LOW_IMPORTANCE:
- case TIPC_MEDIUM_IMPORTANCE:
- case TIPC_HIGH_IMPORTANCE:
- case TIPC_CRITICAL_IMPORTANCE:
- case CONN_MANAGER:
- tipc_sk_rcv(net, buf);
+ case CHANGEOVER_PROTOCOL:
+ if (!tipc_link_tunnel_rcv(node, &skb))
+ break;
+ if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
+ tipc_data_input(link, skb);
+ break;
+ }
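+ /* Fall through: skb now refers to the extracted bundle */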
+ case MSG_BUNDLER:
+ link->stats.recv_bundles++;
+ link->stats.recv_bundled += msg_msgcnt(buf_msg(skb));
+
+ while (tipc_msg_extract(skb, &iskb, &pos))
+ tipc_data_input(link, iskb);
break;
- case NAME_DISTRIBUTOR:
- tipc_named_rcv(net, buf);
+ case MSG_FRAGMENTER:
+ link->stats.recv_fragments++;
+ if (tipc_buf_append(&link->reasm_buf, &skb)) {
+ link->stats.recv_fragmented++;
+ tipc_data_input(link, skb);
+ } else if (!link->reasm_buf) {
+ tipc_link_reset(link);
+ }
break;
- case MSG_BUNDLER:
- tipc_link_bundle_rcv(net, buf);
+ case BCAST_PROTOCOL:
+ tipc_link_sync_rcv(node, skb);
break;
default:
- res = -EINVAL;
- }
- return res;
+ break;
+ }
}
/**
* @from_pos: offset to extract from
*
* Returns a new message buffer containing an embedded message. The
- * encapsulating message itself is left unchanged.
+ * encapsulating buffer is left unchanged.
*/
static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
{
return eb;
}
-
-
/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
* Owner node is locked.
*/
return *buf != NULL;
}
-/*
- * Bundler functionality:
- */
-void tipc_link_bundle_rcv(struct net *net, struct sk_buff *buf)
-{
- u32 msgcount = msg_msgcnt(buf_msg(buf));
- u32 pos = INT_H_SIZE;
- struct sk_buff *obuf;
- struct tipc_msg *omsg;
-
- while (msgcount--) {
- obuf = buf_extract(buf, pos);
- if (obuf == NULL) {
- pr_warn("Link unable to unbundle message(s)\n");
- break;
- }
- omsg = buf_msg(obuf);
- pos += align(msg_size(omsg));
- if (msg_isdata(omsg)) {
- if (unlikely(msg_type(omsg) == TIPC_MCAST_MSG))
- tipc_sk_mcast_rcv(net, obuf);
- else
- tipc_sk_rcv(net, obuf);
- } else if (msg_user(omsg) == CONN_MANAGER) {
- tipc_sk_rcv(net, obuf);
- } else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
- tipc_named_rcv(net, obuf);
- } else {
- pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
- kfree_skb(obuf);
- }
- }
- kfree_skb(buf);
-}
-
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
{
unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
* @next_in_no: next sequence number to expect for inbound messages
* @deferred_queue: deferred queue saved OOS b'cast message received from node
* @unacked_window: # of inbound messages rx'd without ack'ing back to peer
+ * @inputq: buffer queue for messages to be delivered upwards
+ * @namedq: buffer queue for name table messages to be delivered upwards
* @next_out: ptr to first unsent outbound message in queue
- * @waiting_sks: linked list of sockets waiting for link congestion to abate
+ * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate
* @long_msg_seq_no: next identifier to use for outbound fragmented messages
* @reasm_buf: head of partially reassembled inbound message fragments
* @stats: collects statistics regarding link activity
u32 next_in_no;
struct sk_buff_head deferred_queue;
u32 unacked_window;
+ struct sk_buff_head inputq;
+ struct sk_buff_head namedq;
/* Congestion handling */
struct sk_buff *next_out;
- struct sk_buff_head waiting_sks;
+ struct sk_buff_head wakeupq;
/* Fragmentation/reassembly */
u32 long_msg_seq_no;
u32 selector);
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
struct sk_buff_head *list);
-void tipc_link_bundle_rcv(struct net *net, struct sk_buff *buf);
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
void tipc_link_push_packets(struct tipc_link *l_ptr);
int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info);
int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
+void link_prepare_wakeup(struct tipc_link *l);
/*
* Link sequence number manipulation routines (uses modulo 2**16 arithmetic)
return true;
}
+/**
+ * tipc_msg_extract(): extract bundled inner packet from buffer
+ * @skb: linear outer buffer, to be extracted from.
+ * @iskb: extracted inner buffer, to be returned
+ * @pos: position of msg to be extracted. Updated to the position of
+ *       the next msg on return
+ * Consumes the outer buffer when the last packet has been extracted
+ * Returns true when a buffer has been extracted, otherwise false
+ */
+bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
+{
+ struct tipc_msg *msg = buf_msg(skb);
+ int imsz;
+ struct tipc_msg *imsg = (struct tipc_msg *)(msg_data(msg) + *pos);
+
+ /* Is there space left for shortest possible message? */
+ if (*pos > (msg_data_sz(msg) - SHORT_H_SIZE))
+ goto none;
+ imsz = msg_size(imsg);
+
+ /* Is there space left for current message ? */
+ if ((*pos + imsz) > msg_data_sz(msg))
+ goto none;
+ *iskb = tipc_buf_acquire(imsz);
+ if (!*iskb)
+ goto none;
+ skb_copy_to_linear_data(*iskb, imsg, imsz);
+ *pos += align(imsz);
+ return true;
+none:
+ kfree_skb(skb);
+ *iskb = NULL;
+ return false;
+}
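+/* Usage sketch (consume_packet() is a placeholder for any per-packet
+ * handler; see tipc_link_input() for real usage):
+ *
+ *	int pos = 0;
+ *	struct sk_buff *iskb;
+ *
+ *	while (tipc_msg_extract(skb, &iskb, &pos))
+ *		consume_packet(iskb);
+ *
+ * The outer buffer is freed by the final, failing call.
+ */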
+
/**
* tipc_msg_make_bundle(): Create bundle buf and append message to its tail
* @list: the buffer chain
* Note: Some items are also used with TIPC internal message headers
*/
#define TIPC_VERSION 2
+struct plist;
/*
* Payload message users are defined in TIPC's public API:
bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
bool tipc_msg_make_bundle(struct sk_buff_head *list,
struct sk_buff *skb, u32 mtu, u32 dnode);
+bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
int offset, int dsz, int mtu, struct sk_buff_head *list);
bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
int *err);
struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
+/* tipc_skb_peek_port(): find a destination port, ignoring all destinations
+ * up to and including 'filter'.
+ * Note: ignoring previously tried destinations minimizes the risk of
+ * contention on the socket lock
+ * @list: list to be peeked in
+ * @filter: last destination to be ignored from search
+ * Returns a destination port number if applicable, otherwise zero.
+ */
+static inline u32 tipc_skb_peek_port(struct sk_buff_head *list, u32 filter)
+{
+ struct sk_buff *skb;
+ u32 dport = 0;
+ bool ignore = true;
+
+ spin_lock_bh(&list->lock);
+ skb_queue_walk(list, skb) {
+ dport = msg_destport(buf_msg(skb));
+ if (!filter || skb_queue_is_last(list, skb))
+ break;
+ if (dport == filter)
+ ignore = false;
+ else if (!ignore)
+ break;
+ }
+ spin_unlock_bh(&list->lock);
+ return dport;
+}
+
+/* tipc_skb_dequeue(): unlink first buffer with dest 'dport' from list
+ * @list: list to be unlinked from
+ * @dport: selection criteria for buffer to unlink
+ */
+static inline struct sk_buff *tipc_skb_dequeue(struct sk_buff_head *list,
+ u32 dport)
+{
+ struct sk_buff *_skb, *tmp, *skb = NULL;
+
+ spin_lock_bh(&list->lock);
+ skb_queue_walk_safe(list, _skb, tmp) {
+ if (msg_destport(buf_msg(_skb)) == dport) {
+ __skb_unlink(_skb, list);
+ skb = _skb;
+ break;
+ }
+ }
+ spin_unlock_bh(&list->lock);
+ return skb;
+}
+
+/* tipc_skb_queue_tail(): add buffer to tail of list;
+ * @list: list to be appended to
+ * @skb: buffer to append. Always appended
+ * @dport: the destination port of the buffer
+ * returns true if dport differs from the previous buffer's destination,
+ * if the list was empty, or if the list has grown beyond 32 buffers
+ */
+static inline bool tipc_skb_queue_tail(struct sk_buff_head *list,
+ struct sk_buff *skb, u32 dport)
+{
+ struct sk_buff *_skb = NULL;
+ bool rv = false;
+
+ spin_lock_bh(&list->lock);
+ _skb = skb_peek_tail(list);
+ if (!_skb || (msg_destport(buf_msg(_skb)) != dport) ||
+ (skb_queue_len(list) > 32))
+ rv = true;
+ __skb_queue_tail(list, skb);
+ spin_unlock_bh(&list->lock);
+ return rv;
+}
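+/* Note: tipc_skb_peek_port(), tipc_skb_dequeue() and tipc_skb_queue_tail()
+ * together let producers feed a shared, lock-protected input queue while
+ * consumers drain it one destination port at a time, as in tipc_sk_rcv()
+ */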
+
#endif
}
/**
- * tipc_named_rcv - process name table update message sent by another node
+ * tipc_named_rcv - process name table update messages sent by another node
*/
-void tipc_named_rcv(struct net *net, struct sk_buff *buf)
+void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
- struct tipc_msg *msg = buf_msg(buf);
- struct distr_item *item = (struct distr_item *)msg_data(msg);
- u32 count = msg_data_sz(msg) / ITEM_SIZE;
- u32 node = msg_orignode(msg);
+ struct tipc_msg *msg;
+ struct distr_item *item;
+ uint count;
+ u32 node;
+ struct sk_buff *skb;
+ int mtype;
spin_lock_bh(&tn->nametbl_lock);
- while (count--) {
- if (!tipc_update_nametbl(net, item, node, msg_type(msg)))
- tipc_named_add_backlog(item, msg_type(msg), node);
- item++;
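+ /* Drain the queue; each buffer carries a batch of distribution items */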
+ for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
+ msg = buf_msg(skb);
+ mtype = msg_type(msg);
+ item = (struct distr_item *)msg_data(msg);
+ count = msg_data_sz(msg) / ITEM_SIZE;
+ node = msg_orignode(msg);
+ while (count--) {
+ if (!tipc_update_nametbl(net, item, node, mtype))
+ tipc_named_add_backlog(item, mtype, node);
+ item++;
+ }
+ kfree_skb(skb);
+ tipc_named_process_backlog(net);
}
- tipc_named_process_backlog(net);
spin_unlock_bh(&tn->nametbl_lock);
- kfree_skb(buf);
}
/**
struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
void named_cluster_distribute(struct net *net, struct sk_buff *buf);
void tipc_named_node_up(struct net *net, u32 dnode);
-void tipc_named_rcv(struct net *net, struct sk_buff *buf);
+void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
void tipc_named_reinit(struct net *net);
void tipc_named_process_backlog(struct net *net);
void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
INIT_LIST_HEAD(&n_ptr->list);
INIT_LIST_HEAD(&n_ptr->publ_list);
INIT_LIST_HEAD(&n_ptr->conn_sks);
- skb_queue_head_init(&n_ptr->waiting_sks);
__skb_queue_head_init(&n_ptr->bclink.deferred_queue);
-
hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
-
list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
if (n_ptr->addr < temp_node->addr)
break;
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_sock_conn *conn, *safe;
- struct sk_buff *buf;
+ struct sk_buff *skb;
+ struct sk_buff_head skbs;
+ skb_queue_head_init(&skbs);
list_for_each_entry_safe(conn, safe, conns, list) {
- buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
+ skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
TIPC_CONN_MSG, SHORT_H_SIZE, 0,
tn->own_addr, conn->peer_node,
conn->port, conn->peer_port,
TIPC_ERR_NO_NODE);
- if (likely(buf))
- tipc_sk_rcv(net, buf);
+ if (likely(skb))
+ skb_queue_tail(&skbs, skb);
list_del(&conn->list);
kfree(conn);
}
+ tipc_sk_rcv(net, &skbs);
}
/**
struct net *net = node->net;
LIST_HEAD(nsub_list);
LIST_HEAD(conn_sks);
- struct sk_buff_head waiting_sks;
u32 addr = 0;
- int flags = node->action_flags;
+ u32 flags = node->action_flags;
u32 link_id = 0;
+ struct sk_buff_head *inputq = node->inputq;
+ struct sk_buff_head *namedq = node->namedq;
- if (likely(!flags)) {
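+ /* Fast path: only socket messages, if any, need to be delivered */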
+ if (likely(!flags || (flags == TIPC_MSG_EVT))) {
+ node->action_flags = 0;
spin_unlock_bh(&node->lock);
+ if (flags == TIPC_MSG_EVT)
+ tipc_sk_rcv(net, inputq);
return;
}
addr = node->addr;
link_id = node->link_id;
- __skb_queue_head_init(&waiting_sks);
-
- if (flags & TIPC_WAKEUP_USERS)
- skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
if (flags & TIPC_NOTIFY_NODE_DOWN) {
list_replace_init(&node->publ_list, &nsub_list);
list_replace_init(&node->conn_sks, &conn_sks);
}
- node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN |
+ node->action_flags &= ~(TIPC_MSG_EVT | TIPC_NOTIFY_NODE_DOWN |
TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP |
TIPC_NOTIFY_LINK_DOWN |
- TIPC_WAKEUP_BCAST_USERS);
+ TIPC_WAKEUP_BCAST_USERS |
+ TIPC_NAMED_MSG_EVT);
spin_unlock_bh(&node->lock);
- while (!skb_queue_empty(&waiting_sks))
- tipc_sk_rcv(net, __skb_dequeue(&waiting_sks));
-
if (!list_empty(&conn_sks))
tipc_node_abort_sock_conns(net, &conn_sks);
if (flags & TIPC_NOTIFY_LINK_DOWN)
tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
link_id, addr);
+
+ if (flags & TIPC_MSG_EVT)
+ tipc_sk_rcv(net, inputq);
+
+ if (flags & TIPC_NAMED_MSG_EVT)
+ tipc_named_rcv(net, namedq);
}
/* Caller should hold node lock for the passed node */
* TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type
*/
enum {
+ TIPC_MSG_EVT = (1 << 0),
TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1),
TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2),
TIPC_NOTIFY_NODE_DOWN = (1 << 3),
TIPC_NOTIFY_NODE_UP = (1 << 4),
- TIPC_WAKEUP_USERS = (1 << 5),
- TIPC_WAKEUP_BCAST_USERS = (1 << 6),
- TIPC_NOTIFY_LINK_UP = (1 << 7),
- TIPC_NOTIFY_LINK_DOWN = (1 << 8)
+ TIPC_WAKEUP_BCAST_USERS = (1 << 5),
+ TIPC_NOTIFY_LINK_UP = (1 << 6),
+ TIPC_NOTIFY_LINK_DOWN = (1 << 7),
+ TIPC_NAMED_MSG_EVT = (1 << 8)
};
/**
* @lock: spinlock governing access to structure
* @net: the applicable net namespace
* @hash: links to adjacent nodes in unsorted hash chain
+ * @inputq: pointer to input queue containing messages for msg event
+ * @namedq: pointer to name table input queue with name table messages
* @active_links: pointers to active links to node
* @links: pointers to all links to node
* @action_flags: bit mask of different types of node actions
spinlock_t lock;
struct net *net;
struct hlist_node hash;
+ struct sk_buff_head *inputq;
+ struct sk_buff_head *namedq;
struct tipc_link *active_links[2];
u32 act_mtus[2];
struct tipc_link *links[MAX_BEARERS];
- unsigned int action_flags;
+ u32 action_flags;
struct tipc_node_bclink bclink;
struct list_head list;
int link_cnt;
u32 signature;
u32 link_id;
struct list_head publ_list;
- struct sk_buff_head waiting_sks;
struct list_head conn_sks;
struct rcu_head rcu;
};
#include "node.h"
#include "link.h"
#include "config.h"
+#include "name_distr.h"
#include "socket.h"
#define SS_LISTENING -1 /* socket is listening */
struct sk_buff *b;
uint i, last, dst = 0;
u32 scope = TIPC_CLUSTER_SCOPE;
+ struct sk_buff_head msgs;
if (in_own_node(net, msg_orignode(msg)))
scope = TIPC_NODE_SCOPE;
+ if (unlikely(!msg_mcast(msg))) {
+ pr_warn("Received non-multicast msg in multicast\n");
+ kfree_skb(buf);
+ goto exit;
+ }
/* Create destination port list: */
tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
msg_nameupper(msg), scope, &dports);
continue;
}
msg_set_destport(msg, item->ports[i]);
- tipc_sk_rcv(net, b);
+ skb_queue_head_init(&msgs);
+ skb_queue_tail(&msgs, b);
+ tipc_sk_rcv(net, &msgs);
}
}
+exit:
tipc_port_list_free(&dports);
}
}
/**
- * tipc_sk_enqueue_skb - enqueue buffer to socket or backlog queue
- * @sk: socket
- * @skb: pointer to message. Set to NULL if buffer is consumed.
- * @dnode: if buffer should be forwarded/returned, send to this node
+ * tipc_sk_enqueue - extract all buffers with destination 'dport' from
+ * inputq and try adding them to socket or backlog queue
+ * @inputq: list of incoming buffers with potentially different destinations
+ * @sk: socket where the buffers should be enqueued
+ * @dport: port number for the socket
+ * @_skb: returned buffer to be forwarded or rejected, if applicable
*
* Caller must hold socket lock
*
- * Returns TIPC_OK (0) or -tipc error code
+ * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
+ * or -TIPC_ERR_NO_PORT
*/
-static int tipc_sk_enqueue_skb(struct sock *sk, struct sk_buff **skb)
+static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
+ u32 dport, struct sk_buff **_skb)
{
unsigned int lim;
atomic_t *dcnt;
-
- if (unlikely(!*skb))
- return TIPC_OK;
- if (!sock_owned_by_user(sk))
- return filter_rcv(sk, skb);
- dcnt = &tipc_sk(sk)->dupl_rcvcnt;
- if (sk->sk_backlog.len)
- atomic_set(dcnt, 0);
- lim = rcvbuf_limit(sk, *skb) + atomic_read(dcnt);
- if (unlikely(sk_add_backlog(sk, *skb, lim)))
+ int err;
+ struct sk_buff *skb;
+ unsigned long time_limit = jiffies + 2;
+
+ while (skb_queue_len(inputq)) {
+ skb = tipc_skb_dequeue(inputq, dport);
+ if (unlikely(!skb))
+ return TIPC_OK;
+ /* Return if softirq window exhausted */
+ if (unlikely(time_after_eq(jiffies, time_limit)))
+ return TIPC_OK;
+ if (!sock_owned_by_user(sk)) {
+ err = filter_rcv(sk, &skb);
+ if (likely(!skb))
+ continue;
+ *_skb = skb;
+ return err;
+ }
+ dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+ if (sk->sk_backlog.len)
+ atomic_set(dcnt, 0);
+ lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+ if (likely(!sk_add_backlog(sk, skb, lim)))
+ continue;
+ *_skb = skb;
return -TIPC_ERR_OVERLOAD;
- *skb = NULL;
+ }
return TIPC_OK;
}
/**
- * tipc_sk_rcv - handle incoming message
- * @skb: buffer containing arriving message
- * Consumes buffer
- * Returns 0 if success, or errno: -EHOSTUNREACH
+ * tipc_sk_rcv - handle a chain of incoming buffers
+ * @inputq: buffer list containing the buffers
+ * Consumes all buffers in list until inputq is empty
+ * Note: may be called in multiple threads referring to the same queue
+ * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
+ * Only node-local callers check the return value; they always send
+ * single-buffer queues
*/
-int tipc_sk_rcv(struct net *net, struct sk_buff *skb)
+int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
{
+ u32 dnode, dport = 0;
+ int err = -TIPC_ERR_NO_PORT;
+ struct sk_buff *skb;
struct tipc_sock *tsk;
struct tipc_net *tn;
struct sock *sk;
- u32 dport = msg_destport(buf_msg(skb));
- int err = -TIPC_ERR_NO_PORT;
- u32 dnode;
- /* Find destination */
- tsk = tipc_sk_lookup(net, dport);
- if (likely(tsk)) {
- sk = &tsk->sk;
- spin_lock_bh(&sk->sk_lock.slock);
- err = tipc_sk_enqueue_skb(sk, &skb);
- spin_unlock_bh(&sk->sk_lock.slock);
- sock_put(sk);
- }
- if (likely(!skb))
- return 0;
- if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
- goto xmit;
- if (!err) {
- dnode = msg_destnode(buf_msg(skb));
- goto xmit;
- }
- tn = net_generic(net, tipc_net_id);
- if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
- return -EHOSTUNREACH;
+ while (skb_queue_len(inputq)) {
+ skb = NULL;
+ dport = tipc_skb_peek_port(inputq, dport);
+ tsk = tipc_sk_lookup(net, dport);
+ if (likely(tsk)) {
+ sk = &tsk->sk;
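+ /* Trylock: on contention, skip to the next port and revisit later */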
+ if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
+ err = tipc_sk_enqueue(inputq, sk, dport, &skb);
+ spin_unlock_bh(&sk->sk_lock.slock);
+ dport = 0;
+ }
+ sock_put(sk);
+ } else {
+ skb = tipc_skb_dequeue(inputq, dport);
+ }
+ if (likely(!skb))
+ continue;
+ if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
+ goto xmit;
+ if (!err) {
+ dnode = msg_destnode(buf_msg(skb));
+ goto xmit;
+ }
+ tn = net_generic(net, tipc_net_id);
+ if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
+ continue;
xmit:
- tipc_link_xmit_skb(net, skb, dnode, dport);
+ tipc_link_xmit_skb(net, skb, dnode, dport);
+ }
return err ? -EHOSTUNREACH : 0;
}
void tipc_sock_release_local(struct socket *sock);
int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
int flags);
-int tipc_sk_rcv(struct net *net, struct sk_buff *buf);
+int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
struct sk_buff *tipc_sk_socks_show(struct net *net);
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf);
void tipc_sk_reinit(struct net *net);