]> git.karo-electronics.de Git - karo-tx-linux.git/blob - net/tipc/socket.c
tipc: wake up all waiting threads at socket shutdown
[karo-tx-linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "port.h"
39
40 #include <linux/export.h>
41 #include <net/sock.h>
42
43 #define SS_LISTENING    -1      /* socket is listening */
44 #define SS_READY        -2      /* socket is connectionless */
45
46 #define OVERLOAD_LIMIT_BASE     5000
47 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
48
49 struct tipc_sock {
50         struct sock sk;
51         struct tipc_port *p;
52         struct tipc_portid peer_name;
53         unsigned int conn_timeout;
54 };
55
56 #define tipc_sk(sk) ((struct tipc_sock *)(sk))
57 #define tipc_sk_port(sk) (tipc_sk(sk)->p)
58
59 #define tipc_rx_ready(sock) (!skb_queue_empty(&sock->sk->sk_receive_queue) || \
60                         (sock->state == SS_DISCONNECTING))
61
62 static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
63 static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
64 static void wakeupdispatch(struct tipc_port *tport);
65 static void tipc_data_ready(struct sock *sk, int len);
66 static void tipc_write_space(struct sock *sk);
67
68 static const struct proto_ops packet_ops;
69 static const struct proto_ops stream_ops;
70 static const struct proto_ops msg_ops;
71
72 static struct proto tipc_proto;
73
74 static int sockets_enabled;
75
76 static atomic_t tipc_queue_size = ATOMIC_INIT(0);
77
78 /*
79  * Revised TIPC socket locking policy:
80  *
81  * Most socket operations take the standard socket lock when they start
82  * and hold it until they finish (or until they need to sleep).  Acquiring
83  * this lock grants the owner exclusive access to the fields of the socket
84  * data structures, with the exception of the backlog queue.  A few socket
85  * operations can be done without taking the socket lock because they only
86  * read socket information that never changes during the life of the socket.
87  *
88  * Socket operations may acquire the lock for the associated TIPC port if they
89  * need to perform an operation on the port.  If any routine needs to acquire
90  * both the socket lock and the port lock it must take the socket lock first
91  * to avoid the risk of deadlock.
92  *
93  * The dispatcher handling incoming messages cannot grab the socket lock in
94  * the standard fashion, since invoked it runs at the BH level and cannot block.
95  * Instead, it checks to see if the socket lock is currently owned by someone,
96  * and either handles the message itself or adds it to the socket's backlog
97  * queue; in the latter case the queued message is processed once the process
98  * owning the socket lock releases it.
99  *
100  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
101  * the problem of a blocked socket operation preventing any other operations
102  * from occurring.  However, applications must be careful if they have
103  * multiple threads trying to send (or receive) on the same socket, as these
104  * operations might interfere with each other.  For example, doing a connect
105  * and a receive at the same time might allow the receive to consume the
106  * ACK message meant for the connect.  While additional work could be done
107  * to try and overcome this, it doesn't seem to be worthwhile at the present.
108  *
109  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
110  * that another operation that must be performed in a non-blocking manner is
111  * not delayed for very long because the lock has already been taken.
112  *
113  * NOTE: This code assumes that certain fields of a port/socket pair are
114  * constant over its lifetime; such fields can be examined without taking
115  * the socket lock and/or port lock, and do not need to be re-read even
116  * after resuming processing after waiting.  These fields include:
117  *   - socket type
118  *   - pointer to socket sk structure (aka tipc_sock structure)
119  *   - pointer to port structure
120  *   - port reference
121  */
122
123 /**
124  * advance_rx_queue - discard first buffer in socket receive queue
125  *
126  * Caller must hold socket lock
127  */
128 static void advance_rx_queue(struct sock *sk)
129 {
130         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
131         atomic_dec(&tipc_queue_size);
132 }
133
134 /**
135  * discard_rx_queue - discard all buffers in socket receive queue
136  *
137  * Caller must hold socket lock
138  */
139 static void discard_rx_queue(struct sock *sk)
140 {
141         struct sk_buff *buf;
142
143         while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
144                 atomic_dec(&tipc_queue_size);
145                 kfree_skb(buf);
146         }
147 }
148
149 /**
150  * reject_rx_queue - reject all buffers in socket receive queue
151  *
152  * Caller must hold socket lock
153  */
154 static void reject_rx_queue(struct sock *sk)
155 {
156         struct sk_buff *buf;
157
158         while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
159                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
160                 atomic_dec(&tipc_queue_size);
161         }
162 }
163
164 /**
165  * tipc_create - create a TIPC socket
166  * @net: network namespace (must be default network)
167  * @sock: pre-allocated socket structure
168  * @protocol: protocol indicator (must be 0)
169  * @kern: caused by kernel or by userspace?
170  *
171  * This routine creates additional data structures used by the TIPC socket,
172  * initializes them, and links them together.
173  *
174  * Returns 0 on success, errno otherwise
175  */
176 static int tipc_create(struct net *net, struct socket *sock, int protocol,
177                        int kern)
178 {
179         const struct proto_ops *ops;
180         socket_state state;
181         struct sock *sk;
182         struct tipc_port *tp_ptr;
183
184         /* Validate arguments */
185         if (unlikely(protocol != 0))
186                 return -EPROTONOSUPPORT;
187
188         switch (sock->type) {
189         case SOCK_STREAM:
190                 ops = &stream_ops;
191                 state = SS_UNCONNECTED;
192                 break;
193         case SOCK_SEQPACKET:
194                 ops = &packet_ops;
195                 state = SS_UNCONNECTED;
196                 break;
197         case SOCK_DGRAM:
198         case SOCK_RDM:
199                 ops = &msg_ops;
200                 state = SS_READY;
201                 break;
202         default:
203                 return -EPROTOTYPE;
204         }
205
206         /* Allocate socket's protocol area */
207         sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
208         if (sk == NULL)
209                 return -ENOMEM;
210
211         /* Allocate TIPC port for socket to use */
212         tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
213                                      TIPC_LOW_IMPORTANCE);
214         if (unlikely(!tp_ptr)) {
215                 sk_free(sk);
216                 return -ENOMEM;
217         }
218
219         /* Finish initializing socket data structures */
220         sock->ops = ops;
221         sock->state = state;
222
223         sock_init_data(sock, sk);
224         sk->sk_backlog_rcv = backlog_rcv;
225         sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
226         sk->sk_data_ready = tipc_data_ready;
227         sk->sk_write_space = tipc_write_space;
228         tipc_sk(sk)->p = tp_ptr;
229         tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
230
231         spin_unlock_bh(tp_ptr->lock);
232
233         if (sock->state == SS_READY) {
234                 tipc_set_portunreturnable(tp_ptr->ref, 1);
235                 if (sock->type == SOCK_DGRAM)
236                         tipc_set_portunreliable(tp_ptr->ref, 1);
237         }
238
239         return 0;
240 }
241
242 /**
243  * release - destroy a TIPC socket
244  * @sock: socket to destroy
245  *
246  * This routine cleans up any messages that are still queued on the socket.
247  * For DGRAM and RDM socket types, all queued messages are rejected.
248  * For SEQPACKET and STREAM socket types, the first message is rejected
249  * and any others are discarded.  (If the first message on a STREAM socket
250  * is partially-read, it is discarded and the next one is rejected instead.)
251  *
252  * NOTE: Rejected messages are not necessarily returned to the sender!  They
253  * are returned or discarded according to the "destination droppable" setting
254  * specified for the message by the sender.
255  *
256  * Returns 0 on success, errno otherwise
257  */
258 static int release(struct socket *sock)
259 {
260         struct sock *sk = sock->sk;
261         struct tipc_port *tport;
262         struct sk_buff *buf;
263         int res;
264
265         /*
266          * Exit if socket isn't fully initialized (occurs when a failed accept()
267          * releases a pre-allocated child socket that was never used)
268          */
269         if (sk == NULL)
270                 return 0;
271
272         tport = tipc_sk_port(sk);
273         lock_sock(sk);
274
275         /*
276          * Reject all unreceived messages, except on an active connection
277          * (which disconnects locally & sends a 'FIN+' to peer)
278          */
279         while (sock->state != SS_DISCONNECTING) {
280                 buf = __skb_dequeue(&sk->sk_receive_queue);
281                 if (buf == NULL)
282                         break;
283                 atomic_dec(&tipc_queue_size);
284                 if (TIPC_SKB_CB(buf)->handle != 0)
285                         kfree_skb(buf);
286                 else {
287                         if ((sock->state == SS_CONNECTING) ||
288                             (sock->state == SS_CONNECTED)) {
289                                 sock->state = SS_DISCONNECTING;
290                                 tipc_disconnect(tport->ref);
291                         }
292                         tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
293                 }
294         }
295
296         /*
297          * Delete TIPC port; this ensures no more messages are queued
298          * (also disconnects an active connection & sends a 'FIN-' to peer)
299          */
300         res = tipc_deleteport(tport->ref);
301
302         /* Discard any remaining (connection-based) messages in receive queue */
303         discard_rx_queue(sk);
304
305         /* Reject any messages that accumulated in backlog queue */
306         sock->state = SS_DISCONNECTING;
307         release_sock(sk);
308
309         sock_put(sk);
310         sock->sk = NULL;
311
312         return res;
313 }
314
315 /**
316  * bind - associate or disassocate TIPC name(s) with a socket
317  * @sock: socket structure
318  * @uaddr: socket address describing name(s) and desired operation
319  * @uaddr_len: size of socket address data structure
320  *
321  * Name and name sequence binding is indicated using a positive scope value;
322  * a negative scope value unbinds the specified name.  Specifying no name
323  * (i.e. a socket address length of 0) unbinds all names from the socket.
324  *
325  * Returns 0 on success, errno otherwise
326  *
327  * NOTE: This routine doesn't need to take the socket lock since it doesn't
328  *       access any non-constant socket information.
329  */
330 static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
331 {
332         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
333         u32 portref = tipc_sk_port(sock->sk)->ref;
334
335         if (unlikely(!uaddr_len))
336                 return tipc_withdraw(portref, 0, NULL);
337
338         if (uaddr_len < sizeof(struct sockaddr_tipc))
339                 return -EINVAL;
340         if (addr->family != AF_TIPC)
341                 return -EAFNOSUPPORT;
342
343         if (addr->addrtype == TIPC_ADDR_NAME)
344                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
345         else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
346                 return -EAFNOSUPPORT;
347
348         if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES)
349                 return -EACCES;
350
351         return (addr->scope > 0) ?
352                 tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
353                 tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
354 }
355
356 /**
357  * get_name - get port ID of socket or peer socket
358  * @sock: socket structure
359  * @uaddr: area for returned socket address
360  * @uaddr_len: area for returned length of socket address
361  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
362  *
363  * Returns 0 on success, errno otherwise
364  *
365  * NOTE: This routine doesn't need to take the socket lock since it only
366  *       accesses socket information that is unchanging (or which changes in
367  *       a completely predictable manner).
368  */
369 static int get_name(struct socket *sock, struct sockaddr *uaddr,
370                     int *uaddr_len, int peer)
371 {
372         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
373         struct tipc_sock *tsock = tipc_sk(sock->sk);
374
375         memset(addr, 0, sizeof(*addr));
376         if (peer) {
377                 if ((sock->state != SS_CONNECTED) &&
378                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
379                         return -ENOTCONN;
380                 addr->addr.id.ref = tsock->peer_name.ref;
381                 addr->addr.id.node = tsock->peer_name.node;
382         } else {
383                 addr->addr.id.ref = tsock->p->ref;
384                 addr->addr.id.node = tipc_own_addr;
385         }
386
387         *uaddr_len = sizeof(*addr);
388         addr->addrtype = TIPC_ADDR_ID;
389         addr->family = AF_TIPC;
390         addr->scope = 0;
391         addr->addr.name.domain = 0;
392
393         return 0;
394 }
395
396 /**
397  * poll - read and possibly block on pollmask
398  * @file: file structure associated with the socket
399  * @sock: socket for which to calculate the poll bits
400  * @wait: ???
401  *
402  * Returns pollmask value
403  *
404  * COMMENTARY:
405  * It appears that the usual socket locking mechanisms are not useful here
406  * since the pollmask info is potentially out-of-date the moment this routine
407  * exits.  TCP and other protocols seem to rely on higher level poll routines
408  * to handle any preventable race conditions, so TIPC will do the same ...
409  *
410  * TIPC sets the returned events as follows:
411  *
412  * socket state         flags set
413  * ------------         ---------
414  * unconnected          no read flags
415  *                      POLLOUT if port is not congested
416  *
417  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
418  *                      no write flags
419  *
420  * connected            POLLIN/POLLRDNORM if data in rx queue
421  *                      POLLOUT if port is not congested
422  *
423  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
424  *                      no write flags
425  *
426  * listening            POLLIN if SYN in rx queue
427  *                      no write flags
428  *
429  * ready                POLLIN/POLLRDNORM if data in rx queue
430  * [connectionless]     POLLOUT (since port cannot be congested)
431  *
432  * IMPORTANT: The fact that a read or write operation is indicated does NOT
433  * imply that the operation will succeed, merely that it should be performed
434  * and will not block.
435  */
436 static unsigned int poll(struct file *file, struct socket *sock,
437                          poll_table *wait)
438 {
439         struct sock *sk = sock->sk;
440         u32 mask = 0;
441
442         sock_poll_wait(file, sk_sleep(sk), wait);
443
444         switch ((int)sock->state) {
445         case SS_UNCONNECTED:
446                 if (!tipc_sk_port(sk)->congested)
447                         mask |= POLLOUT;
448                 break;
449         case SS_READY:
450         case SS_CONNECTED:
451                 if (!tipc_sk_port(sk)->congested)
452                         mask |= POLLOUT;
453                 /* fall thru' */
454         case SS_CONNECTING:
455         case SS_LISTENING:
456                 if (!skb_queue_empty(&sk->sk_receive_queue))
457                         mask |= (POLLIN | POLLRDNORM);
458                 break;
459         case SS_DISCONNECTING:
460                 mask = (POLLIN | POLLRDNORM | POLLHUP);
461                 break;
462         }
463
464         return mask;
465 }
466
467 /**
468  * dest_name_check - verify user is permitted to send to specified port name
469  * @dest: destination address
470  * @m: descriptor for message to be sent
471  *
472  * Prevents restricted configuration commands from being issued by
473  * unauthorized users.
474  *
475  * Returns 0 if permission is granted, otherwise errno
476  */
477 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
478 {
479         struct tipc_cfg_msg_hdr hdr;
480
481         if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
482                 return 0;
483         if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
484                 return 0;
485         if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
486                 return -EACCES;
487
488         if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
489                 return -EMSGSIZE;
490         if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
491                 return -EFAULT;
492         if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
493                 return -EACCES;
494
495         return 0;
496 }
497
498 /**
499  * send_msg - send message in connectionless manner
500  * @iocb: if NULL, indicates that socket lock is already held
501  * @sock: socket structure
502  * @m: message to send
503  * @total_len: length of message
504  *
505  * Message must have an destination specified explicitly.
506  * Used for SOCK_RDM and SOCK_DGRAM messages,
507  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
508  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
509  *
510  * Returns the number of bytes sent on success, or errno otherwise
511  */
512 static int send_msg(struct kiocb *iocb, struct socket *sock,
513                     struct msghdr *m, size_t total_len)
514 {
515         struct sock *sk = sock->sk;
516         struct tipc_port *tport = tipc_sk_port(sk);
517         struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
518         int needs_conn;
519         long timeout_val;
520         int res = -EINVAL;
521
522         if (unlikely(!dest))
523                 return -EDESTADDRREQ;
524         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
525                      (dest->family != AF_TIPC)))
526                 return -EINVAL;
527         if ((total_len > TIPC_MAX_USER_MSG_SIZE) ||
528             (m->msg_iovlen > (unsigned int)INT_MAX))
529                 return -EMSGSIZE;
530
531         if (iocb)
532                 lock_sock(sk);
533
534         needs_conn = (sock->state != SS_READY);
535         if (unlikely(needs_conn)) {
536                 if (sock->state == SS_LISTENING) {
537                         res = -EPIPE;
538                         goto exit;
539                 }
540                 if (sock->state != SS_UNCONNECTED) {
541                         res = -EISCONN;
542                         goto exit;
543                 }
544                 if ((tport->published) ||
545                     ((sock->type == SOCK_STREAM) && (total_len != 0))) {
546                         res = -EOPNOTSUPP;
547                         goto exit;
548                 }
549                 if (dest->addrtype == TIPC_ADDR_NAME) {
550                         tport->conn_type = dest->addr.name.name.type;
551                         tport->conn_instance = dest->addr.name.name.instance;
552                 }
553
554                 /* Abort any pending connection attempts (very unlikely) */
555                 reject_rx_queue(sk);
556         }
557
558         timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
559
560         do {
561                 if (dest->addrtype == TIPC_ADDR_NAME) {
562                         res = dest_name_check(dest, m);
563                         if (res)
564                                 break;
565                         res = tipc_send2name(tport->ref,
566                                              &dest->addr.name.name,
567                                              dest->addr.name.domain,
568                                              m->msg_iovlen,
569                                              m->msg_iov,
570                                              total_len);
571                 } else if (dest->addrtype == TIPC_ADDR_ID) {
572                         res = tipc_send2port(tport->ref,
573                                              &dest->addr.id,
574                                              m->msg_iovlen,
575                                              m->msg_iov,
576                                              total_len);
577                 } else if (dest->addrtype == TIPC_ADDR_MCAST) {
578                         if (needs_conn) {
579                                 res = -EOPNOTSUPP;
580                                 break;
581                         }
582                         res = dest_name_check(dest, m);
583                         if (res)
584                                 break;
585                         res = tipc_multicast(tport->ref,
586                                              &dest->addr.nameseq,
587                                              m->msg_iovlen,
588                                              m->msg_iov,
589                                              total_len);
590                 }
591                 if (likely(res != -ELINKCONG)) {
592                         if (needs_conn && (res >= 0))
593                                 sock->state = SS_CONNECTING;
594                         break;
595                 }
596                 if (timeout_val <= 0L) {
597                         res = timeout_val ? timeout_val : -EWOULDBLOCK;
598                         break;
599                 }
600                 release_sock(sk);
601                 timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
602                                                !tport->congested, timeout_val);
603                 lock_sock(sk);
604         } while (1);
605
606 exit:
607         if (iocb)
608                 release_sock(sk);
609         return res;
610 }
611
612 /**
613  * send_packet - send a connection-oriented message
614  * @iocb: if NULL, indicates that socket lock is already held
615  * @sock: socket structure
616  * @m: message to send
617  * @total_len: length of message
618  *
619  * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
620  *
621  * Returns the number of bytes sent on success, or errno otherwise
622  */
623 static int send_packet(struct kiocb *iocb, struct socket *sock,
624                        struct msghdr *m, size_t total_len)
625 {
626         struct sock *sk = sock->sk;
627         struct tipc_port *tport = tipc_sk_port(sk);
628         struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
629         long timeout_val;
630         int res;
631
632         /* Handle implied connection establishment */
633         if (unlikely(dest))
634                 return send_msg(iocb, sock, m, total_len);
635
636         if ((total_len > TIPC_MAX_USER_MSG_SIZE) ||
637             (m->msg_iovlen > (unsigned int)INT_MAX))
638                 return -EMSGSIZE;
639
640         if (iocb)
641                 lock_sock(sk);
642
643         timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
644
645         do {
646                 if (unlikely(sock->state != SS_CONNECTED)) {
647                         if (sock->state == SS_DISCONNECTING)
648                                 res = -EPIPE;
649                         else
650                                 res = -ENOTCONN;
651                         break;
652                 }
653
654                 res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov,
655                                 total_len);
656                 if (likely(res != -ELINKCONG))
657                         break;
658                 if (timeout_val <= 0L) {
659                         res = timeout_val ? timeout_val : -EWOULDBLOCK;
660                         break;
661                 }
662                 release_sock(sk);
663                 timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
664                         (!tport->congested || !tport->connected), timeout_val);
665                 lock_sock(sk);
666         } while (1);
667
668         if (iocb)
669                 release_sock(sk);
670         return res;
671 }
672
673 /**
674  * send_stream - send stream-oriented data
675  * @iocb: (unused)
676  * @sock: socket structure
677  * @m: data to send
678  * @total_len: total length of data to be sent
679  *
680  * Used for SOCK_STREAM data.
681  *
682  * Returns the number of bytes sent on success (or partial success),
683  * or errno if no data sent
684  */
685 static int send_stream(struct kiocb *iocb, struct socket *sock,
686                        struct msghdr *m, size_t total_len)
687 {
688         struct sock *sk = sock->sk;
689         struct tipc_port *tport = tipc_sk_port(sk);
690         struct msghdr my_msg;
691         struct iovec my_iov;
692         struct iovec *curr_iov;
693         int curr_iovlen;
694         char __user *curr_start;
695         u32 hdr_size;
696         int curr_left;
697         int bytes_to_send;
698         int bytes_sent;
699         int res;
700
701         lock_sock(sk);
702
703         /* Handle special cases where there is no connection */
704         if (unlikely(sock->state != SS_CONNECTED)) {
705                 if (sock->state == SS_UNCONNECTED) {
706                         res = send_packet(NULL, sock, m, total_len);
707                         goto exit;
708                 } else if (sock->state == SS_DISCONNECTING) {
709                         res = -EPIPE;
710                         goto exit;
711                 } else {
712                         res = -ENOTCONN;
713                         goto exit;
714                 }
715         }
716
717         if (unlikely(m->msg_name)) {
718                 res = -EISCONN;
719                 goto exit;
720         }
721
722         if ((total_len > (unsigned int)INT_MAX) ||
723             (m->msg_iovlen > (unsigned int)INT_MAX)) {
724                 res = -EMSGSIZE;
725                 goto exit;
726         }
727
728         /*
729          * Send each iovec entry using one or more messages
730          *
731          * Note: This algorithm is good for the most likely case
732          * (i.e. one large iovec entry), but could be improved to pass sets
733          * of small iovec entries into send_packet().
734          */
735         curr_iov = m->msg_iov;
736         curr_iovlen = m->msg_iovlen;
737         my_msg.msg_iov = &my_iov;
738         my_msg.msg_iovlen = 1;
739         my_msg.msg_flags = m->msg_flags;
740         my_msg.msg_name = NULL;
741         bytes_sent = 0;
742
743         hdr_size = msg_hdr_sz(&tport->phdr);
744
745         while (curr_iovlen--) {
746                 curr_start = curr_iov->iov_base;
747                 curr_left = curr_iov->iov_len;
748
749                 while (curr_left) {
750                         bytes_to_send = tport->max_pkt - hdr_size;
751                         if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
752                                 bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
753                         if (curr_left < bytes_to_send)
754                                 bytes_to_send = curr_left;
755                         my_iov.iov_base = curr_start;
756                         my_iov.iov_len = bytes_to_send;
757                         res = send_packet(NULL, sock, &my_msg, bytes_to_send);
758                         if (res < 0) {
759                                 if (bytes_sent)
760                                         res = bytes_sent;
761                                 goto exit;
762                         }
763                         curr_left -= bytes_to_send;
764                         curr_start += bytes_to_send;
765                         bytes_sent += bytes_to_send;
766                 }
767
768                 curr_iov++;
769         }
770         res = bytes_sent;
771 exit:
772         release_sock(sk);
773         return res;
774 }
775
776 /**
777  * auto_connect - complete connection setup to a remote port
778  * @sock: socket structure
779  * @msg: peer's response message
780  *
781  * Returns 0 on success, errno otherwise
782  */
783 static int auto_connect(struct socket *sock, struct tipc_msg *msg)
784 {
785         struct tipc_sock *tsock = tipc_sk(sock->sk);
786
787         if (msg_errcode(msg)) {
788                 sock->state = SS_DISCONNECTING;
789                 return -ECONNREFUSED;
790         }
791
792         tsock->peer_name.ref = msg_origport(msg);
793         tsock->peer_name.node = msg_orignode(msg);
794         tipc_connect2port(tsock->p->ref, &tsock->peer_name);
795         tipc_set_portimportance(tsock->p->ref, msg_importance(msg));
796         sock->state = SS_CONNECTED;
797         return 0;
798 }
799
800 /**
801  * set_orig_addr - capture sender's address for received message
802  * @m: descriptor for message info
803  * @msg: received message header
804  *
805  * Note: Address is not captured if not requested by receiver.
806  */
807 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
808 {
809         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)m->msg_name;
810
811         if (addr) {
812                 addr->family = AF_TIPC;
813                 addr->addrtype = TIPC_ADDR_ID;
814                 addr->addr.id.ref = msg_origport(msg);
815                 addr->addr.id.node = msg_orignode(msg);
816                 addr->addr.name.domain = 0;     /* could leave uninitialized */
817                 addr->scope = 0;                /* could leave uninitialized */
818                 m->msg_namelen = sizeof(struct sockaddr_tipc);
819         }
820 }
821
822 /**
823  * anc_data_recv - optionally capture ancillary data for received message
824  * @m: descriptor for message info
825  * @msg: received message header
826  * @tport: TIPC port associated with message
827  *
828  * Note: Ancillary data is not captured if not requested by receiver.
829  *
830  * Returns 0 if successful, otherwise errno
831  */
832 static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
833                                 struct tipc_port *tport)
834 {
835         u32 anc_data[3];
836         u32 err;
837         u32 dest_type;
838         int has_name;
839         int res;
840
841         if (likely(m->msg_controllen == 0))
842                 return 0;
843
844         /* Optionally capture errored message object(s) */
845         err = msg ? msg_errcode(msg) : 0;
846         if (unlikely(err)) {
847                 anc_data[0] = err;
848                 anc_data[1] = msg_data_sz(msg);
849                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
850                 if (res)
851                         return res;
852                 if (anc_data[1]) {
853                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
854                                        msg_data(msg));
855                         if (res)
856                                 return res;
857                 }
858         }
859
860         /* Optionally capture message destination object */
861         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
862         switch (dest_type) {
863         case TIPC_NAMED_MSG:
864                 has_name = 1;
865                 anc_data[0] = msg_nametype(msg);
866                 anc_data[1] = msg_namelower(msg);
867                 anc_data[2] = msg_namelower(msg);
868                 break;
869         case TIPC_MCAST_MSG:
870                 has_name = 1;
871                 anc_data[0] = msg_nametype(msg);
872                 anc_data[1] = msg_namelower(msg);
873                 anc_data[2] = msg_nameupper(msg);
874                 break;
875         case TIPC_CONN_MSG:
876                 has_name = (tport->conn_type != 0);
877                 anc_data[0] = tport->conn_type;
878                 anc_data[1] = tport->conn_instance;
879                 anc_data[2] = tport->conn_instance;
880                 break;
881         default:
882                 has_name = 0;
883         }
884         if (has_name) {
885                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
886                 if (res)
887                         return res;
888         }
889
890         return 0;
891 }
892
893 /**
894  * recv_msg - receive packet-oriented message
895  * @iocb: (unused)
896  * @m: descriptor for message info
897  * @buf_len: total size of user buffer area
898  * @flags: receive flags
899  *
900  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
901  * If the complete message doesn't fit in user area, truncate it.
902  *
903  * Returns size of returned message data, errno otherwise
904  */
905 static int recv_msg(struct kiocb *iocb, struct socket *sock,
906                     struct msghdr *m, size_t buf_len, int flags)
907 {
908         struct sock *sk = sock->sk;
909         struct tipc_port *tport = tipc_sk_port(sk);
910         struct sk_buff *buf;
911         struct tipc_msg *msg;
912         long timeout;
913         unsigned int sz;
914         u32 err;
915         int res;
916
917         /* Catch invalid receive requests */
918         if (unlikely(!buf_len))
919                 return -EINVAL;
920
921         lock_sock(sk);
922
923         if (unlikely(sock->state == SS_UNCONNECTED)) {
924                 res = -ENOTCONN;
925                 goto exit;
926         }
927
928         timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
929 restart:
930
931         /* Look for a message in receive queue; wait if necessary */
932         while (skb_queue_empty(&sk->sk_receive_queue)) {
933                 if (sock->state == SS_DISCONNECTING) {
934                         res = -ENOTCONN;
935                         goto exit;
936                 }
937                 if (timeout <= 0L) {
938                         res = timeout ? timeout : -EWOULDBLOCK;
939                         goto exit;
940                 }
941                 release_sock(sk);
942                 timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
943                                                            tipc_rx_ready(sock),
944                                                            timeout);
945                 lock_sock(sk);
946         }
947
948         /* Look at first message in receive queue */
949         buf = skb_peek(&sk->sk_receive_queue);
950         msg = buf_msg(buf);
951         sz = msg_data_sz(msg);
952         err = msg_errcode(msg);
953
954         /* Complete connection setup for an implied connect */
955         if (unlikely(sock->state == SS_CONNECTING)) {
956                 res = auto_connect(sock, msg);
957                 if (res)
958                         goto exit;
959         }
960
961         /* Discard an empty non-errored message & try again */
962         if ((!sz) && (!err)) {
963                 advance_rx_queue(sk);
964                 goto restart;
965         }
966
967         /* Capture sender's address (optional) */
968         set_orig_addr(m, msg);
969
970         /* Capture ancillary data (optional) */
971         res = anc_data_recv(m, msg, tport);
972         if (res)
973                 goto exit;
974
975         /* Capture message data (if valid) & compute return value (always) */
976         if (!err) {
977                 if (unlikely(buf_len < sz)) {
978                         sz = buf_len;
979                         m->msg_flags |= MSG_TRUNC;
980                 }
981                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
982                                               m->msg_iov, sz);
983                 if (res)
984                         goto exit;
985                 res = sz;
986         } else {
987                 if ((sock->state == SS_READY) ||
988                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
989                         res = 0;
990                 else
991                         res = -ECONNRESET;
992         }
993
994         /* Consume received message (optional) */
995         if (likely(!(flags & MSG_PEEK))) {
996                 if ((sock->state != SS_READY) &&
997                     (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
998                         tipc_acknowledge(tport->ref, tport->conn_unacked);
999                 advance_rx_queue(sk);
1000         }
1001 exit:
1002         release_sock(sk);
1003         return res;
1004 }
1005
1006 /**
1007  * recv_stream - receive stream-oriented data
1008  * @iocb: (unused)
1009  * @m: descriptor for message info
1010  * @buf_len: total size of user buffer area
1011  * @flags: receive flags
1012  *
1013  * Used for SOCK_STREAM messages only.  If not enough data is available
1014  * will optionally wait for more; never truncates data.
1015  *
1016  * Returns size of returned message data, errno otherwise
1017  */
1018 static int recv_stream(struct kiocb *iocb, struct socket *sock,
1019                        struct msghdr *m, size_t buf_len, int flags)
1020 {
1021         struct sock *sk = sock->sk;
1022         struct tipc_port *tport = tipc_sk_port(sk);
1023         struct sk_buff *buf;
1024         struct tipc_msg *msg;
1025         long timeout;
1026         unsigned int sz;
1027         int sz_to_copy, target, needed;
1028         int sz_copied = 0;
1029         u32 err;
1030         int res = 0;
1031
1032         /* Catch invalid receive attempts */
1033         if (unlikely(!buf_len))
1034                 return -EINVAL;
1035
1036         lock_sock(sk);
1037
1038         if (unlikely((sock->state == SS_UNCONNECTED) ||
1039                      (sock->state == SS_CONNECTING))) {
1040                 res = -ENOTCONN;
1041                 goto exit;
1042         }
1043
1044         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1045         timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1046
1047 restart:
1048         /* Look for a message in receive queue; wait if necessary */
1049         while (skb_queue_empty(&sk->sk_receive_queue)) {
1050                 if (sock->state == SS_DISCONNECTING) {
1051                         res = -ENOTCONN;
1052                         goto exit;
1053                 }
1054                 if (timeout <= 0L) {
1055                         res = timeout ? timeout : -EWOULDBLOCK;
1056                         goto exit;
1057                 }
1058                 release_sock(sk);
1059                 timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
1060                                                            tipc_rx_ready(sock),
1061                                                            timeout);
1062                 lock_sock(sk);
1063         }
1064
1065         /* Look at first message in receive queue */
1066         buf = skb_peek(&sk->sk_receive_queue);
1067         msg = buf_msg(buf);
1068         sz = msg_data_sz(msg);
1069         err = msg_errcode(msg);
1070
1071         /* Discard an empty non-errored message & try again */
1072         if ((!sz) && (!err)) {
1073                 advance_rx_queue(sk);
1074                 goto restart;
1075         }
1076
1077         /* Optionally capture sender's address & ancillary data of first msg */
1078         if (sz_copied == 0) {
1079                 set_orig_addr(m, msg);
1080                 res = anc_data_recv(m, msg, tport);
1081                 if (res)
1082                         goto exit;
1083         }
1084
1085         /* Capture message data (if valid) & compute return value (always) */
1086         if (!err) {
1087                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1088
1089                 sz -= offset;
1090                 needed = (buf_len - sz_copied);
1091                 sz_to_copy = (sz <= needed) ? sz : needed;
1092
1093                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1094                                               m->msg_iov, sz_to_copy);
1095                 if (res)
1096                         goto exit;
1097
1098                 sz_copied += sz_to_copy;
1099
1100                 if (sz_to_copy < sz) {
1101                         if (!(flags & MSG_PEEK))
1102                                 TIPC_SKB_CB(buf)->handle =
1103                                 (void *)(unsigned long)(offset + sz_to_copy);
1104                         goto exit;
1105                 }
1106         } else {
1107                 if (sz_copied != 0)
1108                         goto exit; /* can't add error msg to valid data */
1109
1110                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1111                         res = 0;
1112                 else
1113                         res = -ECONNRESET;
1114         }
1115
1116         /* Consume received message (optional) */
1117         if (likely(!(flags & MSG_PEEK))) {
1118                 if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1119                         tipc_acknowledge(tport->ref, tport->conn_unacked);
1120                 advance_rx_queue(sk);
1121         }
1122
1123         /* Loop around if more data is required */
1124         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1125             (!skb_queue_empty(&sk->sk_receive_queue) ||
1126             (sz_copied < target)) &&    /* and more is ready or required */
1127             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1128             (!err))                     /* and haven't reached a FIN */
1129                 goto restart;
1130
1131 exit:
1132         release_sock(sk);
1133         return sz_copied ? sz_copied : res;
1134 }
1135
1136 /**
1137  * tipc_write_space - wake up thread if port congestion is released
1138  * @sk: socket
1139  */
1140 static void tipc_write_space(struct sock *sk)
1141 {
1142         struct socket_wq *wq;
1143
1144         rcu_read_lock();
1145         wq = rcu_dereference(sk->sk_wq);
1146         if (wq_has_sleeper(wq))
1147                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1148                                                 POLLWRNORM | POLLWRBAND);
1149         rcu_read_unlock();
1150 }
1151
1152 /**
1153  * tipc_data_ready - wake up threads to indicate messages have been received
1154  * @sk: socket
1155  * @len: the length of messages
1156  */
1157 static void tipc_data_ready(struct sock *sk, int len)
1158 {
1159         struct socket_wq *wq;
1160
1161         rcu_read_lock();
1162         wq = rcu_dereference(sk->sk_wq);
1163         if (wq_has_sleeper(wq))
1164                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1165                                                 POLLRDNORM | POLLRDBAND);
1166         rcu_read_unlock();
1167 }
1168
1169 /**
1170  * rx_queue_full - determine if receive queue can accept another message
1171  * @msg: message to be added to queue
1172  * @queue_size: current size of queue
1173  * @base: nominal maximum size of queue
1174  *
1175  * Returns 1 if queue is unable to accept message, 0 otherwise
1176  */
1177 static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
1178 {
1179         u32 threshold;
1180         u32 imp = msg_importance(msg);
1181
1182         if (imp == TIPC_LOW_IMPORTANCE)
1183                 threshold = base;
1184         else if (imp == TIPC_MEDIUM_IMPORTANCE)
1185                 threshold = base * 2;
1186         else if (imp == TIPC_HIGH_IMPORTANCE)
1187                 threshold = base * 100;
1188         else
1189                 return 0;
1190
1191         if (msg_connected(msg))
1192                 threshold *= 4;
1193
1194         return queue_size >= threshold;
1195 }
1196
1197 /**
1198  * filter_rcv - validate incoming message
1199  * @sk: socket
1200  * @buf: message
1201  *
1202  * Enqueues message on receive queue if acceptable; optionally handles
1203  * disconnect indication for a connected socket.
1204  *
1205  * Called with socket lock already taken; port lock may also be taken.
1206  *
1207  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1208  */
1209 static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1210 {
1211         struct socket *sock = sk->sk_socket;
1212         struct tipc_msg *msg = buf_msg(buf);
1213         u32 recv_q_len;
1214
1215         /* Reject message if it is wrong sort of message for socket */
1216         if (msg_type(msg) > TIPC_DIRECT_MSG)
1217                 return TIPC_ERR_NO_PORT;
1218
1219         if (sock->state == SS_READY) {
1220                 if (msg_connected(msg))
1221                         return TIPC_ERR_NO_PORT;
1222         } else {
1223                 if (msg_mcast(msg))
1224                         return TIPC_ERR_NO_PORT;
1225                 if (sock->state == SS_CONNECTED) {
1226                         if (!msg_connected(msg) ||
1227                             !tipc_port_peer_msg(tipc_sk_port(sk), msg))
1228                                 return TIPC_ERR_NO_PORT;
1229                 } else if (sock->state == SS_CONNECTING) {
1230                         if (!msg_connected(msg) && (msg_errcode(msg) == 0))
1231                                 return TIPC_ERR_NO_PORT;
1232                 } else if (sock->state == SS_LISTENING) {
1233                         if (msg_connected(msg) || msg_errcode(msg))
1234                                 return TIPC_ERR_NO_PORT;
1235                 } else if (sock->state == SS_DISCONNECTING) {
1236                         return TIPC_ERR_NO_PORT;
1237                 } else /* (sock->state == SS_UNCONNECTED) */ {
1238                         if (msg_connected(msg) || msg_errcode(msg))
1239                                 return TIPC_ERR_NO_PORT;
1240                 }
1241         }
1242
1243         /* Reject message if there isn't room to queue it */
1244         recv_q_len = (u32)atomic_read(&tipc_queue_size);
1245         if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
1246                 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1247                         return TIPC_ERR_OVERLOAD;
1248         }
1249         recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1250         if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1251                 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
1252                         return TIPC_ERR_OVERLOAD;
1253         }
1254
1255         /* Enqueue message (finally!) */
1256         TIPC_SKB_CB(buf)->handle = 0;
1257         atomic_inc(&tipc_queue_size);
1258         __skb_queue_tail(&sk->sk_receive_queue, buf);
1259
1260         /* Initiate connection termination for an incoming 'FIN' */
1261         if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
1262                 sock->state = SS_DISCONNECTING;
1263                 tipc_disconnect_port(tipc_sk_port(sk));
1264         }
1265
1266         sk->sk_data_ready(sk, 0);
1267         return TIPC_OK;
1268 }
1269
1270 /**
1271  * backlog_rcv - handle incoming message from backlog queue
1272  * @sk: socket
1273  * @buf: message
1274  *
1275  * Caller must hold socket lock, but not port lock.
1276  *
1277  * Returns 0
1278  */
1279 static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1280 {
1281         u32 res;
1282
1283         res = filter_rcv(sk, buf);
1284         if (res)
1285                 tipc_reject_msg(buf, res);
1286         return 0;
1287 }
1288
1289 /**
1290  * dispatch - handle incoming message
1291  * @tport: TIPC port that received message
1292  * @buf: message
1293  *
1294  * Called with port lock already taken.
1295  *
1296  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1297  */
1298 static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1299 {
1300         struct sock *sk = (struct sock *)tport->usr_handle;
1301         u32 res;
1302
1303         /*
1304          * Process message if socket is unlocked; otherwise add to backlog queue
1305          *
1306          * This code is based on sk_receive_skb(), but must be distinct from it
1307          * since a TIPC-specific filter/reject mechanism is utilized
1308          */
1309         bh_lock_sock(sk);
1310         if (!sock_owned_by_user(sk)) {
1311                 res = filter_rcv(sk, buf);
1312         } else {
1313                 if (sk_add_backlog(sk, buf, sk->sk_rcvbuf))
1314                         res = TIPC_ERR_OVERLOAD;
1315                 else
1316                         res = TIPC_OK;
1317         }
1318         bh_unlock_sock(sk);
1319
1320         return res;
1321 }
1322
1323 /**
1324  * wakeupdispatch - wake up port after congestion
1325  * @tport: port to wakeup
1326  *
1327  * Called with port lock already taken.
1328  */
1329 static void wakeupdispatch(struct tipc_port *tport)
1330 {
1331         struct sock *sk = (struct sock *)tport->usr_handle;
1332
1333         sk->sk_write_space(sk);
1334 }
1335
1336 /**
1337  * connect - establish a connection to another TIPC port
1338  * @sock: socket structure
1339  * @dest: socket address for destination port
1340  * @destlen: size of socket address data structure
1341  * @flags: file-related flags associated with socket
1342  *
1343  * Returns 0 on success, errno otherwise
1344  */
1345 static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1346                    int flags)
1347 {
1348         struct sock *sk = sock->sk;
1349         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1350         struct msghdr m = {NULL,};
1351         struct sk_buff *buf;
1352         struct tipc_msg *msg;
1353         unsigned int timeout;
1354         int res;
1355
1356         lock_sock(sk);
1357
1358         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1359         if (sock->state == SS_READY) {
1360                 res = -EOPNOTSUPP;
1361                 goto exit;
1362         }
1363
1364         /* For now, TIPC does not support the non-blocking form of connect() */
1365         if (flags & O_NONBLOCK) {
1366                 res = -EOPNOTSUPP;
1367                 goto exit;
1368         }
1369
1370         /* Issue Posix-compliant error code if socket is in the wrong state */
1371         if (sock->state == SS_LISTENING) {
1372                 res = -EOPNOTSUPP;
1373                 goto exit;
1374         }
1375         if (sock->state == SS_CONNECTING) {
1376                 res = -EALREADY;
1377                 goto exit;
1378         }
1379         if (sock->state != SS_UNCONNECTED) {
1380                 res = -EISCONN;
1381                 goto exit;
1382         }
1383
1384         /*
1385          * Reject connection attempt using multicast address
1386          *
1387          * Note: send_msg() validates the rest of the address fields,
1388          *       so there's no need to do it here
1389          */
1390         if (dst->addrtype == TIPC_ADDR_MCAST) {
1391                 res = -EINVAL;
1392                 goto exit;
1393         }
1394
1395         /* Reject any messages already in receive queue (very unlikely) */
1396         reject_rx_queue(sk);
1397
1398         /* Send a 'SYN-' to destination */
1399         m.msg_name = dest;
1400         m.msg_namelen = destlen;
1401         res = send_msg(NULL, sock, &m, 0);
1402         if (res < 0)
1403                 goto exit;
1404
1405         /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1406         timeout = tipc_sk(sk)->conn_timeout;
1407         release_sock(sk);
1408         res = wait_event_interruptible_timeout(*sk_sleep(sk),
1409                         (!skb_queue_empty(&sk->sk_receive_queue) ||
1410                         (sock->state != SS_CONNECTING)),
1411                         timeout ? (long)msecs_to_jiffies(timeout)
1412                                 : MAX_SCHEDULE_TIMEOUT);
1413         lock_sock(sk);
1414
1415         if (res > 0) {
1416                 buf = skb_peek(&sk->sk_receive_queue);
1417                 if (buf != NULL) {
1418                         msg = buf_msg(buf);
1419                         res = auto_connect(sock, msg);
1420                         if (!res) {
1421                                 if (!msg_data_sz(msg))
1422                                         advance_rx_queue(sk);
1423                         }
1424                 } else {
1425                         if (sock->state == SS_CONNECTED)
1426                                 res = -EISCONN;
1427                         else
1428                                 res = -ECONNREFUSED;
1429                 }
1430         } else {
1431                 if (res == 0)
1432                         res = -ETIMEDOUT;
1433                 else
1434                         ; /* leave "res" unchanged */
1435                 sock->state = SS_DISCONNECTING;
1436         }
1437
1438 exit:
1439         release_sock(sk);
1440         return res;
1441 }
1442
1443 /**
1444  * listen - allow socket to listen for incoming connections
1445  * @sock: socket structure
1446  * @len: (unused)
1447  *
1448  * Returns 0 on success, errno otherwise
1449  */
1450 static int listen(struct socket *sock, int len)
1451 {
1452         struct sock *sk = sock->sk;
1453         int res;
1454
1455         lock_sock(sk);
1456
1457         if (sock->state != SS_UNCONNECTED)
1458                 res = -EINVAL;
1459         else {
1460                 sock->state = SS_LISTENING;
1461                 res = 0;
1462         }
1463
1464         release_sock(sk);
1465         return res;
1466 }
1467
1468 /**
1469  * accept - wait for connection request
1470  * @sock: listening socket
1471  * @newsock: new socket that is to be connected
1472  * @flags: file-related flags associated with socket
1473  *
1474  * Returns 0 on success, errno otherwise
1475  */
1476 static int accept(struct socket *sock, struct socket *new_sock, int flags)
1477 {
1478         struct sock *sk = sock->sk;
1479         struct sk_buff *buf;
1480         int res;
1481
1482         lock_sock(sk);
1483
1484         if (sock->state != SS_LISTENING) {
1485                 res = -EINVAL;
1486                 goto exit;
1487         }
1488
1489         while (skb_queue_empty(&sk->sk_receive_queue)) {
1490                 if (flags & O_NONBLOCK) {
1491                         res = -EWOULDBLOCK;
1492                         goto exit;
1493                 }
1494                 release_sock(sk);
1495                 res = wait_event_interruptible(*sk_sleep(sk),
1496                                 (!skb_queue_empty(&sk->sk_receive_queue)));
1497                 lock_sock(sk);
1498                 if (res)
1499                         goto exit;
1500         }
1501
1502         buf = skb_peek(&sk->sk_receive_queue);
1503
1504         res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
1505         if (!res) {
1506                 struct sock *new_sk = new_sock->sk;
1507                 struct tipc_sock *new_tsock = tipc_sk(new_sk);
1508                 struct tipc_port *new_tport = new_tsock->p;
1509                 u32 new_ref = new_tport->ref;
1510                 struct tipc_msg *msg = buf_msg(buf);
1511
1512                 lock_sock(new_sk);
1513
1514                 /*
1515                  * Reject any stray messages received by new socket
1516                  * before the socket lock was taken (very, very unlikely)
1517                  */
1518                 reject_rx_queue(new_sk);
1519
1520                 /* Connect new socket to it's peer */
1521                 new_tsock->peer_name.ref = msg_origport(msg);
1522                 new_tsock->peer_name.node = msg_orignode(msg);
1523                 tipc_connect2port(new_ref, &new_tsock->peer_name);
1524                 new_sock->state = SS_CONNECTED;
1525
1526                 tipc_set_portimportance(new_ref, msg_importance(msg));
1527                 if (msg_named(msg)) {
1528                         new_tport->conn_type = msg_nametype(msg);
1529                         new_tport->conn_instance = msg_nameinst(msg);
1530                 }
1531
1532                 /*
1533                  * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1534                  * Respond to 'SYN+' by queuing it on new socket.
1535                  */
1536                 if (!msg_data_sz(msg)) {
1537                         struct msghdr m = {NULL,};
1538
1539                         advance_rx_queue(sk);
1540                         send_packet(NULL, new_sock, &m, 0);
1541                 } else {
1542                         __skb_dequeue(&sk->sk_receive_queue);
1543                         __skb_queue_head(&new_sk->sk_receive_queue, buf);
1544                 }
1545                 release_sock(new_sk);
1546         }
1547 exit:
1548         release_sock(sk);
1549         return res;
1550 }
1551
1552 /**
1553  * shutdown - shutdown socket connection
1554  * @sock: socket structure
1555  * @how: direction to close (must be SHUT_RDWR)
1556  *
1557  * Terminates connection (if necessary), then purges socket's receive queue.
1558  *
1559  * Returns 0 on success, errno otherwise
1560  */
1561 static int shutdown(struct socket *sock, int how)
1562 {
1563         struct sock *sk = sock->sk;
1564         struct tipc_port *tport = tipc_sk_port(sk);
1565         struct sk_buff *buf;
1566         int res;
1567
1568         if (how != SHUT_RDWR)
1569                 return -EINVAL;
1570
1571         lock_sock(sk);
1572
1573         switch (sock->state) {
1574         case SS_CONNECTING:
1575         case SS_CONNECTED:
1576
1577 restart:
1578                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1579                 buf = __skb_dequeue(&sk->sk_receive_queue);
1580                 if (buf) {
1581                         atomic_dec(&tipc_queue_size);
1582                         if (TIPC_SKB_CB(buf)->handle != 0) {
1583                                 kfree_skb(buf);
1584                                 goto restart;
1585                         }
1586                         tipc_disconnect(tport->ref);
1587                         tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1588                 } else {
1589                         tipc_shutdown(tport->ref);
1590                 }
1591
1592                 sock->state = SS_DISCONNECTING;
1593
1594                 /* fall through */
1595
1596         case SS_DISCONNECTING:
1597
1598                 /* Discard any unreceived messages */
1599                 discard_rx_queue(sk);
1600
1601                 /* Wake up anyone sleeping in poll */
1602                 sk->sk_state_change(sk);
1603                 res = 0;
1604                 break;
1605
1606         default:
1607                 res = -ENOTCONN;
1608         }
1609
1610         release_sock(sk);
1611         return res;
1612 }
1613
1614 /**
1615  * setsockopt - set socket option
1616  * @sock: socket structure
1617  * @lvl: option level
1618  * @opt: option identifier
1619  * @ov: pointer to new option value
1620  * @ol: length of option value
1621  *
1622  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1623  * (to ease compatibility).
1624  *
1625  * Returns 0 on success, errno otherwise
1626  */
1627 static int setsockopt(struct socket *sock,
1628                       int lvl, int opt, char __user *ov, unsigned int ol)
1629 {
1630         struct sock *sk = sock->sk;
1631         struct tipc_port *tport = tipc_sk_port(sk);
1632         u32 value;
1633         int res;
1634
1635         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1636                 return 0;
1637         if (lvl != SOL_TIPC)
1638                 return -ENOPROTOOPT;
1639         if (ol < sizeof(value))
1640                 return -EINVAL;
1641         res = get_user(value, (u32 __user *)ov);
1642         if (res)
1643                 return res;
1644
1645         lock_sock(sk);
1646
1647         switch (opt) {
1648         case TIPC_IMPORTANCE:
1649                 res = tipc_set_portimportance(tport->ref, value);
1650                 break;
1651         case TIPC_SRC_DROPPABLE:
1652                 if (sock->type != SOCK_STREAM)
1653                         res = tipc_set_portunreliable(tport->ref, value);
1654                 else
1655                         res = -ENOPROTOOPT;
1656                 break;
1657         case TIPC_DEST_DROPPABLE:
1658                 res = tipc_set_portunreturnable(tport->ref, value);
1659                 break;
1660         case TIPC_CONN_TIMEOUT:
1661                 tipc_sk(sk)->conn_timeout = value;
1662                 /* no need to set "res", since already 0 at this point */
1663                 break;
1664         default:
1665                 res = -EINVAL;
1666         }
1667
1668         release_sock(sk);
1669
1670         return res;
1671 }
1672
1673 /**
1674  * getsockopt - get socket option
1675  * @sock: socket structure
1676  * @lvl: option level
1677  * @opt: option identifier
1678  * @ov: receptacle for option value
1679  * @ol: receptacle for length of option value
1680  *
1681  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1682  * (to ease compatibility).
1683  *
1684  * Returns 0 on success, errno otherwise
1685  */
1686 static int getsockopt(struct socket *sock,
1687                       int lvl, int opt, char __user *ov, int __user *ol)
1688 {
1689         struct sock *sk = sock->sk;
1690         struct tipc_port *tport = tipc_sk_port(sk);
1691         int len;
1692         u32 value;
1693         int res;
1694
1695         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1696                 return put_user(0, ol);
1697         if (lvl != SOL_TIPC)
1698                 return -ENOPROTOOPT;
1699         res = get_user(len, ol);
1700         if (res)
1701                 return res;
1702
1703         lock_sock(sk);
1704
1705         switch (opt) {
1706         case TIPC_IMPORTANCE:
1707                 res = tipc_portimportance(tport->ref, &value);
1708                 break;
1709         case TIPC_SRC_DROPPABLE:
1710                 res = tipc_portunreliable(tport->ref, &value);
1711                 break;
1712         case TIPC_DEST_DROPPABLE:
1713                 res = tipc_portunreturnable(tport->ref, &value);
1714                 break;
1715         case TIPC_CONN_TIMEOUT:
1716                 value = tipc_sk(sk)->conn_timeout;
1717                 /* no need to set "res", since already 0 at this point */
1718                 break;
1719         case TIPC_NODE_RECVQ_DEPTH:
1720                 value = (u32)atomic_read(&tipc_queue_size);
1721                 break;
1722         case TIPC_SOCK_RECVQ_DEPTH:
1723                 value = skb_queue_len(&sk->sk_receive_queue);
1724                 break;
1725         default:
1726                 res = -EINVAL;
1727         }
1728
1729         release_sock(sk);
1730
1731         if (res)
1732                 return res;     /* "get" failed */
1733
1734         if (len < sizeof(value))
1735                 return -EINVAL;
1736
1737         if (copy_to_user(ov, &value, sizeof(value)))
1738                 return -EFAULT;
1739
1740         return put_user(sizeof(value), ol);
1741 }
1742
1743 /* Protocol switches for the various types of TIPC sockets */
1744
1745 static const struct proto_ops msg_ops = {
1746         .owner          = THIS_MODULE,
1747         .family         = AF_TIPC,
1748         .release        = release,
1749         .bind           = bind,
1750         .connect        = connect,
1751         .socketpair     = sock_no_socketpair,
1752         .accept         = sock_no_accept,
1753         .getname        = get_name,
1754         .poll           = poll,
1755         .ioctl          = sock_no_ioctl,
1756         .listen         = sock_no_listen,
1757         .shutdown       = shutdown,
1758         .setsockopt     = setsockopt,
1759         .getsockopt     = getsockopt,
1760         .sendmsg        = send_msg,
1761         .recvmsg        = recv_msg,
1762         .mmap           = sock_no_mmap,
1763         .sendpage       = sock_no_sendpage
1764 };
1765
1766 static const struct proto_ops packet_ops = {
1767         .owner          = THIS_MODULE,
1768         .family         = AF_TIPC,
1769         .release        = release,
1770         .bind           = bind,
1771         .connect        = connect,
1772         .socketpair     = sock_no_socketpair,
1773         .accept         = accept,
1774         .getname        = get_name,
1775         .poll           = poll,
1776         .ioctl          = sock_no_ioctl,
1777         .listen         = listen,
1778         .shutdown       = shutdown,
1779         .setsockopt     = setsockopt,
1780         .getsockopt     = getsockopt,
1781         .sendmsg        = send_packet,
1782         .recvmsg        = recv_msg,
1783         .mmap           = sock_no_mmap,
1784         .sendpage       = sock_no_sendpage
1785 };
1786
1787 static const struct proto_ops stream_ops = {
1788         .owner          = THIS_MODULE,
1789         .family         = AF_TIPC,
1790         .release        = release,
1791         .bind           = bind,
1792         .connect        = connect,
1793         .socketpair     = sock_no_socketpair,
1794         .accept         = accept,
1795         .getname        = get_name,
1796         .poll           = poll,
1797         .ioctl          = sock_no_ioctl,
1798         .listen         = listen,
1799         .shutdown       = shutdown,
1800         .setsockopt     = setsockopt,
1801         .getsockopt     = getsockopt,
1802         .sendmsg        = send_stream,
1803         .recvmsg        = recv_stream,
1804         .mmap           = sock_no_mmap,
1805         .sendpage       = sock_no_sendpage
1806 };
1807
1808 static const struct net_proto_family tipc_family_ops = {
1809         .owner          = THIS_MODULE,
1810         .family         = AF_TIPC,
1811         .create         = tipc_create
1812 };
1813
1814 static struct proto tipc_proto = {
1815         .name           = "TIPC",
1816         .owner          = THIS_MODULE,
1817         .obj_size       = sizeof(struct tipc_sock)
1818 };
1819
1820 /**
1821  * tipc_socket_init - initialize TIPC socket interface
1822  *
1823  * Returns 0 on success, errno otherwise
1824  */
1825 int tipc_socket_init(void)
1826 {
1827         int res;
1828
1829         res = proto_register(&tipc_proto, 1);
1830         if (res) {
1831                 pr_err("Failed to register TIPC protocol type\n");
1832                 goto out;
1833         }
1834
1835         res = sock_register(&tipc_family_ops);
1836         if (res) {
1837                 pr_err("Failed to register TIPC socket type\n");
1838                 proto_unregister(&tipc_proto);
1839                 goto out;
1840         }
1841
1842         sockets_enabled = 1;
1843  out:
1844         return res;
1845 }
1846
1847 /**
1848  * tipc_socket_stop - stop TIPC socket interface
1849  */
1850 void tipc_socket_stop(void)
1851 {
1852         if (!sockets_enabled)
1853                 return;
1854
1855         sockets_enabled = 0;
1856         sock_unregister(tipc_family_ops.family);
1857         proto_unregister(&tipc_proto);
1858 }