]> git.karo-electronics.de Git - karo-tx-linux.git/blob - net/tipc/socket.c
x86/cpu: Drop wp_works_ok member of struct cpuinfo_x86
[karo-tx-linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2016, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include <linux/sched/signal.h>
39
40 #include "core.h"
41 #include "name_table.h"
42 #include "node.h"
43 #include "link.h"
44 #include "name_distr.h"
45 #include "socket.h"
46 #include "bcast.h"
47 #include "netlink.h"
48
49 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
50 #define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
51 #define TIPC_FWD_MSG            1
52 #define TIPC_MAX_PORT           0xffffffff
53 #define TIPC_MIN_PORT           1
54
55 enum {
56         TIPC_LISTEN = TCP_LISTEN,
57         TIPC_ESTABLISHED = TCP_ESTABLISHED,
58         TIPC_OPEN = TCP_CLOSE,
59         TIPC_DISCONNECTING = TCP_CLOSE_WAIT,
60         TIPC_CONNECTING = TCP_SYN_SENT,
61 };
62
63 /**
64  * struct tipc_sock - TIPC socket structure
65  * @sk: socket - interacts with 'port' and with user via the socket API
66  * @conn_type: TIPC type used when connection was established
67  * @conn_instance: TIPC instance used when connection was established
68  * @published: non-zero if port has one or more associated names
69  * @max_pkt: maximum packet size "hint" used when building messages sent by port
70  * @portid: unique port identity in TIPC socket hash table
71  * @phdr: preformatted message header used when sending messages
72  * #cong_links: list of congested links
73  * @publications: list of publications for port
74  * @blocking_link: address of the congested link we are currently sleeping on
75  * @pub_count: total # of publications port has made during its lifetime
76  * @probing_state:
77  * @conn_timeout: the time we can wait for an unresponded setup request
78  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
79  * @cong_link_cnt: number of congested links
80  * @sent_unacked: # messages sent by socket, and not yet acked by peer
81  * @rcv_unacked: # messages read by user, but not yet acked back to peer
82  * @peer: 'connected' peer for dgram/rdm
83  * @node: hash table node
84  * @mc_method: cookie for use between socket and broadcast layer
85  * @rcu: rcu struct for tipc_sock
86  */
87 struct tipc_sock {
88         struct sock sk;
89         u32 conn_type;
90         u32 conn_instance;
91         int published;
92         u32 max_pkt;
93         u32 portid;
94         struct tipc_msg phdr;
95         struct list_head cong_links;
96         struct list_head publications;
97         u32 pub_count;
98         uint conn_timeout;
99         atomic_t dupl_rcvcnt;
100         bool probe_unacked;
101         u16 cong_link_cnt;
102         u16 snt_unacked;
103         u16 snd_win;
104         u16 peer_caps;
105         u16 rcv_unacked;
106         u16 rcv_win;
107         struct sockaddr_tipc peer;
108         struct rhash_head node;
109         struct tipc_mc_method mc_method;
110         struct rcu_head rcu;
111 };
112
113 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
114 static void tipc_data_ready(struct sock *sk);
115 static void tipc_write_space(struct sock *sk);
116 static void tipc_sock_destruct(struct sock *sk);
117 static int tipc_release(struct socket *sock);
118 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
119 static void tipc_sk_timeout(unsigned long data);
120 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
121                            struct tipc_name_seq const *seq);
122 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
123                             struct tipc_name_seq const *seq);
124 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
125 static int tipc_sk_insert(struct tipc_sock *tsk);
126 static void tipc_sk_remove(struct tipc_sock *tsk);
127 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
128 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
129
130 static const struct proto_ops packet_ops;
131 static const struct proto_ops stream_ops;
132 static const struct proto_ops msg_ops;
133 static struct proto tipc_proto;
134 static const struct rhashtable_params tsk_rht_params;
135
136 static u32 tsk_own_node(struct tipc_sock *tsk)
137 {
138         return msg_prevnode(&tsk->phdr);
139 }
140
141 static u32 tsk_peer_node(struct tipc_sock *tsk)
142 {
143         return msg_destnode(&tsk->phdr);
144 }
145
146 static u32 tsk_peer_port(struct tipc_sock *tsk)
147 {
148         return msg_destport(&tsk->phdr);
149 }
150
151 static  bool tsk_unreliable(struct tipc_sock *tsk)
152 {
153         return msg_src_droppable(&tsk->phdr) != 0;
154 }
155
156 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
157 {
158         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
159 }
160
161 static bool tsk_unreturnable(struct tipc_sock *tsk)
162 {
163         return msg_dest_droppable(&tsk->phdr) != 0;
164 }
165
166 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
167 {
168         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
169 }
170
171 static int tsk_importance(struct tipc_sock *tsk)
172 {
173         return msg_importance(&tsk->phdr);
174 }
175
176 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
177 {
178         if (imp > TIPC_CRITICAL_IMPORTANCE)
179                 return -EINVAL;
180         msg_set_importance(&tsk->phdr, (u32)imp);
181         return 0;
182 }
183
184 static struct tipc_sock *tipc_sk(const struct sock *sk)
185 {
186         return container_of(sk, struct tipc_sock, sk);
187 }
188
189 static bool tsk_conn_cong(struct tipc_sock *tsk)
190 {
191         return tsk->snt_unacked > tsk->snd_win;
192 }
193
194 /* tsk_blocks(): translate a buffer size in bytes to number of
195  * advertisable blocks, taking into account the ratio truesize(len)/len
196  * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
197  */
198 static u16 tsk_adv_blocks(int len)
199 {
200         return len / FLOWCTL_BLK_SZ / 4;
201 }
202
203 /* tsk_inc(): increment counter for sent or received data
204  * - If block based flow control is not supported by peer we
205  *   fall back to message based ditto, incrementing the counter
206  */
207 static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
208 {
209         if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
210                 return ((msglen / FLOWCTL_BLK_SZ) + 1);
211         return 1;
212 }
213
214 /**
215  * tsk_advance_rx_queue - discard first buffer in socket receive queue
216  *
217  * Caller must hold socket lock
218  */
219 static void tsk_advance_rx_queue(struct sock *sk)
220 {
221         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
222 }
223
224 /* tipc_sk_respond() : send response message back to sender
225  */
226 static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
227 {
228         u32 selector;
229         u32 dnode;
230         u32 onode = tipc_own_addr(sock_net(sk));
231
232         if (!tipc_msg_reverse(onode, &skb, err))
233                 return;
234
235         dnode = msg_destnode(buf_msg(skb));
236         selector = msg_origport(buf_msg(skb));
237         tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
238 }
239
240 /**
241  * tsk_rej_rx_queue - reject all buffers in socket receive queue
242  *
243  * Caller must hold socket lock
244  */
245 static void tsk_rej_rx_queue(struct sock *sk)
246 {
247         struct sk_buff *skb;
248
249         while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
250                 tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
251 }
252
253 static bool tipc_sk_connected(struct sock *sk)
254 {
255         return sk->sk_state == TIPC_ESTABLISHED;
256 }
257
258 /* tipc_sk_type_connectionless - check if the socket is datagram socket
259  * @sk: socket
260  *
261  * Returns true if connection less, false otherwise
262  */
263 static bool tipc_sk_type_connectionless(struct sock *sk)
264 {
265         return sk->sk_type == SOCK_RDM || sk->sk_type == SOCK_DGRAM;
266 }
267
268 /* tsk_peer_msg - verify if message was sent by connected port's peer
269  *
270  * Handles cases where the node's network address has changed from
271  * the default of <0.0.0> to its configured setting.
272  */
273 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
274 {
275         struct sock *sk = &tsk->sk;
276         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
277         u32 peer_port = tsk_peer_port(tsk);
278         u32 orig_node;
279         u32 peer_node;
280
281         if (unlikely(!tipc_sk_connected(sk)))
282                 return false;
283
284         if (unlikely(msg_origport(msg) != peer_port))
285                 return false;
286
287         orig_node = msg_orignode(msg);
288         peer_node = tsk_peer_node(tsk);
289
290         if (likely(orig_node == peer_node))
291                 return true;
292
293         if (!orig_node && (peer_node == tn->own_addr))
294                 return true;
295
296         if (!peer_node && (orig_node == tn->own_addr))
297                 return true;
298
299         return false;
300 }
301
302 /* tipc_set_sk_state - set the sk_state of the socket
303  * @sk: socket
304  *
305  * Caller must hold socket lock
306  *
307  * Returns 0 on success, errno otherwise
308  */
309 static int tipc_set_sk_state(struct sock *sk, int state)
310 {
311         int oldsk_state = sk->sk_state;
312         int res = -EINVAL;
313
314         switch (state) {
315         case TIPC_OPEN:
316                 res = 0;
317                 break;
318         case TIPC_LISTEN:
319         case TIPC_CONNECTING:
320                 if (oldsk_state == TIPC_OPEN)
321                         res = 0;
322                 break;
323         case TIPC_ESTABLISHED:
324                 if (oldsk_state == TIPC_CONNECTING ||
325                     oldsk_state == TIPC_OPEN)
326                         res = 0;
327                 break;
328         case TIPC_DISCONNECTING:
329                 if (oldsk_state == TIPC_CONNECTING ||
330                     oldsk_state == TIPC_ESTABLISHED)
331                         res = 0;
332                 break;
333         }
334
335         if (!res)
336                 sk->sk_state = state;
337
338         return res;
339 }
340
341 static int tipc_sk_sock_err(struct socket *sock, long *timeout)
342 {
343         struct sock *sk = sock->sk;
344         int err = sock_error(sk);
345         int typ = sock->type;
346
347         if (err)
348                 return err;
349         if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
350                 if (sk->sk_state == TIPC_DISCONNECTING)
351                         return -EPIPE;
352                 else if (!tipc_sk_connected(sk))
353                         return -ENOTCONN;
354         }
355         if (!*timeout)
356                 return -EAGAIN;
357         if (signal_pending(current))
358                 return sock_intr_errno(*timeout);
359
360         return 0;
361 }
362
363 #define tipc_wait_for_cond(sock_, timeout_, condition_)                 \
364 ({                                                                      \
365         int rc_ = 0;                                                    \
366         int done_ = 0;                                                  \
367                                                                         \
368         while (!(condition_) && !done_) {                               \
369                 struct sock *sk_ = sock->sk;                            \
370                 DEFINE_WAIT_FUNC(wait_, woken_wake_function);           \
371                                                                         \
372                 rc_ = tipc_sk_sock_err(sock_, timeout_);                \
373                 if (rc_)                                                \
374                         break;                                          \
375                 prepare_to_wait(sk_sleep(sk_), &wait_,                  \
376                                 TASK_INTERRUPTIBLE);                    \
377                 done_ = sk_wait_event(sk_, timeout_,                    \
378                                       (condition_), &wait_);            \
379                 remove_wait_queue(sk_sleep(sk_), &wait_);               \
380         }                                                               \
381         rc_;                                                            \
382 })
383
384 /**
385  * tipc_sk_create - create a TIPC socket
386  * @net: network namespace (must be default network)
387  * @sock: pre-allocated socket structure
388  * @protocol: protocol indicator (must be 0)
389  * @kern: caused by kernel or by userspace?
390  *
391  * This routine creates additional data structures used by the TIPC socket,
392  * initializes them, and links them together.
393  *
394  * Returns 0 on success, errno otherwise
395  */
396 static int tipc_sk_create(struct net *net, struct socket *sock,
397                           int protocol, int kern)
398 {
399         struct tipc_net *tn;
400         const struct proto_ops *ops;
401         struct sock *sk;
402         struct tipc_sock *tsk;
403         struct tipc_msg *msg;
404
405         /* Validate arguments */
406         if (unlikely(protocol != 0))
407                 return -EPROTONOSUPPORT;
408
409         switch (sock->type) {
410         case SOCK_STREAM:
411                 ops = &stream_ops;
412                 break;
413         case SOCK_SEQPACKET:
414                 ops = &packet_ops;
415                 break;
416         case SOCK_DGRAM:
417         case SOCK_RDM:
418                 ops = &msg_ops;
419                 break;
420         default:
421                 return -EPROTOTYPE;
422         }
423
424         /* Allocate socket's protocol area */
425         sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
426         if (sk == NULL)
427                 return -ENOMEM;
428
429         tsk = tipc_sk(sk);
430         tsk->max_pkt = MAX_PKT_DEFAULT;
431         INIT_LIST_HEAD(&tsk->publications);
432         INIT_LIST_HEAD(&tsk->cong_links);
433         msg = &tsk->phdr;
434         tn = net_generic(sock_net(sk), tipc_net_id);
435
436         /* Finish initializing socket data structures */
437         sock->ops = ops;
438         sock_init_data(sock, sk);
439         tipc_set_sk_state(sk, TIPC_OPEN);
440         if (tipc_sk_insert(tsk)) {
441                 pr_warn("Socket create failed; port number exhausted\n");
442                 return -EINVAL;
443         }
444
445         /* Ensure tsk is visible before we read own_addr. */
446         smp_mb();
447
448         tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
449                       NAMED_H_SIZE, 0);
450
451         msg_set_origport(msg, tsk->portid);
452         setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
453         sk->sk_shutdown = 0;
454         sk->sk_backlog_rcv = tipc_backlog_rcv;
455         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
456         sk->sk_data_ready = tipc_data_ready;
457         sk->sk_write_space = tipc_write_space;
458         sk->sk_destruct = tipc_sock_destruct;
459         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
460         atomic_set(&tsk->dupl_rcvcnt, 0);
461
462         /* Start out with safe limits until we receive an advertised window */
463         tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
464         tsk->rcv_win = tsk->snd_win;
465
466         if (tipc_sk_type_connectionless(sk)) {
467                 tsk_set_unreturnable(tsk, true);
468                 if (sock->type == SOCK_DGRAM)
469                         tsk_set_unreliable(tsk, true);
470         }
471
472         return 0;
473 }
474
475 static void tipc_sk_callback(struct rcu_head *head)
476 {
477         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
478
479         sock_put(&tsk->sk);
480 }
481
482 /* Caller should hold socket lock for the socket. */
483 static void __tipc_shutdown(struct socket *sock, int error)
484 {
485         struct sock *sk = sock->sk;
486         struct tipc_sock *tsk = tipc_sk(sk);
487         struct net *net = sock_net(sk);
488         long timeout = CONN_TIMEOUT_DEFAULT;
489         u32 dnode = tsk_peer_node(tsk);
490         struct sk_buff *skb;
491
492         /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
493         tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
494                                             !tsk_conn_cong(tsk)));
495
496         /* Reject all unreceived messages, except on an active connection
497          * (which disconnects locally & sends a 'FIN+' to peer).
498          */
499         while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
500                 if (TIPC_SKB_CB(skb)->bytes_read) {
501                         kfree_skb(skb);
502                         continue;
503                 }
504                 if (!tipc_sk_type_connectionless(sk) &&
505                     sk->sk_state != TIPC_DISCONNECTING) {
506                         tipc_set_sk_state(sk, TIPC_DISCONNECTING);
507                         tipc_node_remove_conn(net, dnode, tsk->portid);
508                 }
509                 tipc_sk_respond(sk, skb, error);
510         }
511
512         if (tipc_sk_type_connectionless(sk))
513                 return;
514
515         if (sk->sk_state != TIPC_DISCONNECTING) {
516                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
517                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
518                                       tsk_own_node(tsk), tsk_peer_port(tsk),
519                                       tsk->portid, error);
520                 if (skb)
521                         tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
522                 tipc_node_remove_conn(net, dnode, tsk->portid);
523                 tipc_set_sk_state(sk, TIPC_DISCONNECTING);
524         }
525 }
526
527 /**
528  * tipc_release - destroy a TIPC socket
529  * @sock: socket to destroy
530  *
531  * This routine cleans up any messages that are still queued on the socket.
532  * For DGRAM and RDM socket types, all queued messages are rejected.
533  * For SEQPACKET and STREAM socket types, the first message is rejected
534  * and any others are discarded.  (If the first message on a STREAM socket
535  * is partially-read, it is discarded and the next one is rejected instead.)
536  *
537  * NOTE: Rejected messages are not necessarily returned to the sender!  They
538  * are returned or discarded according to the "destination droppable" setting
539  * specified for the message by the sender.
540  *
541  * Returns 0 on success, errno otherwise
542  */
543 static int tipc_release(struct socket *sock)
544 {
545         struct sock *sk = sock->sk;
546         struct tipc_sock *tsk;
547
548         /*
549          * Exit if socket isn't fully initialized (occurs when a failed accept()
550          * releases a pre-allocated child socket that was never used)
551          */
552         if (sk == NULL)
553                 return 0;
554
555         tsk = tipc_sk(sk);
556         lock_sock(sk);
557
558         __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
559         sk->sk_shutdown = SHUTDOWN_MASK;
560         tipc_sk_withdraw(tsk, 0, NULL);
561         sk_stop_timer(sk, &sk->sk_timer);
562         tipc_sk_remove(tsk);
563
564         /* Reject any messages that accumulated in backlog queue */
565         release_sock(sk);
566         u32_list_purge(&tsk->cong_links);
567         tsk->cong_link_cnt = 0;
568         call_rcu(&tsk->rcu, tipc_sk_callback);
569         sock->sk = NULL;
570
571         return 0;
572 }
573
574 /**
575  * tipc_bind - associate or disassocate TIPC name(s) with a socket
576  * @sock: socket structure
577  * @uaddr: socket address describing name(s) and desired operation
578  * @uaddr_len: size of socket address data structure
579  *
580  * Name and name sequence binding is indicated using a positive scope value;
581  * a negative scope value unbinds the specified name.  Specifying no name
582  * (i.e. a socket address length of 0) unbinds all names from the socket.
583  *
584  * Returns 0 on success, errno otherwise
585  *
586  * NOTE: This routine doesn't need to take the socket lock since it doesn't
587  *       access any non-constant socket information.
588  */
589 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
590                      int uaddr_len)
591 {
592         struct sock *sk = sock->sk;
593         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
594         struct tipc_sock *tsk = tipc_sk(sk);
595         int res = -EINVAL;
596
597         lock_sock(sk);
598         if (unlikely(!uaddr_len)) {
599                 res = tipc_sk_withdraw(tsk, 0, NULL);
600                 goto exit;
601         }
602
603         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
604                 res = -EINVAL;
605                 goto exit;
606         }
607         if (addr->family != AF_TIPC) {
608                 res = -EAFNOSUPPORT;
609                 goto exit;
610         }
611
612         if (addr->addrtype == TIPC_ADDR_NAME)
613                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
614         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
615                 res = -EAFNOSUPPORT;
616                 goto exit;
617         }
618
619         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
620             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
621             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
622                 res = -EACCES;
623                 goto exit;
624         }
625
626         res = (addr->scope > 0) ?
627                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
628                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
629 exit:
630         release_sock(sk);
631         return res;
632 }
633
634 /**
635  * tipc_getname - get port ID of socket or peer socket
636  * @sock: socket structure
637  * @uaddr: area for returned socket address
638  * @uaddr_len: area for returned length of socket address
639  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
640  *
641  * Returns 0 on success, errno otherwise
642  *
643  * NOTE: This routine doesn't need to take the socket lock since it only
644  *       accesses socket information that is unchanging (or which changes in
645  *       a completely predictable manner).
646  */
647 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
648                         int *uaddr_len, int peer)
649 {
650         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
651         struct sock *sk = sock->sk;
652         struct tipc_sock *tsk = tipc_sk(sk);
653         struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
654
655         memset(addr, 0, sizeof(*addr));
656         if (peer) {
657                 if ((!tipc_sk_connected(sk)) &&
658                     ((peer != 2) || (sk->sk_state != TIPC_DISCONNECTING)))
659                         return -ENOTCONN;
660                 addr->addr.id.ref = tsk_peer_port(tsk);
661                 addr->addr.id.node = tsk_peer_node(tsk);
662         } else {
663                 addr->addr.id.ref = tsk->portid;
664                 addr->addr.id.node = tn->own_addr;
665         }
666
667         *uaddr_len = sizeof(*addr);
668         addr->addrtype = TIPC_ADDR_ID;
669         addr->family = AF_TIPC;
670         addr->scope = 0;
671         addr->addr.name.domain = 0;
672
673         return 0;
674 }
675
676 /**
677  * tipc_poll - read and possibly block on pollmask
678  * @file: file structure associated with the socket
679  * @sock: socket for which to calculate the poll bits
680  * @wait: ???
681  *
682  * Returns pollmask value
683  *
684  * COMMENTARY:
685  * It appears that the usual socket locking mechanisms are not useful here
686  * since the pollmask info is potentially out-of-date the moment this routine
687  * exits.  TCP and other protocols seem to rely on higher level poll routines
688  * to handle any preventable race conditions, so TIPC will do the same ...
689  *
690  * IMPORTANT: The fact that a read or write operation is indicated does NOT
691  * imply that the operation will succeed, merely that it should be performed
692  * and will not block.
693  */
694 static unsigned int tipc_poll(struct file *file, struct socket *sock,
695                               poll_table *wait)
696 {
697         struct sock *sk = sock->sk;
698         struct tipc_sock *tsk = tipc_sk(sk);
699         u32 mask = 0;
700
701         sock_poll_wait(file, sk_sleep(sk), wait);
702
703         if (sk->sk_shutdown & RCV_SHUTDOWN)
704                 mask |= POLLRDHUP | POLLIN | POLLRDNORM;
705         if (sk->sk_shutdown == SHUTDOWN_MASK)
706                 mask |= POLLHUP;
707
708         switch (sk->sk_state) {
709         case TIPC_ESTABLISHED:
710                 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
711                         mask |= POLLOUT;
712                 /* fall thru' */
713         case TIPC_LISTEN:
714         case TIPC_CONNECTING:
715                 if (!skb_queue_empty(&sk->sk_receive_queue))
716                         mask |= (POLLIN | POLLRDNORM);
717                 break;
718         case TIPC_OPEN:
719                 if (!tsk->cong_link_cnt)
720                         mask |= POLLOUT;
721                 if (tipc_sk_type_connectionless(sk) &&
722                     (!skb_queue_empty(&sk->sk_receive_queue)))
723                         mask |= (POLLIN | POLLRDNORM);
724                 break;
725         case TIPC_DISCONNECTING:
726                 mask = (POLLIN | POLLRDNORM | POLLHUP);
727                 break;
728         }
729
730         return mask;
731 }
732
733 /**
734  * tipc_sendmcast - send multicast message
735  * @sock: socket structure
736  * @seq: destination address
737  * @msg: message to send
738  * @dlen: length of data to send
739  * @timeout: timeout to wait for wakeup
740  *
741  * Called from function tipc_sendmsg(), which has done all sanity checks
742  * Returns the number of bytes sent on success, or errno
743  */
744 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
745                           struct msghdr *msg, size_t dlen, long timeout)
746 {
747         struct sock *sk = sock->sk;
748         struct tipc_sock *tsk = tipc_sk(sk);
749         struct tipc_msg *hdr = &tsk->phdr;
750         struct net *net = sock_net(sk);
751         int mtu = tipc_bcast_get_mtu(net);
752         struct tipc_mc_method *method = &tsk->mc_method;
753         u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
754         struct sk_buff_head pkts;
755         struct tipc_nlist dsts;
756         int rc;
757
758         /* Block or return if any destination link is congested */
759         rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
760         if (unlikely(rc))
761                 return rc;
762
763         /* Lookup destination nodes */
764         tipc_nlist_init(&dsts, tipc_own_addr(net));
765         tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
766                                       seq->upper, domain, &dsts);
767         if (!dsts.local && !dsts.remote)
768                 return -EHOSTUNREACH;
769
770         /* Build message header */
771         msg_set_type(hdr, TIPC_MCAST_MSG);
772         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
773         msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
774         msg_set_destport(hdr, 0);
775         msg_set_destnode(hdr, 0);
776         msg_set_nametype(hdr, seq->type);
777         msg_set_namelower(hdr, seq->lower);
778         msg_set_nameupper(hdr, seq->upper);
779
780         /* Build message as chain of buffers */
781         skb_queue_head_init(&pkts);
782         rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
783
784         /* Send message if build was successful */
785         if (unlikely(rc == dlen))
786                 rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
787                                      &tsk->cong_link_cnt);
788
789         tipc_nlist_purge(&dsts);
790
791         return rc ? rc : dlen;
792 }
793
794 /**
795  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
796  * @arrvq: queue with arriving messages, to be cloned after destination lookup
797  * @inputq: queue with cloned messages, delivered to socket after dest lookup
798  *
799  * Multi-threaded: parallel calls with reference to same queues may occur
800  */
801 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
802                        struct sk_buff_head *inputq)
803 {
804         struct tipc_msg *msg;
805         struct list_head dports;
806         u32 portid;
807         u32 scope = TIPC_CLUSTER_SCOPE;
808         struct sk_buff_head tmpq;
809         uint hsz;
810         struct sk_buff *skb, *_skb;
811
812         __skb_queue_head_init(&tmpq);
813         INIT_LIST_HEAD(&dports);
814
815         skb = tipc_skb_peek(arrvq, &inputq->lock);
816         for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
817                 msg = buf_msg(skb);
818                 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
819
820                 if (in_own_node(net, msg_orignode(msg)))
821                         scope = TIPC_NODE_SCOPE;
822
823                 /* Create destination port list and message clones: */
824                 tipc_nametbl_mc_translate(net,
825                                           msg_nametype(msg), msg_namelower(msg),
826                                           msg_nameupper(msg), scope, &dports);
827                 portid = u32_pop(&dports);
828                 for (; portid; portid = u32_pop(&dports)) {
829                         _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
830                         if (_skb) {
831                                 msg_set_destport(buf_msg(_skb), portid);
832                                 __skb_queue_tail(&tmpq, _skb);
833                                 continue;
834                         }
835                         pr_warn("Failed to clone mcast rcv buffer\n");
836                 }
837                 /* Append to inputq if not already done by other thread */
838                 spin_lock_bh(&inputq->lock);
839                 if (skb_peek(arrvq) == skb) {
840                         skb_queue_splice_tail_init(&tmpq, inputq);
841                         kfree_skb(__skb_dequeue(arrvq));
842                 }
843                 spin_unlock_bh(&inputq->lock);
844                 __skb_queue_purge(&tmpq);
845                 kfree_skb(skb);
846         }
847         tipc_sk_rcv(net, inputq);
848 }
849
850 /**
851  * tipc_sk_proto_rcv - receive a connection mng protocol message
852  * @tsk: receiving socket
853  * @skb: pointer to message buffer.
854  */
855 static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
856                               struct sk_buff_head *xmitq)
857 {
858         struct sock *sk = &tsk->sk;
859         u32 onode = tsk_own_node(tsk);
860         struct tipc_msg *hdr = buf_msg(skb);
861         int mtyp = msg_type(hdr);
862         bool conn_cong;
863
864         /* Ignore if connection cannot be validated: */
865         if (!tsk_peer_msg(tsk, hdr))
866                 goto exit;
867
868         tsk->probe_unacked = false;
869
870         if (mtyp == CONN_PROBE) {
871                 msg_set_type(hdr, CONN_PROBE_REPLY);
872                 if (tipc_msg_reverse(onode, &skb, TIPC_OK))
873                         __skb_queue_tail(xmitq, skb);
874                 return;
875         } else if (mtyp == CONN_ACK) {
876                 conn_cong = tsk_conn_cong(tsk);
877                 tsk->snt_unacked -= msg_conn_ack(hdr);
878                 if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
879                         tsk->snd_win = msg_adv_win(hdr);
880                 if (conn_cong)
881                         sk->sk_write_space(sk);
882         } else if (mtyp != CONN_PROBE_REPLY) {
883                 pr_warn("Received unknown CONN_PROTO msg\n");
884         }
885 exit:
886         kfree_skb(skb);
887 }
888
889 /**
890  * tipc_sendmsg - send message in connectionless manner
891  * @sock: socket structure
892  * @m: message to send
893  * @dsz: amount of user data to be sent
894  *
895  * Message must have an destination specified explicitly.
896  * Used for SOCK_RDM and SOCK_DGRAM messages,
897  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
898  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
899  *
900  * Returns the number of bytes sent on success, or errno otherwise
901  */
902 static int tipc_sendmsg(struct socket *sock,
903                         struct msghdr *m, size_t dsz)
904 {
905         struct sock *sk = sock->sk;
906         int ret;
907
908         lock_sock(sk);
909         ret = __tipc_sendmsg(sock, m, dsz);
910         release_sock(sk);
911
912         return ret;
913 }
914
915 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
916 {
917         struct sock *sk = sock->sk;
918         struct net *net = sock_net(sk);
919         struct tipc_sock *tsk = tipc_sk(sk);
920         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
921         long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
922         struct list_head *clinks = &tsk->cong_links;
923         bool syn = !tipc_sk_type_connectionless(sk);
924         struct tipc_msg *hdr = &tsk->phdr;
925         struct tipc_name_seq *seq;
926         struct sk_buff_head pkts;
927         u32 type, inst, domain;
928         u32 dnode, dport;
929         int mtu, rc;
930
931         if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
932                 return -EMSGSIZE;
933
934         if (unlikely(!dest)) {
935                 dest = &tsk->peer;
936                 if (!syn || dest->family != AF_TIPC)
937                         return -EDESTADDRREQ;
938         }
939
940         if (unlikely(m->msg_namelen < sizeof(*dest)))
941                 return -EINVAL;
942
943         if (unlikely(dest->family != AF_TIPC))
944                 return -EINVAL;
945
946         if (unlikely(syn)) {
947                 if (sk->sk_state == TIPC_LISTEN)
948                         return -EPIPE;
949                 if (sk->sk_state != TIPC_OPEN)
950                         return -EISCONN;
951                 if (tsk->published)
952                         return -EOPNOTSUPP;
953                 if (dest->addrtype == TIPC_ADDR_NAME) {
954                         tsk->conn_type = dest->addr.name.name.type;
955                         tsk->conn_instance = dest->addr.name.name.instance;
956                 }
957         }
958
959         seq = &dest->addr.nameseq;
960         if (dest->addrtype == TIPC_ADDR_MCAST)
961                 return tipc_sendmcast(sock, seq, m, dlen, timeout);
962
963         if (dest->addrtype == TIPC_ADDR_NAME) {
964                 type = dest->addr.name.name.type;
965                 inst = dest->addr.name.name.instance;
966                 domain = dest->addr.name.domain;
967                 dnode = domain;
968                 msg_set_type(hdr, TIPC_NAMED_MSG);
969                 msg_set_hdr_sz(hdr, NAMED_H_SIZE);
970                 msg_set_nametype(hdr, type);
971                 msg_set_nameinst(hdr, inst);
972                 msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
973                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
974                 msg_set_destnode(hdr, dnode);
975                 msg_set_destport(hdr, dport);
976                 if (unlikely(!dport && !dnode))
977                         return -EHOSTUNREACH;
978
979         } else if (dest->addrtype == TIPC_ADDR_ID) {
980                 dnode = dest->addr.id.node;
981                 msg_set_type(hdr, TIPC_DIRECT_MSG);
982                 msg_set_lookup_scope(hdr, 0);
983                 msg_set_destnode(hdr, dnode);
984                 msg_set_destport(hdr, dest->addr.id.ref);
985                 msg_set_hdr_sz(hdr, BASIC_H_SIZE);
986         }
987
988         /* Block or return if destination link is congested */
989         rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
990         if (unlikely(rc))
991                 return rc;
992
993         skb_queue_head_init(&pkts);
994         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
995         rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
996         if (unlikely(rc != dlen))
997                 return rc;
998
999         rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1000         if (unlikely(rc == -ELINKCONG)) {
1001                 u32_push(clinks, dnode);
1002                 tsk->cong_link_cnt++;
1003                 rc = 0;
1004         }
1005
1006         if (unlikely(syn && !rc))
1007                 tipc_set_sk_state(sk, TIPC_CONNECTING);
1008
1009         return rc ? rc : dlen;
1010 }
1011
1012 /**
1013  * tipc_sendstream - send stream-oriented data
1014  * @sock: socket structure
1015  * @m: data to send
1016  * @dsz: total length of data to be transmitted
1017  *
1018  * Used for SOCK_STREAM data.
1019  *
1020  * Returns the number of bytes sent on success (or partial success),
1021  * or errno if no data sent
1022  */
1023 static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
1024 {
1025         struct sock *sk = sock->sk;
1026         int ret;
1027
1028         lock_sock(sk);
1029         ret = __tipc_sendstream(sock, m, dsz);
1030         release_sock(sk);
1031
1032         return ret;
1033 }
1034
1035 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
1036 {
1037         struct sock *sk = sock->sk;
1038         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1039         long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1040         struct tipc_sock *tsk = tipc_sk(sk);
1041         struct tipc_msg *hdr = &tsk->phdr;
1042         struct net *net = sock_net(sk);
1043         struct sk_buff_head pkts;
1044         u32 dnode = tsk_peer_node(tsk);
1045         int send, sent = 0;
1046         int rc = 0;
1047
1048         skb_queue_head_init(&pkts);
1049
1050         if (unlikely(dlen > INT_MAX))
1051                 return -EMSGSIZE;
1052
1053         /* Handle implicit connection setup */
1054         if (unlikely(dest)) {
1055                 rc = __tipc_sendmsg(sock, m, dlen);
1056                 if (dlen && (dlen == rc))
1057                         tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
1058                 return rc;
1059         }
1060
1061         do {
1062                 rc = tipc_wait_for_cond(sock, &timeout,
1063                                         (!tsk->cong_link_cnt &&
1064                                          !tsk_conn_cong(tsk) &&
1065                                          tipc_sk_connected(sk)));
1066                 if (unlikely(rc))
1067                         break;
1068
1069                 send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
1070                 rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
1071                 if (unlikely(rc != send))
1072                         break;
1073
1074                 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1075                 if (unlikely(rc == -ELINKCONG)) {
1076                         tsk->cong_link_cnt = 1;
1077                         rc = 0;
1078                 }
1079                 if (likely(!rc)) {
1080                         tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
1081                         sent += send;
1082                 }
1083         } while (sent < dlen && !rc);
1084
1085         return rc ? rc : sent;
1086 }
1087
1088 /**
1089  * tipc_send_packet - send a connection-oriented message
1090  * @sock: socket structure
1091  * @m: message to send
1092  * @dsz: length of data to be transmitted
1093  *
1094  * Used for SOCK_SEQPACKET messages.
1095  *
1096  * Returns the number of bytes sent on success, or errno otherwise
1097  */
1098 static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1099 {
1100         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1101                 return -EMSGSIZE;
1102
1103         return tipc_sendstream(sock, m, dsz);
1104 }
1105
1106 /* tipc_sk_finish_conn - complete the setup of a connection
1107  */
1108 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1109                                 u32 peer_node)
1110 {
1111         struct sock *sk = &tsk->sk;
1112         struct net *net = sock_net(sk);
1113         struct tipc_msg *msg = &tsk->phdr;
1114
1115         msg_set_destnode(msg, peer_node);
1116         msg_set_destport(msg, peer_port);
1117         msg_set_type(msg, TIPC_CONN_MSG);
1118         msg_set_lookup_scope(msg, 0);
1119         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1120
1121         sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTERVAL);
1122         tipc_set_sk_state(sk, TIPC_ESTABLISHED);
1123         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1124         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1125         tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1126         if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1127                 return;
1128
1129         /* Fall back to message based flow control */
1130         tsk->rcv_win = FLOWCTL_MSG_WIN;
1131         tsk->snd_win = FLOWCTL_MSG_WIN;
1132 }
1133
1134 /**
1135  * set_orig_addr - capture sender's address for received message
1136  * @m: descriptor for message info
1137  * @msg: received message header
1138  *
1139  * Note: Address is not captured if not requested by receiver.
1140  */
1141 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1142 {
1143         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1144
1145         if (addr) {
1146                 addr->family = AF_TIPC;
1147                 addr->addrtype = TIPC_ADDR_ID;
1148                 memset(&addr->addr, 0, sizeof(addr->addr));
1149                 addr->addr.id.ref = msg_origport(msg);
1150                 addr->addr.id.node = msg_orignode(msg);
1151                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1152                 addr->scope = 0;                /* could leave uninitialized */
1153                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1154         }
1155 }
1156
1157 /**
1158  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1159  * @m: descriptor for message info
1160  * @msg: received message header
1161  * @tsk: TIPC port associated with message
1162  *
1163  * Note: Ancillary data is not captured if not requested by receiver.
1164  *
1165  * Returns 0 if successful, otherwise errno
1166  */
1167 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1168                                  struct tipc_sock *tsk)
1169 {
1170         u32 anc_data[3];
1171         u32 err;
1172         u32 dest_type;
1173         int has_name;
1174         int res;
1175
1176         if (likely(m->msg_controllen == 0))
1177                 return 0;
1178
1179         /* Optionally capture errored message object(s) */
1180         err = msg ? msg_errcode(msg) : 0;
1181         if (unlikely(err)) {
1182                 anc_data[0] = err;
1183                 anc_data[1] = msg_data_sz(msg);
1184                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1185                 if (res)
1186                         return res;
1187                 if (anc_data[1]) {
1188                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1189                                        msg_data(msg));
1190                         if (res)
1191                                 return res;
1192                 }
1193         }
1194
1195         /* Optionally capture message destination object */
1196         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1197         switch (dest_type) {
1198         case TIPC_NAMED_MSG:
1199                 has_name = 1;
1200                 anc_data[0] = msg_nametype(msg);
1201                 anc_data[1] = msg_namelower(msg);
1202                 anc_data[2] = msg_namelower(msg);
1203                 break;
1204         case TIPC_MCAST_MSG:
1205                 has_name = 1;
1206                 anc_data[0] = msg_nametype(msg);
1207                 anc_data[1] = msg_namelower(msg);
1208                 anc_data[2] = msg_nameupper(msg);
1209                 break;
1210         case TIPC_CONN_MSG:
1211                 has_name = (tsk->conn_type != 0);
1212                 anc_data[0] = tsk->conn_type;
1213                 anc_data[1] = tsk->conn_instance;
1214                 anc_data[2] = tsk->conn_instance;
1215                 break;
1216         default:
1217                 has_name = 0;
1218         }
1219         if (has_name) {
1220                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1221                 if (res)
1222                         return res;
1223         }
1224
1225         return 0;
1226 }
1227
1228 static void tipc_sk_send_ack(struct tipc_sock *tsk)
1229 {
1230         struct sock *sk = &tsk->sk;
1231         struct net *net = sock_net(sk);
1232         struct sk_buff *skb = NULL;
1233         struct tipc_msg *msg;
1234         u32 peer_port = tsk_peer_port(tsk);
1235         u32 dnode = tsk_peer_node(tsk);
1236
1237         if (!tipc_sk_connected(sk))
1238                 return;
1239         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1240                               dnode, tsk_own_node(tsk), peer_port,
1241                               tsk->portid, TIPC_OK);
1242         if (!skb)
1243                 return;
1244         msg = buf_msg(skb);
1245         msg_set_conn_ack(msg, tsk->rcv_unacked);
1246         tsk->rcv_unacked = 0;
1247
1248         /* Adjust to and advertize the correct window limit */
1249         if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1250                 tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1251                 msg_set_adv_win(msg, tsk->rcv_win);
1252         }
1253         tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1254 }
1255
1256 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1257 {
1258         struct sock *sk = sock->sk;
1259         DEFINE_WAIT(wait);
1260         long timeo = *timeop;
1261         int err;
1262
1263         for (;;) {
1264                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1265                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1266                         if (sk->sk_shutdown & RCV_SHUTDOWN) {
1267                                 err = -ENOTCONN;
1268                                 break;
1269                         }
1270                         release_sock(sk);
1271                         timeo = schedule_timeout(timeo);
1272                         lock_sock(sk);
1273                 }
1274                 err = 0;
1275                 if (!skb_queue_empty(&sk->sk_receive_queue))
1276                         break;
1277                 err = -EAGAIN;
1278                 if (!timeo)
1279                         break;
1280                 err = sock_intr_errno(timeo);
1281                 if (signal_pending(current))
1282                         break;
1283         }
1284         finish_wait(sk_sleep(sk), &wait);
1285         *timeop = timeo;
1286         return err;
1287 }
1288
1289 /**
1290  * tipc_recvmsg - receive packet-oriented message
1291  * @m: descriptor for message info
1292  * @buf_len: total size of user buffer area
1293  * @flags: receive flags
1294  *
1295  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1296  * If the complete message doesn't fit in user area, truncate it.
1297  *
1298  * Returns size of returned message data, errno otherwise
1299  */
1300 static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
1301                         int flags)
1302 {
1303         struct sock *sk = sock->sk;
1304         struct tipc_sock *tsk = tipc_sk(sk);
1305         struct sk_buff *buf;
1306         struct tipc_msg *msg;
1307         bool is_connectionless = tipc_sk_type_connectionless(sk);
1308         long timeo;
1309         unsigned int sz;
1310         u32 err;
1311         int res, hlen;
1312
1313         /* Catch invalid receive requests */
1314         if (unlikely(!buf_len))
1315                 return -EINVAL;
1316
1317         lock_sock(sk);
1318
1319         if (!is_connectionless && unlikely(sk->sk_state == TIPC_OPEN)) {
1320                 res = -ENOTCONN;
1321                 goto exit;
1322         }
1323
1324         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1325 restart:
1326
1327         /* Look for a message in receive queue; wait if necessary */
1328         res = tipc_wait_for_rcvmsg(sock, &timeo);
1329         if (res)
1330                 goto exit;
1331
1332         /* Look at first message in receive queue */
1333         buf = skb_peek(&sk->sk_receive_queue);
1334         msg = buf_msg(buf);
1335         sz = msg_data_sz(msg);
1336         hlen = msg_hdr_sz(msg);
1337         err = msg_errcode(msg);
1338
1339         /* Discard an empty non-errored message & try again */
1340         if ((!sz) && (!err)) {
1341                 tsk_advance_rx_queue(sk);
1342                 goto restart;
1343         }
1344
1345         /* Capture sender's address (optional) */
1346         set_orig_addr(m, msg);
1347
1348         /* Capture ancillary data (optional) */
1349         res = tipc_sk_anc_data_recv(m, msg, tsk);
1350         if (res)
1351                 goto exit;
1352
1353         /* Capture message data (if valid) & compute return value (always) */
1354         if (!err) {
1355                 if (unlikely(buf_len < sz)) {
1356                         sz = buf_len;
1357                         m->msg_flags |= MSG_TRUNC;
1358                 }
1359                 res = skb_copy_datagram_msg(buf, hlen, m, sz);
1360                 if (res)
1361                         goto exit;
1362                 res = sz;
1363         } else {
1364                 if (is_connectionless || err == TIPC_CONN_SHUTDOWN ||
1365                     m->msg_control)
1366                         res = 0;
1367                 else
1368                         res = -ECONNRESET;
1369         }
1370
1371         if (unlikely(flags & MSG_PEEK))
1372                 goto exit;
1373
1374         if (likely(!is_connectionless)) {
1375                 tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1376                 if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1377                         tipc_sk_send_ack(tsk);
1378         }
1379         tsk_advance_rx_queue(sk);
1380 exit:
1381         release_sock(sk);
1382         return res;
1383 }
1384
1385 /**
1386  * tipc_recv_stream - receive stream-oriented data
1387  * @m: descriptor for message info
1388  * @buf_len: total size of user buffer area
1389  * @flags: receive flags
1390  *
1391  * Used for SOCK_STREAM messages only.  If not enough data is available
1392  * will optionally wait for more; never truncates data.
1393  *
1394  * Returns size of returned message data, errno otherwise
1395  */
1396 static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
1397                             size_t buf_len, int flags)
1398 {
1399         struct sock *sk = sock->sk;
1400         struct tipc_sock *tsk = tipc_sk(sk);
1401         struct sk_buff *buf;
1402         struct tipc_msg *msg;
1403         long timeo;
1404         unsigned int sz;
1405         int target;
1406         int sz_copied = 0;
1407         u32 err;
1408         int res = 0, hlen;
1409
1410         /* Catch invalid receive attempts */
1411         if (unlikely(!buf_len))
1412                 return -EINVAL;
1413
1414         lock_sock(sk);
1415
1416         if (unlikely(sk->sk_state == TIPC_OPEN)) {
1417                 res = -ENOTCONN;
1418                 goto exit;
1419         }
1420
1421         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1422         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1423
1424 restart:
1425         /* Look for a message in receive queue; wait if necessary */
1426         res = tipc_wait_for_rcvmsg(sock, &timeo);
1427         if (res)
1428                 goto exit;
1429
1430         /* Look at first message in receive queue */
1431         buf = skb_peek(&sk->sk_receive_queue);
1432         msg = buf_msg(buf);
1433         sz = msg_data_sz(msg);
1434         hlen = msg_hdr_sz(msg);
1435         err = msg_errcode(msg);
1436
1437         /* Discard an empty non-errored message & try again */
1438         if ((!sz) && (!err)) {
1439                 tsk_advance_rx_queue(sk);
1440                 goto restart;
1441         }
1442
1443         /* Optionally capture sender's address & ancillary data of first msg */
1444         if (sz_copied == 0) {
1445                 set_orig_addr(m, msg);
1446                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1447                 if (res)
1448                         goto exit;
1449         }
1450
1451         /* Capture message data (if valid) & compute return value (always) */
1452         if (!err) {
1453                 u32 offset = TIPC_SKB_CB(buf)->bytes_read;
1454                 u32 needed;
1455                 int sz_to_copy;
1456
1457                 sz -= offset;
1458                 needed = (buf_len - sz_copied);
1459                 sz_to_copy = min(sz, needed);
1460
1461                 res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
1462                 if (res)
1463                         goto exit;
1464
1465                 sz_copied += sz_to_copy;
1466
1467                 if (sz_to_copy < sz) {
1468                         if (!(flags & MSG_PEEK))
1469                                 TIPC_SKB_CB(buf)->bytes_read =
1470                                         offset + sz_to_copy;
1471                         goto exit;
1472                 }
1473         } else {
1474                 if (sz_copied != 0)
1475                         goto exit; /* can't add error msg to valid data */
1476
1477                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1478                         res = 0;
1479                 else
1480                         res = -ECONNRESET;
1481         }
1482
1483         if (unlikely(flags & MSG_PEEK))
1484                 goto exit;
1485
1486         tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1487         if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1488                 tipc_sk_send_ack(tsk);
1489         tsk_advance_rx_queue(sk);
1490
1491         /* Loop around if more data is required */
1492         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1493             (!skb_queue_empty(&sk->sk_receive_queue) ||
1494             (sz_copied < target)) &&    /* and more is ready or required */
1495             (!err))                     /* and haven't reached a FIN */
1496                 goto restart;
1497
1498 exit:
1499         release_sock(sk);
1500         return sz_copied ? sz_copied : res;
1501 }
1502
1503 /**
1504  * tipc_write_space - wake up thread if port congestion is released
1505  * @sk: socket
1506  */
1507 static void tipc_write_space(struct sock *sk)
1508 {
1509         struct socket_wq *wq;
1510
1511         rcu_read_lock();
1512         wq = rcu_dereference(sk->sk_wq);
1513         if (skwq_has_sleeper(wq))
1514                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1515                                                 POLLWRNORM | POLLWRBAND);
1516         rcu_read_unlock();
1517 }
1518
1519 /**
1520  * tipc_data_ready - wake up threads to indicate messages have been received
1521  * @sk: socket
1522  * @len: the length of messages
1523  */
1524 static void tipc_data_ready(struct sock *sk)
1525 {
1526         struct socket_wq *wq;
1527
1528         rcu_read_lock();
1529         wq = rcu_dereference(sk->sk_wq);
1530         if (skwq_has_sleeper(wq))
1531                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1532                                                 POLLRDNORM | POLLRDBAND);
1533         rcu_read_unlock();
1534 }
1535
1536 static void tipc_sock_destruct(struct sock *sk)
1537 {
1538         __skb_queue_purge(&sk->sk_receive_queue);
1539 }
1540
1541 /**
1542  * filter_connect - Handle all incoming messages for a connection-based socket
1543  * @tsk: TIPC socket
1544  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1545  *
1546  * Returns true if everything ok, false otherwise
1547  */
1548 static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
1549 {
1550         struct sock *sk = &tsk->sk;
1551         struct net *net = sock_net(sk);
1552         struct tipc_msg *hdr = buf_msg(skb);
1553
1554         if (unlikely(msg_mcast(hdr)))
1555                 return false;
1556
1557         switch (sk->sk_state) {
1558         case TIPC_CONNECTING:
1559                 /* Accept only ACK or NACK message */
1560                 if (unlikely(!msg_connected(hdr)))
1561                         return false;
1562
1563                 if (unlikely(msg_errcode(hdr))) {
1564                         tipc_set_sk_state(sk, TIPC_DISCONNECTING);
1565                         sk->sk_err = ECONNREFUSED;
1566                         return true;
1567                 }
1568
1569                 if (unlikely(!msg_isdata(hdr))) {
1570                         tipc_set_sk_state(sk, TIPC_DISCONNECTING);
1571                         sk->sk_err = EINVAL;
1572                         return true;
1573                 }
1574
1575                 tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
1576                 msg_set_importance(&tsk->phdr, msg_importance(hdr));
1577
1578                 /* If 'ACK+' message, add to socket receive queue */
1579                 if (msg_data_sz(hdr))
1580                         return true;
1581
1582                 /* If empty 'ACK-' message, wake up sleeping connect() */
1583                 if (waitqueue_active(sk_sleep(sk)))
1584                         wake_up_interruptible(sk_sleep(sk));
1585
1586                 /* 'ACK-' message is neither accepted nor rejected: */
1587                 msg_set_dest_droppable(hdr, 1);
1588                 return false;
1589
1590         case TIPC_OPEN:
1591         case TIPC_DISCONNECTING:
1592                 break;
1593         case TIPC_LISTEN:
1594                 /* Accept only SYN message */
1595                 if (!msg_connected(hdr) && !(msg_errcode(hdr)))
1596                         return true;
1597                 break;
1598         case TIPC_ESTABLISHED:
1599                 /* Accept only connection-based messages sent by peer */
1600                 if (unlikely(!tsk_peer_msg(tsk, hdr)))
1601                         return false;
1602
1603                 if (unlikely(msg_errcode(hdr))) {
1604                         tipc_set_sk_state(sk, TIPC_DISCONNECTING);
1605                         /* Let timer expire on it's own */
1606                         tipc_node_remove_conn(net, tsk_peer_node(tsk),
1607                                               tsk->portid);
1608                         sk->sk_state_change(sk);
1609                 }
1610                 return true;
1611         default:
1612                 pr_err("Unknown sk_state %u\n", sk->sk_state);
1613         }
1614
1615         return false;
1616 }
1617
1618 /**
1619  * rcvbuf_limit - get proper overload limit of socket receive queue
1620  * @sk: socket
1621  * @skb: message
1622  *
1623  * For connection oriented messages, irrespective of importance,
1624  * default queue limit is 2 MB.
1625  *
1626  * For connectionless messages, queue limits are based on message
1627  * importance as follows:
1628  *
1629  * TIPC_LOW_IMPORTANCE       (2 MB)
1630  * TIPC_MEDIUM_IMPORTANCE    (4 MB)
1631  * TIPC_HIGH_IMPORTANCE      (8 MB)
1632  * TIPC_CRITICAL_IMPORTANCE  (16 MB)
1633  *
1634  * Returns overload limit according to corresponding message importance
1635  */
1636 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1637 {
1638         struct tipc_sock *tsk = tipc_sk(sk);
1639         struct tipc_msg *hdr = buf_msg(skb);
1640
1641         if (unlikely(!msg_connected(hdr)))
1642                 return sk->sk_rcvbuf << msg_importance(hdr);
1643
1644         if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
1645                 return sk->sk_rcvbuf;
1646
1647         return FLOWCTL_MSG_LIM;
1648 }
1649
1650 /**
1651  * filter_rcv - validate incoming message
1652  * @sk: socket
1653  * @skb: pointer to message.
1654  *
1655  * Enqueues message on receive queue if acceptable; optionally handles
1656  * disconnect indication for a connected socket.
1657  *
1658  * Called with socket lock already taken
1659  *
1660  * Returns true if message was added to socket receive queue, otherwise false
1661  */
1662 static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1663                        struct sk_buff_head *xmitq)
1664 {
1665         struct tipc_sock *tsk = tipc_sk(sk);
1666         struct tipc_msg *hdr = buf_msg(skb);
1667         unsigned int limit = rcvbuf_limit(sk, skb);
1668         int err = TIPC_OK;
1669         int usr = msg_user(hdr);
1670         u32 onode;
1671
1672         if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1673                 tipc_sk_proto_rcv(tsk, skb, xmitq);
1674                 return false;
1675         }
1676
1677         if (unlikely(usr == SOCK_WAKEUP)) {
1678                 onode = msg_orignode(hdr);
1679                 kfree_skb(skb);
1680                 u32_del(&tsk->cong_links, onode);
1681                 tsk->cong_link_cnt--;
1682                 sk->sk_write_space(sk);
1683                 return false;
1684         }
1685
1686         /* Drop if illegal message type */
1687         if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
1688                 kfree_skb(skb);
1689                 return false;
1690         }
1691
1692         /* Reject if wrong message type for current socket state */
1693         if (tipc_sk_type_connectionless(sk)) {
1694                 if (msg_connected(hdr)) {
1695                         err = TIPC_ERR_NO_PORT;
1696                         goto reject;
1697                 }
1698         } else if (unlikely(!filter_connect(tsk, skb))) {
1699                 err = TIPC_ERR_NO_PORT;
1700                 goto reject;
1701         }
1702
1703         /* Reject message if there isn't room to queue it */
1704         if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
1705                 err = TIPC_ERR_OVERLOAD;
1706                 goto reject;
1707         }
1708
1709         /* Enqueue message */
1710         TIPC_SKB_CB(skb)->bytes_read = 0;
1711         __skb_queue_tail(&sk->sk_receive_queue, skb);
1712         skb_set_owner_r(skb, sk);
1713
1714         sk->sk_data_ready(sk);
1715         return true;
1716
1717 reject:
1718         if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
1719                 __skb_queue_tail(xmitq, skb);
1720         return false;
1721 }
1722
1723 /**
1724  * tipc_backlog_rcv - handle incoming message from backlog queue
1725  * @sk: socket
1726  * @skb: message
1727  *
1728  * Caller must hold socket lock
1729  *
1730  * Returns 0
1731  */
1732 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1733 {
1734         unsigned int truesize = skb->truesize;
1735         struct sk_buff_head xmitq;
1736         u32 dnode, selector;
1737
1738         __skb_queue_head_init(&xmitq);
1739
1740         if (likely(filter_rcv(sk, skb, &xmitq))) {
1741                 atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
1742                 return 0;
1743         }
1744
1745         if (skb_queue_empty(&xmitq))
1746                 return 0;
1747
1748         /* Send response/rejected message */
1749         skb = __skb_dequeue(&xmitq);
1750         dnode = msg_destnode(buf_msg(skb));
1751         selector = msg_origport(buf_msg(skb));
1752         tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
1753         return 0;
1754 }
1755
1756 /**
1757  * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1758  *                   inputq and try adding them to socket or backlog queue
1759  * @inputq: list of incoming buffers with potentially different destinations
1760  * @sk: socket where the buffers should be enqueued
1761  * @dport: port number for the socket
1762  *
1763  * Caller must hold socket lock
1764  */
1765 static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1766                             u32 dport, struct sk_buff_head *xmitq)
1767 {
1768         unsigned long time_limit = jiffies + 2;
1769         struct sk_buff *skb;
1770         unsigned int lim;
1771         atomic_t *dcnt;
1772         u32 onode;
1773
1774         while (skb_queue_len(inputq)) {
1775                 if (unlikely(time_after_eq(jiffies, time_limit)))
1776                         return;
1777
1778                 skb = tipc_skb_dequeue(inputq, dport);
1779                 if (unlikely(!skb))
1780                         return;
1781
1782                 /* Add message directly to receive queue if possible */
1783                 if (!sock_owned_by_user(sk)) {
1784                         filter_rcv(sk, skb, xmitq);
1785                         continue;
1786                 }
1787
1788                 /* Try backlog, compensating for double-counted bytes */
1789                 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1790                 if (!sk->sk_backlog.len)
1791                         atomic_set(dcnt, 0);
1792                 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1793                 if (likely(!sk_add_backlog(sk, skb, lim)))
1794                         continue;
1795
1796                 /* Overload => reject message back to sender */
1797                 onode = tipc_own_addr(sock_net(sk));
1798                 if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
1799                         __skb_queue_tail(xmitq, skb);
1800                 break;
1801         }
1802 }
1803
1804 /**
1805  * tipc_sk_rcv - handle a chain of incoming buffers
1806  * @inputq: buffer list containing the buffers
1807  * Consumes all buffers in list until inputq is empty
1808  * Note: may be called in multiple threads referring to the same queue
1809  */
1810 void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1811 {
1812         struct sk_buff_head xmitq;
1813         u32 dnode, dport = 0;
1814         int err;
1815         struct tipc_sock *tsk;
1816         struct sock *sk;
1817         struct sk_buff *skb;
1818
1819         __skb_queue_head_init(&xmitq);
1820         while (skb_queue_len(inputq)) {
1821                 dport = tipc_skb_peek_port(inputq, dport);
1822                 tsk = tipc_sk_lookup(net, dport);
1823
1824                 if (likely(tsk)) {
1825                         sk = &tsk->sk;
1826                         if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1827                                 tipc_sk_enqueue(inputq, sk, dport, &xmitq);
1828                                 spin_unlock_bh(&sk->sk_lock.slock);
1829                         }
1830                         /* Send pending response/rejected messages, if any */
1831                         while ((skb = __skb_dequeue(&xmitq))) {
1832                                 dnode = msg_destnode(buf_msg(skb));
1833                                 tipc_node_xmit_skb(net, skb, dnode, dport);
1834                         }
1835                         sock_put(sk);
1836                         continue;
1837                 }
1838
1839                 /* No destination socket => dequeue skb if still there */
1840                 skb = tipc_skb_dequeue(inputq, dport);
1841                 if (!skb)
1842                         return;
1843
1844                 /* Try secondary lookup if unresolved named message */
1845                 err = TIPC_ERR_NO_PORT;
1846                 if (tipc_msg_lookup_dest(net, skb, &err))
1847                         goto xmit;
1848
1849                 /* Prepare for message rejection */
1850                 if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
1851                         continue;
1852 xmit:
1853                 dnode = msg_destnode(buf_msg(skb));
1854                 tipc_node_xmit_skb(net, skb, dnode, dport);
1855         }
1856 }
1857
1858 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1859 {
1860         DEFINE_WAIT_FUNC(wait, woken_wake_function);
1861         struct sock *sk = sock->sk;
1862         int done;
1863
1864         do {
1865                 int err = sock_error(sk);
1866                 if (err)
1867                         return err;
1868                 if (!*timeo_p)
1869                         return -ETIMEDOUT;
1870                 if (signal_pending(current))
1871                         return sock_intr_errno(*timeo_p);
1872
1873                 add_wait_queue(sk_sleep(sk), &wait);
1874                 done = sk_wait_event(sk, timeo_p,
1875                                      sk->sk_state != TIPC_CONNECTING, &wait);
1876                 remove_wait_queue(sk_sleep(sk), &wait);
1877         } while (!done);
1878         return 0;
1879 }
1880
1881 /**
1882  * tipc_connect - establish a connection to another TIPC port
1883  * @sock: socket structure
1884  * @dest: socket address for destination port
1885  * @destlen: size of socket address data structure
1886  * @flags: file-related flags associated with socket
1887  *
1888  * Returns 0 on success, errno otherwise
1889  */
1890 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1891                         int destlen, int flags)
1892 {
1893         struct sock *sk = sock->sk;
1894         struct tipc_sock *tsk = tipc_sk(sk);
1895         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1896         struct msghdr m = {NULL,};
1897         long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
1898         int previous;
1899         int res = 0;
1900
1901         lock_sock(sk);
1902
1903         /* DGRAM/RDM connect(), just save the destaddr */
1904         if (tipc_sk_type_connectionless(sk)) {
1905                 if (dst->family == AF_UNSPEC) {
1906                         memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
1907                 } else if (destlen != sizeof(struct sockaddr_tipc)) {
1908                         res = -EINVAL;
1909                 } else {
1910                         memcpy(&tsk->peer, dest, destlen);
1911                 }
1912                 goto exit;
1913         }
1914
1915         /*
1916          * Reject connection attempt using multicast address
1917          *
1918          * Note: send_msg() validates the rest of the address fields,
1919          *       so there's no need to do it here
1920          */
1921         if (dst->addrtype == TIPC_ADDR_MCAST) {
1922                 res = -EINVAL;
1923                 goto exit;
1924         }
1925
1926         previous = sk->sk_state;
1927
1928         switch (sk->sk_state) {
1929         case TIPC_OPEN:
1930                 /* Send a 'SYN-' to destination */
1931                 m.msg_name = dest;
1932                 m.msg_namelen = destlen;
1933
1934                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1935                  * indicate send_msg() is never blocked.
1936                  */
1937                 if (!timeout)
1938                         m.msg_flags = MSG_DONTWAIT;
1939
1940                 res = __tipc_sendmsg(sock, &m, 0);
1941                 if ((res < 0) && (res != -EWOULDBLOCK))
1942                         goto exit;
1943
1944                 /* Just entered TIPC_CONNECTING state; the only
1945                  * difference is that return value in non-blocking
1946                  * case is EINPROGRESS, rather than EALREADY.
1947                  */
1948                 res = -EINPROGRESS;
1949                 /* fall thru' */
1950         case TIPC_CONNECTING:
1951                 if (!timeout) {
1952                         if (previous == TIPC_CONNECTING)
1953                                 res = -EALREADY;
1954                         goto exit;
1955                 }
1956                 timeout = msecs_to_jiffies(timeout);
1957                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1958                 res = tipc_wait_for_connect(sock, &timeout);
1959                 break;
1960         case TIPC_ESTABLISHED:
1961                 res = -EISCONN;
1962                 break;
1963         default:
1964                 res = -EINVAL;
1965         }
1966
1967 exit:
1968         release_sock(sk);
1969         return res;
1970 }
1971
1972 /**
1973  * tipc_listen - allow socket to listen for incoming connections
1974  * @sock: socket structure
1975  * @len: (unused)
1976  *
1977  * Returns 0 on success, errno otherwise
1978  */
1979 static int tipc_listen(struct socket *sock, int len)
1980 {
1981         struct sock *sk = sock->sk;
1982         int res;
1983
1984         lock_sock(sk);
1985         res = tipc_set_sk_state(sk, TIPC_LISTEN);
1986         release_sock(sk);
1987
1988         return res;
1989 }
1990
1991 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1992 {
1993         struct sock *sk = sock->sk;
1994         DEFINE_WAIT(wait);
1995         int err;
1996
1997         /* True wake-one mechanism for incoming connections: only
1998          * one process gets woken up, not the 'whole herd'.
1999          * Since we do not 'race & poll' for established sockets
2000          * anymore, the common case will execute the loop only once.
2001         */
2002         for (;;) {
2003                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
2004                                           TASK_INTERRUPTIBLE);
2005                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
2006                         release_sock(sk);
2007                         timeo = schedule_timeout(timeo);
2008                         lock_sock(sk);
2009                 }
2010                 err = 0;
2011                 if (!skb_queue_empty(&sk->sk_receive_queue))
2012                         break;
2013                 err = -EAGAIN;
2014                 if (!timeo)
2015                         break;
2016                 err = sock_intr_errno(timeo);
2017                 if (signal_pending(current))
2018                         break;
2019         }
2020         finish_wait(sk_sleep(sk), &wait);
2021         return err;
2022 }
2023
2024 /**
2025  * tipc_accept - wait for connection request
2026  * @sock: listening socket
2027  * @newsock: new socket that is to be connected
2028  * @flags: file-related flags associated with socket
2029  *
2030  * Returns 0 on success, errno otherwise
2031  */
2032 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
2033 {
2034         struct sock *new_sk, *sk = sock->sk;
2035         struct sk_buff *buf;
2036         struct tipc_sock *new_tsock;
2037         struct tipc_msg *msg;
2038         long timeo;
2039         int res;
2040
2041         lock_sock(sk);
2042
2043         if (sk->sk_state != TIPC_LISTEN) {
2044                 res = -EINVAL;
2045                 goto exit;
2046         }
2047         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2048         res = tipc_wait_for_accept(sock, timeo);
2049         if (res)
2050                 goto exit;
2051
2052         buf = skb_peek(&sk->sk_receive_queue);
2053
2054         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 0);
2055         if (res)
2056                 goto exit;
2057         security_sk_clone(sock->sk, new_sock->sk);
2058
2059         new_sk = new_sock->sk;
2060         new_tsock = tipc_sk(new_sk);
2061         msg = buf_msg(buf);
2062
2063         /* we lock on new_sk; but lockdep sees the lock on sk */
2064         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2065
2066         /*
2067          * Reject any stray messages received by new socket
2068          * before the socket lock was taken (very, very unlikely)
2069          */
2070         tsk_rej_rx_queue(new_sk);
2071
2072         /* Connect new socket to it's peer */
2073         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2074
2075         tsk_set_importance(new_tsock, msg_importance(msg));
2076         if (msg_named(msg)) {
2077                 new_tsock->conn_type = msg_nametype(msg);
2078                 new_tsock->conn_instance = msg_nameinst(msg);
2079         }
2080
2081         /*
2082          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2083          * Respond to 'SYN+' by queuing it on new socket.
2084          */
2085         if (!msg_data_sz(msg)) {
2086                 struct msghdr m = {NULL,};
2087
2088                 tsk_advance_rx_queue(sk);
2089                 __tipc_sendstream(new_sock, &m, 0);
2090         } else {
2091                 __skb_dequeue(&sk->sk_receive_queue);
2092                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2093                 skb_set_owner_r(buf, new_sk);
2094         }
2095         release_sock(new_sk);
2096 exit:
2097         release_sock(sk);
2098         return res;
2099 }
2100
2101 /**
2102  * tipc_shutdown - shutdown socket connection
2103  * @sock: socket structure
2104  * @how: direction to close (must be SHUT_RDWR)
2105  *
2106  * Terminates connection (if necessary), then purges socket's receive queue.
2107  *
2108  * Returns 0 on success, errno otherwise
2109  */
2110 static int tipc_shutdown(struct socket *sock, int how)
2111 {
2112         struct sock *sk = sock->sk;
2113         int res;
2114
2115         if (how != SHUT_RDWR)
2116                 return -EINVAL;
2117
2118         lock_sock(sk);
2119
2120         __tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
2121         sk->sk_shutdown = SEND_SHUTDOWN;
2122
2123         if (sk->sk_state == TIPC_DISCONNECTING) {
2124                 /* Discard any unreceived messages */
2125                 __skb_queue_purge(&sk->sk_receive_queue);
2126
2127                 /* Wake up anyone sleeping in poll */
2128                 sk->sk_state_change(sk);
2129                 res = 0;
2130         } else {
2131                 res = -ENOTCONN;
2132         }
2133
2134         release_sock(sk);
2135         return res;
2136 }
2137
2138 static void tipc_sk_timeout(unsigned long data)
2139 {
2140         struct tipc_sock *tsk = (struct tipc_sock *)data;
2141         struct sock *sk = &tsk->sk;
2142         struct sk_buff *skb = NULL;
2143         u32 peer_port, peer_node;
2144         u32 own_node = tsk_own_node(tsk);
2145
2146         bh_lock_sock(sk);
2147         if (!tipc_sk_connected(sk)) {
2148                 bh_unlock_sock(sk);
2149                 goto exit;
2150         }
2151         peer_port = tsk_peer_port(tsk);
2152         peer_node = tsk_peer_node(tsk);
2153
2154         if (tsk->probe_unacked) {
2155                 if (!sock_owned_by_user(sk)) {
2156                         tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2157                         tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
2158                                               tsk_peer_port(tsk));
2159                         sk->sk_state_change(sk);
2160                 } else {
2161                         /* Try again later */
2162                         sk_reset_timer(sk, &sk->sk_timer, (HZ / 20));
2163                 }
2164
2165                 bh_unlock_sock(sk);
2166                 goto exit;
2167         }
2168
2169         skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
2170                               INT_H_SIZE, 0, peer_node, own_node,
2171                               peer_port, tsk->portid, TIPC_OK);
2172         tsk->probe_unacked = true;
2173         sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTERVAL);
2174         bh_unlock_sock(sk);
2175         if (skb)
2176                 tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2177 exit:
2178         sock_put(sk);
2179 }
2180
2181 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2182                            struct tipc_name_seq const *seq)
2183 {
2184         struct sock *sk = &tsk->sk;
2185         struct net *net = sock_net(sk);
2186         struct publication *publ;
2187         u32 key;
2188
2189         if (tipc_sk_connected(sk))
2190                 return -EINVAL;
2191         key = tsk->portid + tsk->pub_count + 1;
2192         if (key == tsk->portid)
2193                 return -EADDRINUSE;
2194
2195         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2196                                     scope, tsk->portid, key);
2197         if (unlikely(!publ))
2198                 return -EINVAL;
2199
2200         list_add(&publ->pport_list, &tsk->publications);
2201         tsk->pub_count++;
2202         tsk->published = 1;
2203         return 0;
2204 }
2205
2206 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2207                             struct tipc_name_seq const *seq)
2208 {
2209         struct net *net = sock_net(&tsk->sk);
2210         struct publication *publ;
2211         struct publication *safe;
2212         int rc = -EINVAL;
2213
2214         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2215                 if (seq) {
2216                         if (publ->scope != scope)
2217                                 continue;
2218                         if (publ->type != seq->type)
2219                                 continue;
2220                         if (publ->lower != seq->lower)
2221                                 continue;
2222                         if (publ->upper != seq->upper)
2223                                 break;
2224                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2225                                               publ->ref, publ->key);
2226                         rc = 0;
2227                         break;
2228                 }
2229                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2230                                       publ->ref, publ->key);
2231                 rc = 0;
2232         }
2233         if (list_empty(&tsk->publications))
2234                 tsk->published = 0;
2235         return rc;
2236 }
2237
2238 /* tipc_sk_reinit: set non-zero address in all existing sockets
2239  *                 when we go from standalone to network mode.
2240  */
2241 void tipc_sk_reinit(struct net *net)
2242 {
2243         struct tipc_net *tn = net_generic(net, tipc_net_id);
2244         struct rhashtable_iter iter;
2245         struct tipc_sock *tsk;
2246         struct tipc_msg *msg;
2247
2248         rhashtable_walk_enter(&tn->sk_rht, &iter);
2249
2250         do {
2251                 tsk = ERR_PTR(rhashtable_walk_start(&iter));
2252                 if (tsk)
2253                         continue;
2254
2255                 while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
2256                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2257                         msg = &tsk->phdr;
2258                         msg_set_prevnode(msg, tn->own_addr);
2259                         msg_set_orignode(msg, tn->own_addr);
2260                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2261                 }
2262
2263                 rhashtable_walk_stop(&iter);
2264         } while (tsk == ERR_PTR(-EAGAIN));
2265 }
2266
2267 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2268 {
2269         struct tipc_net *tn = net_generic(net, tipc_net_id);
2270         struct tipc_sock *tsk;
2271
2272         rcu_read_lock();
2273         tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
2274         if (tsk)
2275                 sock_hold(&tsk->sk);
2276         rcu_read_unlock();
2277
2278         return tsk;
2279 }
2280
2281 static int tipc_sk_insert(struct tipc_sock *tsk)
2282 {
2283         struct sock *sk = &tsk->sk;
2284         struct net *net = sock_net(sk);
2285         struct tipc_net *tn = net_generic(net, tipc_net_id);
2286         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2287         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2288
2289         while (remaining--) {
2290                 portid++;
2291                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2292                         portid = TIPC_MIN_PORT;
2293                 tsk->portid = portid;
2294                 sock_hold(&tsk->sk);
2295                 if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
2296                                                    tsk_rht_params))
2297                         return 0;
2298                 sock_put(&tsk->sk);
2299         }
2300
2301         return -1;
2302 }
2303
2304 static void tipc_sk_remove(struct tipc_sock *tsk)
2305 {
2306         struct sock *sk = &tsk->sk;
2307         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2308
2309         if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
2310                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2311                 __sock_put(sk);
2312         }
2313 }
2314
2315 static const struct rhashtable_params tsk_rht_params = {
2316         .nelem_hint = 192,
2317         .head_offset = offsetof(struct tipc_sock, node),
2318         .key_offset = offsetof(struct tipc_sock, portid),
2319         .key_len = sizeof(u32), /* portid */
2320         .max_size = 1048576,
2321         .min_size = 256,
2322         .automatic_shrinking = true,
2323 };
2324
2325 int tipc_sk_rht_init(struct net *net)
2326 {
2327         struct tipc_net *tn = net_generic(net, tipc_net_id);
2328
2329         return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
2330 }
2331
2332 void tipc_sk_rht_destroy(struct net *net)
2333 {
2334         struct tipc_net *tn = net_generic(net, tipc_net_id);
2335
2336         /* Wait for socket readers to complete */
2337         synchronize_net();
2338
2339         rhashtable_destroy(&tn->sk_rht);
2340 }
2341
2342 /**
2343  * tipc_setsockopt - set socket option
2344  * @sock: socket structure
2345  * @lvl: option level
2346  * @opt: option identifier
2347  * @ov: pointer to new option value
2348  * @ol: length of option value
2349  *
2350  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2351  * (to ease compatibility).
2352  *
2353  * Returns 0 on success, errno otherwise
2354  */
2355 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2356                            char __user *ov, unsigned int ol)
2357 {
2358         struct sock *sk = sock->sk;
2359         struct tipc_sock *tsk = tipc_sk(sk);
2360         u32 value = 0;
2361         int res = 0;
2362
2363         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2364                 return 0;
2365         if (lvl != SOL_TIPC)
2366                 return -ENOPROTOOPT;
2367
2368         switch (opt) {
2369         case TIPC_IMPORTANCE:
2370         case TIPC_SRC_DROPPABLE:
2371         case TIPC_DEST_DROPPABLE:
2372         case TIPC_CONN_TIMEOUT:
2373                 if (ol < sizeof(value))
2374                         return -EINVAL;
2375                 res = get_user(value, (u32 __user *)ov);
2376                 if (res)
2377                         return res;
2378                 break;
2379         default:
2380                 if (ov || ol)
2381                         return -EINVAL;
2382         }
2383
2384         lock_sock(sk);
2385
2386         switch (opt) {
2387         case TIPC_IMPORTANCE:
2388                 res = tsk_set_importance(tsk, value);
2389                 break;
2390         case TIPC_SRC_DROPPABLE:
2391                 if (sock->type != SOCK_STREAM)
2392                         tsk_set_unreliable(tsk, value);
2393                 else
2394                         res = -ENOPROTOOPT;
2395                 break;
2396         case TIPC_DEST_DROPPABLE:
2397                 tsk_set_unreturnable(tsk, value);
2398                 break;
2399         case TIPC_CONN_TIMEOUT:
2400                 tipc_sk(sk)->conn_timeout = value;
2401                 break;
2402         case TIPC_MCAST_BROADCAST:
2403                 tsk->mc_method.rcast = false;
2404                 tsk->mc_method.mandatory = true;
2405                 break;
2406         case TIPC_MCAST_REPLICAST:
2407                 tsk->mc_method.rcast = true;
2408                 tsk->mc_method.mandatory = true;
2409                 break;
2410         default:
2411                 res = -EINVAL;
2412         }
2413
2414         release_sock(sk);
2415
2416         return res;
2417 }
2418
2419 /**
2420  * tipc_getsockopt - get socket option
2421  * @sock: socket structure
2422  * @lvl: option level
2423  * @opt: option identifier
2424  * @ov: receptacle for option value
2425  * @ol: receptacle for length of option value
2426  *
2427  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2428  * (to ease compatibility).
2429  *
2430  * Returns 0 on success, errno otherwise
2431  */
2432 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2433                            char __user *ov, int __user *ol)
2434 {
2435         struct sock *sk = sock->sk;
2436         struct tipc_sock *tsk = tipc_sk(sk);
2437         int len;
2438         u32 value;
2439         int res;
2440
2441         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2442                 return put_user(0, ol);
2443         if (lvl != SOL_TIPC)
2444                 return -ENOPROTOOPT;
2445         res = get_user(len, ol);
2446         if (res)
2447                 return res;
2448
2449         lock_sock(sk);
2450
2451         switch (opt) {
2452         case TIPC_IMPORTANCE:
2453                 value = tsk_importance(tsk);
2454                 break;
2455         case TIPC_SRC_DROPPABLE:
2456                 value = tsk_unreliable(tsk);
2457                 break;
2458         case TIPC_DEST_DROPPABLE:
2459                 value = tsk_unreturnable(tsk);
2460                 break;
2461         case TIPC_CONN_TIMEOUT:
2462                 value = tsk->conn_timeout;
2463                 /* no need to set "res", since already 0 at this point */
2464                 break;
2465         case TIPC_NODE_RECVQ_DEPTH:
2466                 value = 0; /* was tipc_queue_size, now obsolete */
2467                 break;
2468         case TIPC_SOCK_RECVQ_DEPTH:
2469                 value = skb_queue_len(&sk->sk_receive_queue);
2470                 break;
2471         default:
2472                 res = -EINVAL;
2473         }
2474
2475         release_sock(sk);
2476
2477         if (res)
2478                 return res;     /* "get" failed */
2479
2480         if (len < sizeof(value))
2481                 return -EINVAL;
2482
2483         if (copy_to_user(ov, &value, sizeof(value)))
2484                 return -EFAULT;
2485
2486         return put_user(sizeof(value), ol);
2487 }
2488
2489 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2490 {
2491         struct sock *sk = sock->sk;
2492         struct tipc_sioc_ln_req lnr;
2493         void __user *argp = (void __user *)arg;
2494
2495         switch (cmd) {
2496         case SIOCGETLINKNAME:
2497                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2498                         return -EFAULT;
2499                 if (!tipc_node_get_linkname(sock_net(sk),
2500                                             lnr.bearer_id & 0xffff, lnr.peer,
2501                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2502                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2503                                 return -EFAULT;
2504                         return 0;
2505                 }
2506                 return -EADDRNOTAVAIL;
2507         default:
2508                 return -ENOIOCTLCMD;
2509         }
2510 }
2511
2512 /* Protocol switches for the various types of TIPC sockets */
2513
2514 static const struct proto_ops msg_ops = {
2515         .owner          = THIS_MODULE,
2516         .family         = AF_TIPC,
2517         .release        = tipc_release,
2518         .bind           = tipc_bind,
2519         .connect        = tipc_connect,
2520         .socketpair     = sock_no_socketpair,
2521         .accept         = sock_no_accept,
2522         .getname        = tipc_getname,
2523         .poll           = tipc_poll,
2524         .ioctl          = tipc_ioctl,
2525         .listen         = sock_no_listen,
2526         .shutdown       = tipc_shutdown,
2527         .setsockopt     = tipc_setsockopt,
2528         .getsockopt     = tipc_getsockopt,
2529         .sendmsg        = tipc_sendmsg,
2530         .recvmsg        = tipc_recvmsg,
2531         .mmap           = sock_no_mmap,
2532         .sendpage       = sock_no_sendpage
2533 };
2534
2535 static const struct proto_ops packet_ops = {
2536         .owner          = THIS_MODULE,
2537         .family         = AF_TIPC,
2538         .release        = tipc_release,
2539         .bind           = tipc_bind,
2540         .connect        = tipc_connect,
2541         .socketpair     = sock_no_socketpair,
2542         .accept         = tipc_accept,
2543         .getname        = tipc_getname,
2544         .poll           = tipc_poll,
2545         .ioctl          = tipc_ioctl,
2546         .listen         = tipc_listen,
2547         .shutdown       = tipc_shutdown,
2548         .setsockopt     = tipc_setsockopt,
2549         .getsockopt     = tipc_getsockopt,
2550         .sendmsg        = tipc_send_packet,
2551         .recvmsg        = tipc_recvmsg,
2552         .mmap           = sock_no_mmap,
2553         .sendpage       = sock_no_sendpage
2554 };
2555
2556 static const struct proto_ops stream_ops = {
2557         .owner          = THIS_MODULE,
2558         .family         = AF_TIPC,
2559         .release        = tipc_release,
2560         .bind           = tipc_bind,
2561         .connect        = tipc_connect,
2562         .socketpair     = sock_no_socketpair,
2563         .accept         = tipc_accept,
2564         .getname        = tipc_getname,
2565         .poll           = tipc_poll,
2566         .ioctl          = tipc_ioctl,
2567         .listen         = tipc_listen,
2568         .shutdown       = tipc_shutdown,
2569         .setsockopt     = tipc_setsockopt,
2570         .getsockopt     = tipc_getsockopt,
2571         .sendmsg        = tipc_sendstream,
2572         .recvmsg        = tipc_recv_stream,
2573         .mmap           = sock_no_mmap,
2574         .sendpage       = sock_no_sendpage
2575 };
2576
2577 static const struct net_proto_family tipc_family_ops = {
2578         .owner          = THIS_MODULE,
2579         .family         = AF_TIPC,
2580         .create         = tipc_sk_create
2581 };
2582
2583 static struct proto tipc_proto = {
2584         .name           = "TIPC",
2585         .owner          = THIS_MODULE,
2586         .obj_size       = sizeof(struct tipc_sock),
2587         .sysctl_rmem    = sysctl_tipc_rmem
2588 };
2589
2590 /**
2591  * tipc_socket_init - initialize TIPC socket interface
2592  *
2593  * Returns 0 on success, errno otherwise
2594  */
2595 int tipc_socket_init(void)
2596 {
2597         int res;
2598
2599         res = proto_register(&tipc_proto, 1);
2600         if (res) {
2601                 pr_err("Failed to register TIPC protocol type\n");
2602                 goto out;
2603         }
2604
2605         res = sock_register(&tipc_family_ops);
2606         if (res) {
2607                 pr_err("Failed to register TIPC socket type\n");
2608                 proto_unregister(&tipc_proto);
2609                 goto out;
2610         }
2611  out:
2612         return res;
2613 }
2614
2615 /**
2616  * tipc_socket_stop - stop TIPC socket interface
2617  */
2618 void tipc_socket_stop(void)
2619 {
2620         sock_unregister(tipc_family_ops.family);
2621         proto_unregister(&tipc_proto);
2622 }
2623
2624 /* Caller should hold socket lock for the passed tipc socket. */
2625 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2626 {
2627         u32 peer_node;
2628         u32 peer_port;
2629         struct nlattr *nest;
2630
2631         peer_node = tsk_peer_node(tsk);
2632         peer_port = tsk_peer_port(tsk);
2633
2634         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2635
2636         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2637                 goto msg_full;
2638         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2639                 goto msg_full;
2640
2641         if (tsk->conn_type != 0) {
2642                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2643                         goto msg_full;
2644                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2645                         goto msg_full;
2646                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2647                         goto msg_full;
2648         }
2649         nla_nest_end(skb, nest);
2650
2651         return 0;
2652
2653 msg_full:
2654         nla_nest_cancel(skb, nest);
2655
2656         return -EMSGSIZE;
2657 }
2658
2659 /* Caller should hold socket lock for the passed tipc socket. */
2660 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2661                             struct tipc_sock *tsk)
2662 {
2663         int err;
2664         void *hdr;
2665         struct nlattr *attrs;
2666         struct net *net = sock_net(skb->sk);
2667         struct tipc_net *tn = net_generic(net, tipc_net_id);
2668         struct sock *sk = &tsk->sk;
2669
2670         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2671                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2672         if (!hdr)
2673                 goto msg_cancel;
2674
2675         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2676         if (!attrs)
2677                 goto genlmsg_cancel;
2678         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2679                 goto attr_msg_cancel;
2680         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2681                 goto attr_msg_cancel;
2682
2683         if (tipc_sk_connected(sk)) {
2684                 err = __tipc_nl_add_sk_con(skb, tsk);
2685                 if (err)
2686                         goto attr_msg_cancel;
2687         } else if (!list_empty(&tsk->publications)) {
2688                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2689                         goto attr_msg_cancel;
2690         }
2691         nla_nest_end(skb, attrs);
2692         genlmsg_end(skb, hdr);
2693
2694         return 0;
2695
2696 attr_msg_cancel:
2697         nla_nest_cancel(skb, attrs);
2698 genlmsg_cancel:
2699         genlmsg_cancel(skb, hdr);
2700 msg_cancel:
2701         return -EMSGSIZE;
2702 }
2703
2704 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2705 {
2706         int err;
2707         struct tipc_sock *tsk;
2708         const struct bucket_table *tbl;
2709         struct rhash_head *pos;
2710         struct net *net = sock_net(skb->sk);
2711         struct tipc_net *tn = net_generic(net, tipc_net_id);
2712         u32 tbl_id = cb->args[0];
2713         u32 prev_portid = cb->args[1];
2714
2715         rcu_read_lock();
2716         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2717         for (; tbl_id < tbl->size; tbl_id++) {
2718                 rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2719                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2720                         if (prev_portid && prev_portid != tsk->portid) {
2721                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2722                                 continue;
2723                         }
2724
2725                         err = __tipc_nl_add_sk(skb, cb, tsk);
2726                         if (err) {
2727                                 prev_portid = tsk->portid;
2728                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2729                                 goto out;
2730                         }
2731                         prev_portid = 0;
2732                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2733                 }
2734         }
2735 out:
2736         rcu_read_unlock();
2737         cb->args[0] = tbl_id;
2738         cb->args[1] = prev_portid;
2739
2740         return skb->len;
2741 }
2742
2743 /* Caller should hold socket lock for the passed tipc socket. */
2744 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2745                                  struct netlink_callback *cb,
2746                                  struct publication *publ)
2747 {
2748         void *hdr;
2749         struct nlattr *attrs;
2750
2751         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2752                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2753         if (!hdr)
2754                 goto msg_cancel;
2755
2756         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2757         if (!attrs)
2758                 goto genlmsg_cancel;
2759
2760         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2761                 goto attr_msg_cancel;
2762         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2763                 goto attr_msg_cancel;
2764         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2765                 goto attr_msg_cancel;
2766         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2767                 goto attr_msg_cancel;
2768
2769         nla_nest_end(skb, attrs);
2770         genlmsg_end(skb, hdr);
2771
2772         return 0;
2773
2774 attr_msg_cancel:
2775         nla_nest_cancel(skb, attrs);
2776 genlmsg_cancel:
2777         genlmsg_cancel(skb, hdr);
2778 msg_cancel:
2779         return -EMSGSIZE;
2780 }
2781
2782 /* Caller should hold socket lock for the passed tipc socket. */
2783 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2784                                   struct netlink_callback *cb,
2785                                   struct tipc_sock *tsk, u32 *last_publ)
2786 {
2787         int err;
2788         struct publication *p;
2789
2790         if (*last_publ) {
2791                 list_for_each_entry(p, &tsk->publications, pport_list) {
2792                         if (p->key == *last_publ)
2793                                 break;
2794                 }
2795                 if (p->key != *last_publ) {
2796                         /* We never set seq or call nl_dump_check_consistent()
2797                          * this means that setting prev_seq here will cause the
2798                          * consistence check to fail in the netlink callback
2799                          * handler. Resulting in the last NLMSG_DONE message
2800                          * having the NLM_F_DUMP_INTR flag set.
2801                          */
2802                         cb->prev_seq = 1;
2803                         *last_publ = 0;
2804                         return -EPIPE;
2805                 }
2806         } else {
2807                 p = list_first_entry(&tsk->publications, struct publication,
2808                                      pport_list);
2809         }
2810
2811         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2812                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2813                 if (err) {
2814                         *last_publ = p->key;
2815                         return err;
2816                 }
2817         }
2818         *last_publ = 0;
2819
2820         return 0;
2821 }
2822
2823 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2824 {
2825         int err;
2826         u32 tsk_portid = cb->args[0];
2827         u32 last_publ = cb->args[1];
2828         u32 done = cb->args[2];
2829         struct net *net = sock_net(skb->sk);
2830         struct tipc_sock *tsk;
2831
2832         if (!tsk_portid) {
2833                 struct nlattr **attrs;
2834                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2835
2836                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2837                 if (err)
2838                         return err;
2839
2840                 if (!attrs[TIPC_NLA_SOCK])
2841                         return -EINVAL;
2842
2843                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2844                                        attrs[TIPC_NLA_SOCK],
2845                                        tipc_nl_sock_policy);
2846                 if (err)
2847                         return err;
2848
2849                 if (!sock[TIPC_NLA_SOCK_REF])
2850                         return -EINVAL;
2851
2852                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2853         }
2854
2855         if (done)
2856                 return 0;
2857
2858         tsk = tipc_sk_lookup(net, tsk_portid);
2859         if (!tsk)
2860                 return -EINVAL;
2861
2862         lock_sock(&tsk->sk);
2863         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2864         if (!err)
2865                 done = 1;
2866         release_sock(&tsk->sk);
2867         sock_put(&tsk->sk);
2868
2869         cb->args[0] = tsk_portid;
2870         cb->args[1] = last_publ;
2871         cb->args[2] = done;
2872
2873         return skb->len;
2874 }