]> git.karo-electronics.de Git - karo-tx-linux.git/blob - net/tipc/socket.c
arm: imx6qdl: tx6: defconfig: mfg tool
[karo-tx-linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "port.h"
39 #include "node.h"
40
41 #include <linux/export.h>
42
43 #define SS_LISTENING    -1      /* socket is listening */
44 #define SS_READY        -2      /* socket is connectionless */
45
46 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
47
48 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
49 static void tipc_data_ready(struct sock *sk);
50 static void tipc_write_space(struct sock *sk);
51 static int tipc_release(struct socket *sock);
52 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
53
54 static const struct proto_ops packet_ops;
55 static const struct proto_ops stream_ops;
56 static const struct proto_ops msg_ops;
57
58 static struct proto tipc_proto;
59 static struct proto tipc_proto_kern;
60
61 /*
62  * Revised TIPC socket locking policy:
63  *
64  * Most socket operations take the standard socket lock when they start
65  * and hold it until they finish (or until they need to sleep).  Acquiring
66  * this lock grants the owner exclusive access to the fields of the socket
67  * data structures, with the exception of the backlog queue.  A few socket
68  * operations can be done without taking the socket lock because they only
69  * read socket information that never changes during the life of the socket.
70  *
71  * Socket operations may acquire the lock for the associated TIPC port if they
72  * need to perform an operation on the port.  If any routine needs to acquire
73  * both the socket lock and the port lock it must take the socket lock first
74  * to avoid the risk of deadlock.
75  *
76  * The dispatcher handling incoming messages cannot grab the socket lock in
77  * the standard fashion, since invoked it runs at the BH level and cannot block.
78  * Instead, it checks to see if the socket lock is currently owned by someone,
79  * and either handles the message itself or adds it to the socket's backlog
80  * queue; in the latter case the queued message is processed once the process
81  * owning the socket lock releases it.
82  *
83  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
84  * the problem of a blocked socket operation preventing any other operations
85  * from occurring.  However, applications must be careful if they have
86  * multiple threads trying to send (or receive) on the same socket, as these
87  * operations might interfere with each other.  For example, doing a connect
88  * and a receive at the same time might allow the receive to consume the
89  * ACK message meant for the connect.  While additional work could be done
90  * to try and overcome this, it doesn't seem to be worthwhile at the present.
91  *
92  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
93  * that another operation that must be performed in a non-blocking manner is
94  * not delayed for very long because the lock has already been taken.
95  *
96  * NOTE: This code assumes that certain fields of a port/socket pair are
97  * constant over its lifetime; such fields can be examined without taking
98  * the socket lock and/or port lock, and do not need to be re-read even
99  * after resuming processing after waiting.  These fields include:
100  *   - socket type
101  *   - pointer to socket sk structure (aka tipc_sock structure)
102  *   - pointer to port structure
103  *   - port reference
104  */
105
106 #include "socket.h"
107
108 /**
109  * advance_rx_queue - discard first buffer in socket receive queue
110  *
111  * Caller must hold socket lock
112  */
113 static void advance_rx_queue(struct sock *sk)
114 {
115         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
116 }
117
118 /**
119  * reject_rx_queue - reject all buffers in socket receive queue
120  *
121  * Caller must hold socket lock
122  */
123 static void reject_rx_queue(struct sock *sk)
124 {
125         struct sk_buff *buf;
126
127         while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
128                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
129 }
130
131 /**
132  * tipc_sk_create - create a TIPC socket
133  * @net: network namespace (must be default network)
134  * @sock: pre-allocated socket structure
135  * @protocol: protocol indicator (must be 0)
136  * @kern: caused by kernel or by userspace?
137  *
138  * This routine creates additional data structures used by the TIPC socket,
139  * initializes them, and links them together.
140  *
141  * Returns 0 on success, errno otherwise
142  */
143 static int tipc_sk_create(struct net *net, struct socket *sock,
144                           int protocol, int kern)
145 {
146         const struct proto_ops *ops;
147         socket_state state;
148         struct sock *sk;
149         struct tipc_sock *tsk;
150         struct tipc_port *port;
151         u32 ref;
152
153         /* Validate arguments */
154         if (unlikely(protocol != 0))
155                 return -EPROTONOSUPPORT;
156
157         switch (sock->type) {
158         case SOCK_STREAM:
159                 ops = &stream_ops;
160                 state = SS_UNCONNECTED;
161                 break;
162         case SOCK_SEQPACKET:
163                 ops = &packet_ops;
164                 state = SS_UNCONNECTED;
165                 break;
166         case SOCK_DGRAM:
167         case SOCK_RDM:
168                 ops = &msg_ops;
169                 state = SS_READY;
170                 break;
171         default:
172                 return -EPROTOTYPE;
173         }
174
175         /* Allocate socket's protocol area */
176         if (!kern)
177                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
178         else
179                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
180
181         if (sk == NULL)
182                 return -ENOMEM;
183
184         tsk = tipc_sk(sk);
185         port = &tsk->port;
186
187         ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE);
188         if (!ref) {
189                 pr_warn("Socket registration failed, ref. table exhausted\n");
190                 sk_free(sk);
191                 return -ENOMEM;
192         }
193
194         /* Finish initializing socket data structures */
195         sock->ops = ops;
196         sock->state = state;
197
198         sock_init_data(sock, sk);
199         sk->sk_backlog_rcv = tipc_backlog_rcv;
200         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
201         sk->sk_data_ready = tipc_data_ready;
202         sk->sk_write_space = tipc_write_space;
203         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
204         atomic_set(&tsk->dupl_rcvcnt, 0);
205         tipc_port_unlock(port);
206
207         if (sock->state == SS_READY) {
208                 tipc_port_set_unreturnable(port, true);
209                 if (sock->type == SOCK_DGRAM)
210                         tipc_port_set_unreliable(port, true);
211         }
212         return 0;
213 }
214
215 /**
216  * tipc_sock_create_local - create TIPC socket from inside TIPC module
217  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
218  *
219  * We cannot use sock_creat_kern here because it bumps module user count.
220  * Since socket owner and creator is the same module we must make sure
221  * that module count remains zero for module local sockets, otherwise
222  * we cannot do rmmod.
223  *
224  * Returns 0 on success, errno otherwise
225  */
226 int tipc_sock_create_local(int type, struct socket **res)
227 {
228         int rc;
229
230         rc = sock_create_lite(AF_TIPC, type, 0, res);
231         if (rc < 0) {
232                 pr_err("Failed to create kernel socket\n");
233                 return rc;
234         }
235         tipc_sk_create(&init_net, *res, 0, 1);
236
237         return 0;
238 }
239
240 /**
241  * tipc_sock_release_local - release socket created by tipc_sock_create_local
242  * @sock: the socket to be released.
243  *
244  * Module reference count is not incremented when such sockets are created,
245  * so we must keep it from being decremented when they are released.
246  */
247 void tipc_sock_release_local(struct socket *sock)
248 {
249         tipc_release(sock);
250         sock->ops = NULL;
251         sock_release(sock);
252 }
253
254 /**
255  * tipc_sock_accept_local - accept a connection on a socket created
256  * with tipc_sock_create_local. Use this function to avoid that
257  * module reference count is inadvertently incremented.
258  *
259  * @sock:    the accepting socket
260  * @newsock: reference to the new socket to be created
261  * @flags:   socket flags
262  */
263
264 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
265                            int flags)
266 {
267         struct sock *sk = sock->sk;
268         int ret;
269
270         ret = sock_create_lite(sk->sk_family, sk->sk_type,
271                                sk->sk_protocol, newsock);
272         if (ret < 0)
273                 return ret;
274
275         ret = tipc_accept(sock, *newsock, flags);
276         if (ret < 0) {
277                 sock_release(*newsock);
278                 return ret;
279         }
280         (*newsock)->ops = sock->ops;
281         return ret;
282 }
283
284 /**
285  * tipc_release - destroy a TIPC socket
286  * @sock: socket to destroy
287  *
288  * This routine cleans up any messages that are still queued on the socket.
289  * For DGRAM and RDM socket types, all queued messages are rejected.
290  * For SEQPACKET and STREAM socket types, the first message is rejected
291  * and any others are discarded.  (If the first message on a STREAM socket
292  * is partially-read, it is discarded and the next one is rejected instead.)
293  *
294  * NOTE: Rejected messages are not necessarily returned to the sender!  They
295  * are returned or discarded according to the "destination droppable" setting
296  * specified for the message by the sender.
297  *
298  * Returns 0 on success, errno otherwise
299  */
300 static int tipc_release(struct socket *sock)
301 {
302         struct sock *sk = sock->sk;
303         struct tipc_sock *tsk;
304         struct tipc_port *port;
305         struct sk_buff *buf;
306
307         /*
308          * Exit if socket isn't fully initialized (occurs when a failed accept()
309          * releases a pre-allocated child socket that was never used)
310          */
311         if (sk == NULL)
312                 return 0;
313
314         tsk = tipc_sk(sk);
315         port = &tsk->port;
316         lock_sock(sk);
317
318         /*
319          * Reject all unreceived messages, except on an active connection
320          * (which disconnects locally & sends a 'FIN+' to peer)
321          */
322         while (sock->state != SS_DISCONNECTING) {
323                 buf = __skb_dequeue(&sk->sk_receive_queue);
324                 if (buf == NULL)
325                         break;
326                 if (TIPC_SKB_CB(buf)->handle != NULL)
327                         kfree_skb(buf);
328                 else {
329                         if ((sock->state == SS_CONNECTING) ||
330                             (sock->state == SS_CONNECTED)) {
331                                 sock->state = SS_DISCONNECTING;
332                                 tipc_port_disconnect(port->ref);
333                         }
334                         tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
335                 }
336         }
337
338         /* Destroy TIPC port; also disconnects an active connection and
339          * sends a 'FIN-' to peer.
340          */
341         tipc_port_destroy(port);
342
343         /* Discard any remaining (connection-based) messages in receive queue */
344         __skb_queue_purge(&sk->sk_receive_queue);
345
346         /* Reject any messages that accumulated in backlog queue */
347         sock->state = SS_DISCONNECTING;
348         release_sock(sk);
349
350         sock_put(sk);
351         sock->sk = NULL;
352
353         return 0;
354 }
355
356 /**
357  * tipc_bind - associate or disassocate TIPC name(s) with a socket
358  * @sock: socket structure
359  * @uaddr: socket address describing name(s) and desired operation
360  * @uaddr_len: size of socket address data structure
361  *
362  * Name and name sequence binding is indicated using a positive scope value;
363  * a negative scope value unbinds the specified name.  Specifying no name
364  * (i.e. a socket address length of 0) unbinds all names from the socket.
365  *
366  * Returns 0 on success, errno otherwise
367  *
368  * NOTE: This routine doesn't need to take the socket lock since it doesn't
369  *       access any non-constant socket information.
370  */
371 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
372                      int uaddr_len)
373 {
374         struct sock *sk = sock->sk;
375         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
376         struct tipc_sock *tsk = tipc_sk(sk);
377         int res = -EINVAL;
378
379         lock_sock(sk);
380         if (unlikely(!uaddr_len)) {
381                 res = tipc_withdraw(&tsk->port, 0, NULL);
382                 goto exit;
383         }
384
385         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
386                 res = -EINVAL;
387                 goto exit;
388         }
389         if (addr->family != AF_TIPC) {
390                 res = -EAFNOSUPPORT;
391                 goto exit;
392         }
393
394         if (addr->addrtype == TIPC_ADDR_NAME)
395                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
396         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
397                 res = -EAFNOSUPPORT;
398                 goto exit;
399         }
400
401         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
402             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
403             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
404                 res = -EACCES;
405                 goto exit;
406         }
407
408         res = (addr->scope > 0) ?
409                 tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) :
410                 tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq);
411 exit:
412         release_sock(sk);
413         return res;
414 }
415
416 /**
417  * tipc_getname - get port ID of socket or peer socket
418  * @sock: socket structure
419  * @uaddr: area for returned socket address
420  * @uaddr_len: area for returned length of socket address
421  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
422  *
423  * Returns 0 on success, errno otherwise
424  *
425  * NOTE: This routine doesn't need to take the socket lock since it only
426  *       accesses socket information that is unchanging (or which changes in
427  *       a completely predictable manner).
428  */
429 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
430                         int *uaddr_len, int peer)
431 {
432         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
433         struct tipc_sock *tsk = tipc_sk(sock->sk);
434
435         memset(addr, 0, sizeof(*addr));
436         if (peer) {
437                 if ((sock->state != SS_CONNECTED) &&
438                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
439                         return -ENOTCONN;
440                 addr->addr.id.ref = tipc_port_peerport(&tsk->port);
441                 addr->addr.id.node = tipc_port_peernode(&tsk->port);
442         } else {
443                 addr->addr.id.ref = tsk->port.ref;
444                 addr->addr.id.node = tipc_own_addr;
445         }
446
447         *uaddr_len = sizeof(*addr);
448         addr->addrtype = TIPC_ADDR_ID;
449         addr->family = AF_TIPC;
450         addr->scope = 0;
451         addr->addr.name.domain = 0;
452
453         return 0;
454 }
455
456 /**
457  * tipc_poll - read and possibly block on pollmask
458  * @file: file structure associated with the socket
459  * @sock: socket for which to calculate the poll bits
460  * @wait: ???
461  *
462  * Returns pollmask value
463  *
464  * COMMENTARY:
465  * It appears that the usual socket locking mechanisms are not useful here
466  * since the pollmask info is potentially out-of-date the moment this routine
467  * exits.  TCP and other protocols seem to rely on higher level poll routines
468  * to handle any preventable race conditions, so TIPC will do the same ...
469  *
470  * TIPC sets the returned events as follows:
471  *
472  * socket state         flags set
473  * ------------         ---------
474  * unconnected          no read flags
475  *                      POLLOUT if port is not congested
476  *
477  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
478  *                      no write flags
479  *
480  * connected            POLLIN/POLLRDNORM if data in rx queue
481  *                      POLLOUT if port is not congested
482  *
483  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
484  *                      no write flags
485  *
486  * listening            POLLIN if SYN in rx queue
487  *                      no write flags
488  *
489  * ready                POLLIN/POLLRDNORM if data in rx queue
490  * [connectionless]     POLLOUT (since port cannot be congested)
491  *
492  * IMPORTANT: The fact that a read or write operation is indicated does NOT
493  * imply that the operation will succeed, merely that it should be performed
494  * and will not block.
495  */
496 static unsigned int tipc_poll(struct file *file, struct socket *sock,
497                               poll_table *wait)
498 {
499         struct sock *sk = sock->sk;
500         struct tipc_sock *tsk = tipc_sk(sk);
501         u32 mask = 0;
502
503         sock_poll_wait(file, sk_sleep(sk), wait);
504
505         switch ((int)sock->state) {
506         case SS_UNCONNECTED:
507                 if (!tsk->port.congested)
508                         mask |= POLLOUT;
509                 break;
510         case SS_READY:
511         case SS_CONNECTED:
512                 if (!tsk->port.congested)
513                         mask |= POLLOUT;
514                 /* fall thru' */
515         case SS_CONNECTING:
516         case SS_LISTENING:
517                 if (!skb_queue_empty(&sk->sk_receive_queue))
518                         mask |= (POLLIN | POLLRDNORM);
519                 break;
520         case SS_DISCONNECTING:
521                 mask = (POLLIN | POLLRDNORM | POLLHUP);
522                 break;
523         }
524
525         return mask;
526 }
527
528 /**
529  * dest_name_check - verify user is permitted to send to specified port name
530  * @dest: destination address
531  * @m: descriptor for message to be sent
532  *
533  * Prevents restricted configuration commands from being issued by
534  * unauthorized users.
535  *
536  * Returns 0 if permission is granted, otherwise errno
537  */
538 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
539 {
540         struct tipc_cfg_msg_hdr hdr;
541
542         if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
543                 return 0;
544         if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
545                 return 0;
546         if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
547                 return -EACCES;
548
549         if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
550                 return -EMSGSIZE;
551         if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
552                 return -EFAULT;
553         if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
554                 return -EACCES;
555
556         return 0;
557 }
558
559 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
560 {
561         struct sock *sk = sock->sk;
562         struct tipc_sock *tsk = tipc_sk(sk);
563         DEFINE_WAIT(wait);
564         int done;
565
566         do {
567                 int err = sock_error(sk);
568                 if (err)
569                         return err;
570                 if (sock->state == SS_DISCONNECTING)
571                         return -EPIPE;
572                 if (!*timeo_p)
573                         return -EAGAIN;
574                 if (signal_pending(current))
575                         return sock_intr_errno(*timeo_p);
576
577                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
578                 done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
579                 finish_wait(sk_sleep(sk), &wait);
580         } while (!done);
581         return 0;
582 }
583
584
585 /**
586  * tipc_sendmsg - send message in connectionless manner
587  * @iocb: if NULL, indicates that socket lock is already held
588  * @sock: socket structure
589  * @m: message to send
590  * @total_len: length of message
591  *
592  * Message must have an destination specified explicitly.
593  * Used for SOCK_RDM and SOCK_DGRAM messages,
594  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
595  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
596  *
597  * Returns the number of bytes sent on success, or errno otherwise
598  */
599 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
600                         struct msghdr *m, size_t total_len)
601 {
602         struct sock *sk = sock->sk;
603         struct tipc_sock *tsk = tipc_sk(sk);
604         struct tipc_port *port = &tsk->port;
605         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
606         int needs_conn;
607         long timeo;
608         int res = -EINVAL;
609
610         if (unlikely(!dest))
611                 return -EDESTADDRREQ;
612         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
613                      (dest->family != AF_TIPC)))
614                 return -EINVAL;
615         if (total_len > TIPC_MAX_USER_MSG_SIZE)
616                 return -EMSGSIZE;
617
618         if (iocb)
619                 lock_sock(sk);
620
621         needs_conn = (sock->state != SS_READY);
622         if (unlikely(needs_conn)) {
623                 if (sock->state == SS_LISTENING) {
624                         res = -EPIPE;
625                         goto exit;
626                 }
627                 if (sock->state != SS_UNCONNECTED) {
628                         res = -EISCONN;
629                         goto exit;
630                 }
631                 if (tsk->port.published) {
632                         res = -EOPNOTSUPP;
633                         goto exit;
634                 }
635                 if (dest->addrtype == TIPC_ADDR_NAME) {
636                         tsk->port.conn_type = dest->addr.name.name.type;
637                         tsk->port.conn_instance = dest->addr.name.name.instance;
638                 }
639
640                 /* Abort any pending connection attempts (very unlikely) */
641                 reject_rx_queue(sk);
642         }
643
644         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
645         do {
646                 if (dest->addrtype == TIPC_ADDR_NAME) {
647                         res = dest_name_check(dest, m);
648                         if (res)
649                                 break;
650                         res = tipc_send2name(port,
651                                              &dest->addr.name.name,
652                                              dest->addr.name.domain,
653                                              m->msg_iov,
654                                              total_len);
655                 } else if (dest->addrtype == TIPC_ADDR_ID) {
656                         res = tipc_send2port(port,
657                                              &dest->addr.id,
658                                              m->msg_iov,
659                                              total_len);
660                 } else if (dest->addrtype == TIPC_ADDR_MCAST) {
661                         if (needs_conn) {
662                                 res = -EOPNOTSUPP;
663                                 break;
664                         }
665                         res = dest_name_check(dest, m);
666                         if (res)
667                                 break;
668                         res = tipc_port_mcast_xmit(port,
669                                                    &dest->addr.nameseq,
670                                                    m->msg_iov,
671                                                    total_len);
672                 }
673                 if (likely(res != -ELINKCONG)) {
674                         if (needs_conn && (res >= 0))
675                                 sock->state = SS_CONNECTING;
676                         break;
677                 }
678                 res = tipc_wait_for_sndmsg(sock, &timeo);
679                 if (res)
680                         break;
681         } while (1);
682
683 exit:
684         if (iocb)
685                 release_sock(sk);
686         return res;
687 }
688
689 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
690 {
691         struct sock *sk = sock->sk;
692         struct tipc_sock *tsk = tipc_sk(sk);
693         struct tipc_port *port = &tsk->port;
694         DEFINE_WAIT(wait);
695         int done;
696
697         do {
698                 int err = sock_error(sk);
699                 if (err)
700                         return err;
701                 if (sock->state == SS_DISCONNECTING)
702                         return -EPIPE;
703                 else if (sock->state != SS_CONNECTED)
704                         return -ENOTCONN;
705                 if (!*timeo_p)
706                         return -EAGAIN;
707                 if (signal_pending(current))
708                         return sock_intr_errno(*timeo_p);
709
710                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
711                 done = sk_wait_event(sk, timeo_p,
712                                      (!port->congested || !port->connected));
713                 finish_wait(sk_sleep(sk), &wait);
714         } while (!done);
715         return 0;
716 }
717
718 /**
719  * tipc_send_packet - send a connection-oriented message
720  * @iocb: if NULL, indicates that socket lock is already held
721  * @sock: socket structure
722  * @m: message to send
723  * @total_len: length of message
724  *
725  * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
726  *
727  * Returns the number of bytes sent on success, or errno otherwise
728  */
729 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
730                             struct msghdr *m, size_t total_len)
731 {
732         struct sock *sk = sock->sk;
733         struct tipc_sock *tsk = tipc_sk(sk);
734         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
735         int res = -EINVAL;
736         long timeo;
737
738         /* Handle implied connection establishment */
739         if (unlikely(dest))
740                 return tipc_sendmsg(iocb, sock, m, total_len);
741
742         if (total_len > TIPC_MAX_USER_MSG_SIZE)
743                 return -EMSGSIZE;
744
745         if (iocb)
746                 lock_sock(sk);
747
748         if (unlikely(sock->state != SS_CONNECTED)) {
749                 if (sock->state == SS_DISCONNECTING)
750                         res = -EPIPE;
751                 else
752                         res = -ENOTCONN;
753                 goto exit;
754         }
755
756         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
757         do {
758                 res = tipc_send(&tsk->port, m->msg_iov, total_len);
759                 if (likely(res != -ELINKCONG))
760                         break;
761                 res = tipc_wait_for_sndpkt(sock, &timeo);
762                 if (res)
763                         break;
764         } while (1);
765 exit:
766         if (iocb)
767                 release_sock(sk);
768         return res;
769 }
770
771 /**
772  * tipc_send_stream - send stream-oriented data
773  * @iocb: (unused)
774  * @sock: socket structure
775  * @m: data to send
776  * @total_len: total length of data to be sent
777  *
778  * Used for SOCK_STREAM data.
779  *
780  * Returns the number of bytes sent on success (or partial success),
781  * or errno if no data sent
782  */
783 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
784                             struct msghdr *m, size_t total_len)
785 {
786         struct sock *sk = sock->sk;
787         struct tipc_sock *tsk = tipc_sk(sk);
788         struct msghdr my_msg;
789         struct iovec my_iov;
790         struct iovec *curr_iov;
791         int curr_iovlen;
792         char __user *curr_start;
793         u32 hdr_size;
794         int curr_left;
795         int bytes_to_send;
796         int bytes_sent;
797         int res;
798
799         lock_sock(sk);
800
801         /* Handle special cases where there is no connection */
802         if (unlikely(sock->state != SS_CONNECTED)) {
803                 if (sock->state == SS_UNCONNECTED)
804                         res = tipc_send_packet(NULL, sock, m, total_len);
805                 else
806                         res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
807                 goto exit;
808         }
809
810         if (unlikely(m->msg_name)) {
811                 res = -EISCONN;
812                 goto exit;
813         }
814
815         if (total_len > (unsigned int)INT_MAX) {
816                 res = -EMSGSIZE;
817                 goto exit;
818         }
819
820         /*
821          * Send each iovec entry using one or more messages
822          *
823          * Note: This algorithm is good for the most likely case
824          * (i.e. one large iovec entry), but could be improved to pass sets
825          * of small iovec entries into send_packet().
826          */
827         curr_iov = m->msg_iov;
828         curr_iovlen = m->msg_iovlen;
829         my_msg.msg_iov = &my_iov;
830         my_msg.msg_iovlen = 1;
831         my_msg.msg_flags = m->msg_flags;
832         my_msg.msg_name = NULL;
833         bytes_sent = 0;
834
835         hdr_size = msg_hdr_sz(&tsk->port.phdr);
836
837         while (curr_iovlen--) {
838                 curr_start = curr_iov->iov_base;
839                 curr_left = curr_iov->iov_len;
840
841                 while (curr_left) {
842                         bytes_to_send = tsk->port.max_pkt - hdr_size;
843                         if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
844                                 bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
845                         if (curr_left < bytes_to_send)
846                                 bytes_to_send = curr_left;
847                         my_iov.iov_base = curr_start;
848                         my_iov.iov_len = bytes_to_send;
849                         res = tipc_send_packet(NULL, sock, &my_msg,
850                                                bytes_to_send);
851                         if (res < 0) {
852                                 if (bytes_sent)
853                                         res = bytes_sent;
854                                 goto exit;
855                         }
856                         curr_left -= bytes_to_send;
857                         curr_start += bytes_to_send;
858                         bytes_sent += bytes_to_send;
859                 }
860
861                 curr_iov++;
862         }
863         res = bytes_sent;
864 exit:
865         release_sock(sk);
866         return res;
867 }
868
869 /**
870  * auto_connect - complete connection setup to a remote port
871  * @tsk: tipc socket structure
872  * @msg: peer's response message
873  *
874  * Returns 0 on success, errno otherwise
875  */
876 static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg)
877 {
878         struct tipc_port *port = &tsk->port;
879         struct socket *sock = tsk->sk.sk_socket;
880         struct tipc_portid peer;
881
882         peer.ref = msg_origport(msg);
883         peer.node = msg_orignode(msg);
884
885         __tipc_port_connect(port->ref, port, &peer);
886
887         if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
888                 return -EINVAL;
889         msg_set_importance(&port->phdr, (u32)msg_importance(msg));
890         sock->state = SS_CONNECTED;
891         return 0;
892 }
893
894 /**
895  * set_orig_addr - capture sender's address for received message
896  * @m: descriptor for message info
897  * @msg: received message header
898  *
899  * Note: Address is not captured if not requested by receiver.
900  */
901 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
902 {
903         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
904
905         if (addr) {
906                 addr->family = AF_TIPC;
907                 addr->addrtype = TIPC_ADDR_ID;
908                 memset(&addr->addr, 0, sizeof(addr->addr));
909                 addr->addr.id.ref = msg_origport(msg);
910                 addr->addr.id.node = msg_orignode(msg);
911                 addr->addr.name.domain = 0;     /* could leave uninitialized */
912                 addr->scope = 0;                /* could leave uninitialized */
913                 m->msg_namelen = sizeof(struct sockaddr_tipc);
914         }
915 }
916
917 /**
918  * anc_data_recv - optionally capture ancillary data for received message
919  * @m: descriptor for message info
920  * @msg: received message header
921  * @tport: TIPC port associated with message
922  *
923  * Note: Ancillary data is not captured if not requested by receiver.
924  *
925  * Returns 0 if successful, otherwise errno
926  */
927 static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
928                          struct tipc_port *tport)
929 {
930         u32 anc_data[3];
931         u32 err;
932         u32 dest_type;
933         int has_name;
934         int res;
935
936         if (likely(m->msg_controllen == 0))
937                 return 0;
938
939         /* Optionally capture errored message object(s) */
940         err = msg ? msg_errcode(msg) : 0;
941         if (unlikely(err)) {
942                 anc_data[0] = err;
943                 anc_data[1] = msg_data_sz(msg);
944                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
945                 if (res)
946                         return res;
947                 if (anc_data[1]) {
948                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
949                                        msg_data(msg));
950                         if (res)
951                                 return res;
952                 }
953         }
954
955         /* Optionally capture message destination object */
956         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
957         switch (dest_type) {
958         case TIPC_NAMED_MSG:
959                 has_name = 1;
960                 anc_data[0] = msg_nametype(msg);
961                 anc_data[1] = msg_namelower(msg);
962                 anc_data[2] = msg_namelower(msg);
963                 break;
964         case TIPC_MCAST_MSG:
965                 has_name = 1;
966                 anc_data[0] = msg_nametype(msg);
967                 anc_data[1] = msg_namelower(msg);
968                 anc_data[2] = msg_nameupper(msg);
969                 break;
970         case TIPC_CONN_MSG:
971                 has_name = (tport->conn_type != 0);
972                 anc_data[0] = tport->conn_type;
973                 anc_data[1] = tport->conn_instance;
974                 anc_data[2] = tport->conn_instance;
975                 break;
976         default:
977                 has_name = 0;
978         }
979         if (has_name) {
980                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
981                 if (res)
982                         return res;
983         }
984
985         return 0;
986 }
987
988 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
989 {
990         struct sock *sk = sock->sk;
991         DEFINE_WAIT(wait);
992         long timeo = *timeop;
993         int err;
994
995         for (;;) {
996                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
997                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
998                         if (sock->state == SS_DISCONNECTING) {
999                                 err = -ENOTCONN;
1000                                 break;
1001                         }
1002                         release_sock(sk);
1003                         timeo = schedule_timeout(timeo);
1004                         lock_sock(sk);
1005                 }
1006                 err = 0;
1007                 if (!skb_queue_empty(&sk->sk_receive_queue))
1008                         break;
1009                 err = sock_intr_errno(timeo);
1010                 if (signal_pending(current))
1011                         break;
1012                 err = -EAGAIN;
1013                 if (!timeo)
1014                         break;
1015         }
1016         finish_wait(sk_sleep(sk), &wait);
1017         *timeop = timeo;
1018         return err;
1019 }
1020
1021 /**
1022  * tipc_recvmsg - receive packet-oriented message
1023  * @iocb: (unused)
1024  * @m: descriptor for message info
1025  * @buf_len: total size of user buffer area
1026  * @flags: receive flags
1027  *
1028  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1029  * If the complete message doesn't fit in user area, truncate it.
1030  *
1031  * Returns size of returned message data, errno otherwise
1032  */
1033 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1034                         struct msghdr *m, size_t buf_len, int flags)
1035 {
1036         struct sock *sk = sock->sk;
1037         struct tipc_sock *tsk = tipc_sk(sk);
1038         struct tipc_port *port = &tsk->port;
1039         struct sk_buff *buf;
1040         struct tipc_msg *msg;
1041         long timeo;
1042         unsigned int sz;
1043         u32 err;
1044         int res;
1045
1046         /* Catch invalid receive requests */
1047         if (unlikely(!buf_len))
1048                 return -EINVAL;
1049
1050         lock_sock(sk);
1051
1052         if (unlikely(sock->state == SS_UNCONNECTED)) {
1053                 res = -ENOTCONN;
1054                 goto exit;
1055         }
1056
1057         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1058 restart:
1059
1060         /* Look for a message in receive queue; wait if necessary */
1061         res = tipc_wait_for_rcvmsg(sock, &timeo);
1062         if (res)
1063                 goto exit;
1064
1065         /* Look at first message in receive queue */
1066         buf = skb_peek(&sk->sk_receive_queue);
1067         msg = buf_msg(buf);
1068         sz = msg_data_sz(msg);
1069         err = msg_errcode(msg);
1070
1071         /* Discard an empty non-errored message & try again */
1072         if ((!sz) && (!err)) {
1073                 advance_rx_queue(sk);
1074                 goto restart;
1075         }
1076
1077         /* Capture sender's address (optional) */
1078         set_orig_addr(m, msg);
1079
1080         /* Capture ancillary data (optional) */
1081         res = anc_data_recv(m, msg, port);
1082         if (res)
1083                 goto exit;
1084
1085         /* Capture message data (if valid) & compute return value (always) */
1086         if (!err) {
1087                 if (unlikely(buf_len < sz)) {
1088                         sz = buf_len;
1089                         m->msg_flags |= MSG_TRUNC;
1090                 }
1091                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
1092                                               m->msg_iov, sz);
1093                 if (res)
1094                         goto exit;
1095                 res = sz;
1096         } else {
1097                 if ((sock->state == SS_READY) ||
1098                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1099                         res = 0;
1100                 else
1101                         res = -ECONNRESET;
1102         }
1103
1104         /* Consume received message (optional) */
1105         if (likely(!(flags & MSG_PEEK))) {
1106                 if ((sock->state != SS_READY) &&
1107                     (++port->conn_unacked >= TIPC_CONNACK_INTV))
1108                         tipc_acknowledge(port->ref, port->conn_unacked);
1109                 advance_rx_queue(sk);
1110         }
1111 exit:
1112         release_sock(sk);
1113         return res;
1114 }
1115
1116 /**
1117  * tipc_recv_stream - receive stream-oriented data
1118  * @iocb: (unused)
1119  * @m: descriptor for message info
1120  * @buf_len: total size of user buffer area
1121  * @flags: receive flags
1122  *
1123  * Used for SOCK_STREAM messages only.  If not enough data is available
1124  * will optionally wait for more; never truncates data.
1125  *
1126  * Returns size of returned message data, errno otherwise
1127  */
1128 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1129                             struct msghdr *m, size_t buf_len, int flags)
1130 {
1131         struct sock *sk = sock->sk;
1132         struct tipc_sock *tsk = tipc_sk(sk);
1133         struct tipc_port *port = &tsk->port;
1134         struct sk_buff *buf;
1135         struct tipc_msg *msg;
1136         long timeo;
1137         unsigned int sz;
1138         int sz_to_copy, target, needed;
1139         int sz_copied = 0;
1140         u32 err;
1141         int res = 0;
1142
1143         /* Catch invalid receive attempts */
1144         if (unlikely(!buf_len))
1145                 return -EINVAL;
1146
1147         lock_sock(sk);
1148
1149         if (unlikely(sock->state == SS_UNCONNECTED)) {
1150                 res = -ENOTCONN;
1151                 goto exit;
1152         }
1153
1154         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1155         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1156
1157 restart:
1158         /* Look for a message in receive queue; wait if necessary */
1159         res = tipc_wait_for_rcvmsg(sock, &timeo);
1160         if (res)
1161                 goto exit;
1162
1163         /* Look at first message in receive queue */
1164         buf = skb_peek(&sk->sk_receive_queue);
1165         msg = buf_msg(buf);
1166         sz = msg_data_sz(msg);
1167         err = msg_errcode(msg);
1168
1169         /* Discard an empty non-errored message & try again */
1170         if ((!sz) && (!err)) {
1171                 advance_rx_queue(sk);
1172                 goto restart;
1173         }
1174
1175         /* Optionally capture sender's address & ancillary data of first msg */
1176         if (sz_copied == 0) {
1177                 set_orig_addr(m, msg);
1178                 res = anc_data_recv(m, msg, port);
1179                 if (res)
1180                         goto exit;
1181         }
1182
1183         /* Capture message data (if valid) & compute return value (always) */
1184         if (!err) {
1185                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1186
1187                 sz -= offset;
1188                 needed = (buf_len - sz_copied);
1189                 sz_to_copy = (sz <= needed) ? sz : needed;
1190
1191                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1192                                               m->msg_iov, sz_to_copy);
1193                 if (res)
1194                         goto exit;
1195
1196                 sz_copied += sz_to_copy;
1197
1198                 if (sz_to_copy < sz) {
1199                         if (!(flags & MSG_PEEK))
1200                                 TIPC_SKB_CB(buf)->handle =
1201                                 (void *)(unsigned long)(offset + sz_to_copy);
1202                         goto exit;
1203                 }
1204         } else {
1205                 if (sz_copied != 0)
1206                         goto exit; /* can't add error msg to valid data */
1207
1208                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1209                         res = 0;
1210                 else
1211                         res = -ECONNRESET;
1212         }
1213
1214         /* Consume received message (optional) */
1215         if (likely(!(flags & MSG_PEEK))) {
1216                 if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV))
1217                         tipc_acknowledge(port->ref, port->conn_unacked);
1218                 advance_rx_queue(sk);
1219         }
1220
1221         /* Loop around if more data is required */
1222         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1223             (!skb_queue_empty(&sk->sk_receive_queue) ||
1224             (sz_copied < target)) &&    /* and more is ready or required */
1225             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1226             (!err))                     /* and haven't reached a FIN */
1227                 goto restart;
1228
1229 exit:
1230         release_sock(sk);
1231         return sz_copied ? sz_copied : res;
1232 }
1233
1234 /**
1235  * tipc_write_space - wake up thread if port congestion is released
1236  * @sk: socket
1237  */
1238 static void tipc_write_space(struct sock *sk)
1239 {
1240         struct socket_wq *wq;
1241
1242         rcu_read_lock();
1243         wq = rcu_dereference(sk->sk_wq);
1244         if (wq_has_sleeper(wq))
1245                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1246                                                 POLLWRNORM | POLLWRBAND);
1247         rcu_read_unlock();
1248 }
1249
1250 /**
1251  * tipc_data_ready - wake up threads to indicate messages have been received
1252  * @sk: socket
1253  * @len: the length of messages
1254  */
1255 static void tipc_data_ready(struct sock *sk)
1256 {
1257         struct socket_wq *wq;
1258
1259         rcu_read_lock();
1260         wq = rcu_dereference(sk->sk_wq);
1261         if (wq_has_sleeper(wq))
1262                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1263                                                 POLLRDNORM | POLLRDBAND);
1264         rcu_read_unlock();
1265 }
1266
1267 /**
1268  * filter_connect - Handle all incoming messages for a connection-based socket
1269  * @tsk: TIPC socket
1270  * @msg: message
1271  *
1272  * Returns TIPC error status code and socket error status code
1273  * once it encounters some errors
1274  */
1275 static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1276 {
1277         struct sock *sk = &tsk->sk;
1278         struct tipc_port *port = &tsk->port;
1279         struct socket *sock = sk->sk_socket;
1280         struct tipc_msg *msg = buf_msg(*buf);
1281
1282         u32 retval = TIPC_ERR_NO_PORT;
1283         int res;
1284
1285         if (msg_mcast(msg))
1286                 return retval;
1287
1288         switch ((int)sock->state) {
1289         case SS_CONNECTED:
1290                 /* Accept only connection-based messages sent by peer */
1291                 if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) {
1292                         if (unlikely(msg_errcode(msg))) {
1293                                 sock->state = SS_DISCONNECTING;
1294                                 __tipc_port_disconnect(port);
1295                         }
1296                         retval = TIPC_OK;
1297                 }
1298                 break;
1299         case SS_CONNECTING:
1300                 /* Accept only ACK or NACK message */
1301                 if (unlikely(msg_errcode(msg))) {
1302                         sock->state = SS_DISCONNECTING;
1303                         sk->sk_err = ECONNREFUSED;
1304                         retval = TIPC_OK;
1305                         break;
1306                 }
1307
1308                 if (unlikely(!msg_connected(msg)))
1309                         break;
1310
1311                 res = auto_connect(tsk, msg);
1312                 if (res) {
1313                         sock->state = SS_DISCONNECTING;
1314                         sk->sk_err = -res;
1315                         retval = TIPC_OK;
1316                         break;
1317                 }
1318
1319                 /* If an incoming message is an 'ACK-', it should be
1320                  * discarded here because it doesn't contain useful
1321                  * data. In addition, we should try to wake up
1322                  * connect() routine if sleeping.
1323                  */
1324                 if (msg_data_sz(msg) == 0) {
1325                         kfree_skb(*buf);
1326                         *buf = NULL;
1327                         if (waitqueue_active(sk_sleep(sk)))
1328                                 wake_up_interruptible(sk_sleep(sk));
1329                 }
1330                 retval = TIPC_OK;
1331                 break;
1332         case SS_LISTENING:
1333         case SS_UNCONNECTED:
1334                 /* Accept only SYN message */
1335                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1336                         retval = TIPC_OK;
1337                 break;
1338         case SS_DISCONNECTING:
1339                 break;
1340         default:
1341                 pr_err("Unknown socket state %u\n", sock->state);
1342         }
1343         return retval;
1344 }
1345
1346 /**
1347  * rcvbuf_limit - get proper overload limit of socket receive queue
1348  * @sk: socket
1349  * @buf: message
1350  *
1351  * For all connection oriented messages, irrespective of importance,
1352  * the default overload value (i.e. 67MB) is set as limit.
1353  *
1354  * For all connectionless messages, by default new queue limits are
1355  * as belows:
1356  *
1357  * TIPC_LOW_IMPORTANCE       (4 MB)
1358  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1359  * TIPC_HIGH_IMPORTANCE      (16 MB)
1360  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1361  *
1362  * Returns overload limit according to corresponding message importance
1363  */
1364 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1365 {
1366         struct tipc_msg *msg = buf_msg(buf);
1367
1368         if (msg_connected(msg))
1369                 return sysctl_tipc_rmem[2];
1370
1371         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1372                 msg_importance(msg);
1373 }
1374
1375 /**
1376  * filter_rcv - validate incoming message
1377  * @sk: socket
1378  * @buf: message
1379  *
1380  * Enqueues message on receive queue if acceptable; optionally handles
1381  * disconnect indication for a connected socket.
1382  *
1383  * Called with socket lock already taken; port lock may also be taken.
1384  *
1385  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1386  */
1387 static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1388 {
1389         struct socket *sock = sk->sk_socket;
1390         struct tipc_sock *tsk = tipc_sk(sk);
1391         struct tipc_msg *msg = buf_msg(buf);
1392         unsigned int limit = rcvbuf_limit(sk, buf);
1393         u32 res = TIPC_OK;
1394
1395         /* Reject message if it is wrong sort of message for socket */
1396         if (msg_type(msg) > TIPC_DIRECT_MSG)
1397                 return TIPC_ERR_NO_PORT;
1398
1399         if (sock->state == SS_READY) {
1400                 if (msg_connected(msg))
1401                         return TIPC_ERR_NO_PORT;
1402         } else {
1403                 res = filter_connect(tsk, &buf);
1404                 if (res != TIPC_OK || buf == NULL)
1405                         return res;
1406         }
1407
1408         /* Reject message if there isn't room to queue it */
1409         if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1410                 return TIPC_ERR_OVERLOAD;
1411
1412         /* Enqueue message */
1413         TIPC_SKB_CB(buf)->handle = NULL;
1414         __skb_queue_tail(&sk->sk_receive_queue, buf);
1415         skb_set_owner_r(buf, sk);
1416
1417         sk->sk_data_ready(sk);
1418         return TIPC_OK;
1419 }
1420
1421 /**
1422  * tipc_backlog_rcv - handle incoming message from backlog queue
1423  * @sk: socket
1424  * @buf: message
1425  *
1426  * Caller must hold socket lock, but not port lock.
1427  *
1428  * Returns 0
1429  */
1430 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1431 {
1432         u32 res;
1433         struct tipc_sock *tsk = tipc_sk(sk);
1434         uint truesize = buf->truesize;
1435
1436         res = filter_rcv(sk, buf);
1437         if (unlikely(res))
1438                 tipc_reject_msg(buf, res);
1439
1440         if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1441                 atomic_add(truesize, &tsk->dupl_rcvcnt);
1442
1443         return 0;
1444 }
1445
1446 /**
1447  * tipc_sk_rcv - handle incoming message
1448  * @buf: buffer containing arriving message
1449  * Consumes buffer
1450  * Returns 0 if success, or errno: -EHOSTUNREACH
1451  */
1452 int tipc_sk_rcv(struct sk_buff *buf)
1453 {
1454         struct tipc_sock *tsk;
1455         struct tipc_port *port;
1456         struct sock *sk;
1457         u32 dport = msg_destport(buf_msg(buf));
1458         int err = TIPC_OK;
1459         uint limit;
1460
1461         /* Forward unresolved named message */
1462         if (unlikely(!dport)) {
1463                 tipc_net_route_msg(buf);
1464                 return 0;
1465         }
1466
1467         /* Validate destination */
1468         port = tipc_port_lock(dport);
1469         if (unlikely(!port)) {
1470                 err = TIPC_ERR_NO_PORT;
1471                 goto exit;
1472         }
1473
1474         tsk = tipc_port_to_sock(port);
1475         sk = &tsk->sk;
1476
1477         /* Queue message */
1478         bh_lock_sock(sk);
1479
1480         if (!sock_owned_by_user(sk)) {
1481                 err = filter_rcv(sk, buf);
1482         } else {
1483                 if (sk->sk_backlog.len == 0)
1484                         atomic_set(&tsk->dupl_rcvcnt, 0);
1485                 limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
1486                 if (sk_add_backlog(sk, buf, limit))
1487                         err = TIPC_ERR_OVERLOAD;
1488         }
1489
1490         bh_unlock_sock(sk);
1491         tipc_port_unlock(port);
1492
1493         if (likely(!err))
1494                 return 0;
1495 exit:
1496         tipc_reject_msg(buf, err);
1497         return -EHOSTUNREACH;
1498 }
1499
1500 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1501 {
1502         struct sock *sk = sock->sk;
1503         DEFINE_WAIT(wait);
1504         int done;
1505
1506         do {
1507                 int err = sock_error(sk);
1508                 if (err)
1509                         return err;
1510                 if (!*timeo_p)
1511                         return -ETIMEDOUT;
1512                 if (signal_pending(current))
1513                         return sock_intr_errno(*timeo_p);
1514
1515                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1516                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1517                 finish_wait(sk_sleep(sk), &wait);
1518         } while (!done);
1519         return 0;
1520 }
1521
1522 /**
1523  * tipc_connect - establish a connection to another TIPC port
1524  * @sock: socket structure
1525  * @dest: socket address for destination port
1526  * @destlen: size of socket address data structure
1527  * @flags: file-related flags associated with socket
1528  *
1529  * Returns 0 on success, errno otherwise
1530  */
1531 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1532                         int destlen, int flags)
1533 {
1534         struct sock *sk = sock->sk;
1535         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1536         struct msghdr m = {NULL,};
1537         long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1538         socket_state previous;
1539         int res;
1540
1541         lock_sock(sk);
1542
1543         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1544         if (sock->state == SS_READY) {
1545                 res = -EOPNOTSUPP;
1546                 goto exit;
1547         }
1548
1549         /*
1550          * Reject connection attempt using multicast address
1551          *
1552          * Note: send_msg() validates the rest of the address fields,
1553          *       so there's no need to do it here
1554          */
1555         if (dst->addrtype == TIPC_ADDR_MCAST) {
1556                 res = -EINVAL;
1557                 goto exit;
1558         }
1559
1560         previous = sock->state;
1561         switch (sock->state) {
1562         case SS_UNCONNECTED:
1563                 /* Send a 'SYN-' to destination */
1564                 m.msg_name = dest;
1565                 m.msg_namelen = destlen;
1566
1567                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1568                  * indicate send_msg() is never blocked.
1569                  */
1570                 if (!timeout)
1571                         m.msg_flags = MSG_DONTWAIT;
1572
1573                 res = tipc_sendmsg(NULL, sock, &m, 0);
1574                 if ((res < 0) && (res != -EWOULDBLOCK))
1575                         goto exit;
1576
1577                 /* Just entered SS_CONNECTING state; the only
1578                  * difference is that return value in non-blocking
1579                  * case is EINPROGRESS, rather than EALREADY.
1580                  */
1581                 res = -EINPROGRESS;
1582         case SS_CONNECTING:
1583                 if (previous == SS_CONNECTING)
1584                         res = -EALREADY;
1585                 if (!timeout)
1586                         goto exit;
1587                 timeout = msecs_to_jiffies(timeout);
1588                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1589                 res = tipc_wait_for_connect(sock, &timeout);
1590                 break;
1591         case SS_CONNECTED:
1592                 res = -EISCONN;
1593                 break;
1594         default:
1595                 res = -EINVAL;
1596                 break;
1597         }
1598 exit:
1599         release_sock(sk);
1600         return res;
1601 }
1602
1603 /**
1604  * tipc_listen - allow socket to listen for incoming connections
1605  * @sock: socket structure
1606  * @len: (unused)
1607  *
1608  * Returns 0 on success, errno otherwise
1609  */
1610 static int tipc_listen(struct socket *sock, int len)
1611 {
1612         struct sock *sk = sock->sk;
1613         int res;
1614
1615         lock_sock(sk);
1616
1617         if (sock->state != SS_UNCONNECTED)
1618                 res = -EINVAL;
1619         else {
1620                 sock->state = SS_LISTENING;
1621                 res = 0;
1622         }
1623
1624         release_sock(sk);
1625         return res;
1626 }
1627
1628 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1629 {
1630         struct sock *sk = sock->sk;
1631         DEFINE_WAIT(wait);
1632         int err;
1633
1634         /* True wake-one mechanism for incoming connections: only
1635          * one process gets woken up, not the 'whole herd'.
1636          * Since we do not 'race & poll' for established sockets
1637          * anymore, the common case will execute the loop only once.
1638         */
1639         for (;;) {
1640                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1641                                           TASK_INTERRUPTIBLE);
1642                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1643                         release_sock(sk);
1644                         timeo = schedule_timeout(timeo);
1645                         lock_sock(sk);
1646                 }
1647                 err = 0;
1648                 if (!skb_queue_empty(&sk->sk_receive_queue))
1649                         break;
1650                 err = -EINVAL;
1651                 if (sock->state != SS_LISTENING)
1652                         break;
1653                 err = sock_intr_errno(timeo);
1654                 if (signal_pending(current))
1655                         break;
1656                 err = -EAGAIN;
1657                 if (!timeo)
1658                         break;
1659         }
1660         finish_wait(sk_sleep(sk), &wait);
1661         return err;
1662 }
1663
1664 /**
1665  * tipc_accept - wait for connection request
1666  * @sock: listening socket
1667  * @newsock: new socket that is to be connected
1668  * @flags: file-related flags associated with socket
1669  *
1670  * Returns 0 on success, errno otherwise
1671  */
1672 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1673 {
1674         struct sock *new_sk, *sk = sock->sk;
1675         struct sk_buff *buf;
1676         struct tipc_port *new_port;
1677         struct tipc_msg *msg;
1678         struct tipc_portid peer;
1679         u32 new_ref;
1680         long timeo;
1681         int res;
1682
1683         lock_sock(sk);
1684
1685         if (sock->state != SS_LISTENING) {
1686                 res = -EINVAL;
1687                 goto exit;
1688         }
1689         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1690         res = tipc_wait_for_accept(sock, timeo);
1691         if (res)
1692                 goto exit;
1693
1694         buf = skb_peek(&sk->sk_receive_queue);
1695
1696         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1697         if (res)
1698                 goto exit;
1699
1700         new_sk = new_sock->sk;
1701         new_port = &tipc_sk(new_sk)->port;
1702         new_ref = new_port->ref;
1703         msg = buf_msg(buf);
1704
1705         /* we lock on new_sk; but lockdep sees the lock on sk */
1706         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1707
1708         /*
1709          * Reject any stray messages received by new socket
1710          * before the socket lock was taken (very, very unlikely)
1711          */
1712         reject_rx_queue(new_sk);
1713
1714         /* Connect new socket to it's peer */
1715         peer.ref = msg_origport(msg);
1716         peer.node = msg_orignode(msg);
1717         tipc_port_connect(new_ref, &peer);
1718         new_sock->state = SS_CONNECTED;
1719
1720         tipc_port_set_importance(new_port, msg_importance(msg));
1721         if (msg_named(msg)) {
1722                 new_port->conn_type = msg_nametype(msg);
1723                 new_port->conn_instance = msg_nameinst(msg);
1724         }
1725
1726         /*
1727          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1728          * Respond to 'SYN+' by queuing it on new socket.
1729          */
1730         if (!msg_data_sz(msg)) {
1731                 struct msghdr m = {NULL,};
1732
1733                 advance_rx_queue(sk);
1734                 tipc_send_packet(NULL, new_sock, &m, 0);
1735         } else {
1736                 __skb_dequeue(&sk->sk_receive_queue);
1737                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1738                 skb_set_owner_r(buf, new_sk);
1739         }
1740         release_sock(new_sk);
1741 exit:
1742         release_sock(sk);
1743         return res;
1744 }
1745
1746 /**
1747  * tipc_shutdown - shutdown socket connection
1748  * @sock: socket structure
1749  * @how: direction to close (must be SHUT_RDWR)
1750  *
1751  * Terminates connection (if necessary), then purges socket's receive queue.
1752  *
1753  * Returns 0 on success, errno otherwise
1754  */
1755 static int tipc_shutdown(struct socket *sock, int how)
1756 {
1757         struct sock *sk = sock->sk;
1758         struct tipc_sock *tsk = tipc_sk(sk);
1759         struct tipc_port *port = &tsk->port;
1760         struct sk_buff *buf;
1761         int res;
1762
1763         if (how != SHUT_RDWR)
1764                 return -EINVAL;
1765
1766         lock_sock(sk);
1767
1768         switch (sock->state) {
1769         case SS_CONNECTING:
1770         case SS_CONNECTED:
1771
1772 restart:
1773                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1774                 buf = __skb_dequeue(&sk->sk_receive_queue);
1775                 if (buf) {
1776                         if (TIPC_SKB_CB(buf)->handle != NULL) {
1777                                 kfree_skb(buf);
1778                                 goto restart;
1779                         }
1780                         tipc_port_disconnect(port->ref);
1781                         tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1782                 } else {
1783                         tipc_port_shutdown(port->ref);
1784                 }
1785
1786                 sock->state = SS_DISCONNECTING;
1787
1788                 /* fall through */
1789
1790         case SS_DISCONNECTING:
1791
1792                 /* Discard any unreceived messages */
1793                 __skb_queue_purge(&sk->sk_receive_queue);
1794
1795                 /* Wake up anyone sleeping in poll */
1796                 sk->sk_state_change(sk);
1797                 res = 0;
1798                 break;
1799
1800         default:
1801                 res = -ENOTCONN;
1802         }
1803
1804         release_sock(sk);
1805         return res;
1806 }
1807
1808 /**
1809  * tipc_setsockopt - set socket option
1810  * @sock: socket structure
1811  * @lvl: option level
1812  * @opt: option identifier
1813  * @ov: pointer to new option value
1814  * @ol: length of option value
1815  *
1816  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1817  * (to ease compatibility).
1818  *
1819  * Returns 0 on success, errno otherwise
1820  */
1821 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
1822                            char __user *ov, unsigned int ol)
1823 {
1824         struct sock *sk = sock->sk;
1825         struct tipc_sock *tsk = tipc_sk(sk);
1826         struct tipc_port *port = &tsk->port;
1827         u32 value;
1828         int res;
1829
1830         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1831                 return 0;
1832         if (lvl != SOL_TIPC)
1833                 return -ENOPROTOOPT;
1834         if (ol < sizeof(value))
1835                 return -EINVAL;
1836         res = get_user(value, (u32 __user *)ov);
1837         if (res)
1838                 return res;
1839
1840         lock_sock(sk);
1841
1842         switch (opt) {
1843         case TIPC_IMPORTANCE:
1844                 tipc_port_set_importance(port, value);
1845                 break;
1846         case TIPC_SRC_DROPPABLE:
1847                 if (sock->type != SOCK_STREAM)
1848                         tipc_port_set_unreliable(port, value);
1849                 else
1850                         res = -ENOPROTOOPT;
1851                 break;
1852         case TIPC_DEST_DROPPABLE:
1853                 tipc_port_set_unreturnable(port, value);
1854                 break;
1855         case TIPC_CONN_TIMEOUT:
1856                 tipc_sk(sk)->conn_timeout = value;
1857                 /* no need to set "res", since already 0 at this point */
1858                 break;
1859         default:
1860                 res = -EINVAL;
1861         }
1862
1863         release_sock(sk);
1864
1865         return res;
1866 }
1867
1868 /**
1869  * tipc_getsockopt - get socket option
1870  * @sock: socket structure
1871  * @lvl: option level
1872  * @opt: option identifier
1873  * @ov: receptacle for option value
1874  * @ol: receptacle for length of option value
1875  *
1876  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1877  * (to ease compatibility).
1878  *
1879  * Returns 0 on success, errno otherwise
1880  */
1881 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
1882                            char __user *ov, int __user *ol)
1883 {
1884         struct sock *sk = sock->sk;
1885         struct tipc_sock *tsk = tipc_sk(sk);
1886         struct tipc_port *port = &tsk->port;
1887         int len;
1888         u32 value;
1889         int res;
1890
1891         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1892                 return put_user(0, ol);
1893         if (lvl != SOL_TIPC)
1894                 return -ENOPROTOOPT;
1895         res = get_user(len, ol);
1896         if (res)
1897                 return res;
1898
1899         lock_sock(sk);
1900
1901         switch (opt) {
1902         case TIPC_IMPORTANCE:
1903                 value = tipc_port_importance(port);
1904                 break;
1905         case TIPC_SRC_DROPPABLE:
1906                 value = tipc_port_unreliable(port);
1907                 break;
1908         case TIPC_DEST_DROPPABLE:
1909                 value = tipc_port_unreturnable(port);
1910                 break;
1911         case TIPC_CONN_TIMEOUT:
1912                 value = tipc_sk(sk)->conn_timeout;
1913                 /* no need to set "res", since already 0 at this point */
1914                 break;
1915         case TIPC_NODE_RECVQ_DEPTH:
1916                 value = 0; /* was tipc_queue_size, now obsolete */
1917                 break;
1918         case TIPC_SOCK_RECVQ_DEPTH:
1919                 value = skb_queue_len(&sk->sk_receive_queue);
1920                 break;
1921         default:
1922                 res = -EINVAL;
1923         }
1924
1925         release_sock(sk);
1926
1927         if (res)
1928                 return res;     /* "get" failed */
1929
1930         if (len < sizeof(value))
1931                 return -EINVAL;
1932
1933         if (copy_to_user(ov, &value, sizeof(value)))
1934                 return -EFAULT;
1935
1936         return put_user(sizeof(value), ol);
1937 }
1938
1939 int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
1940 {
1941         struct tipc_sioc_ln_req lnr;
1942         void __user *argp = (void __user *)arg;
1943
1944         switch (cmd) {
1945         case SIOCGETLINKNAME:
1946                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
1947                         return -EFAULT;
1948                 if (!tipc_node_get_linkname(lnr.bearer_id, lnr.peer,
1949                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
1950                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
1951                                 return -EFAULT;
1952                         return 0;
1953                 }
1954                 return -EADDRNOTAVAIL;
1955                 break;
1956         default:
1957                 return -ENOIOCTLCMD;
1958         }
1959 }
1960
1961 /* Protocol switches for the various types of TIPC sockets */
1962
1963 static const struct proto_ops msg_ops = {
1964         .owner          = THIS_MODULE,
1965         .family         = AF_TIPC,
1966         .release        = tipc_release,
1967         .bind           = tipc_bind,
1968         .connect        = tipc_connect,
1969         .socketpair     = sock_no_socketpair,
1970         .accept         = sock_no_accept,
1971         .getname        = tipc_getname,
1972         .poll           = tipc_poll,
1973         .ioctl          = tipc_ioctl,
1974         .listen         = sock_no_listen,
1975         .shutdown       = tipc_shutdown,
1976         .setsockopt     = tipc_setsockopt,
1977         .getsockopt     = tipc_getsockopt,
1978         .sendmsg        = tipc_sendmsg,
1979         .recvmsg        = tipc_recvmsg,
1980         .mmap           = sock_no_mmap,
1981         .sendpage       = sock_no_sendpage
1982 };
1983
1984 static const struct proto_ops packet_ops = {
1985         .owner          = THIS_MODULE,
1986         .family         = AF_TIPC,
1987         .release        = tipc_release,
1988         .bind           = tipc_bind,
1989         .connect        = tipc_connect,
1990         .socketpair     = sock_no_socketpair,
1991         .accept         = tipc_accept,
1992         .getname        = tipc_getname,
1993         .poll           = tipc_poll,
1994         .ioctl          = tipc_ioctl,
1995         .listen         = tipc_listen,
1996         .shutdown       = tipc_shutdown,
1997         .setsockopt     = tipc_setsockopt,
1998         .getsockopt     = tipc_getsockopt,
1999         .sendmsg        = tipc_send_packet,
2000         .recvmsg        = tipc_recvmsg,
2001         .mmap           = sock_no_mmap,
2002         .sendpage       = sock_no_sendpage
2003 };
2004
2005 static const struct proto_ops stream_ops = {
2006         .owner          = THIS_MODULE,
2007         .family         = AF_TIPC,
2008         .release        = tipc_release,
2009         .bind           = tipc_bind,
2010         .connect        = tipc_connect,
2011         .socketpair     = sock_no_socketpair,
2012         .accept         = tipc_accept,
2013         .getname        = tipc_getname,
2014         .poll           = tipc_poll,
2015         .ioctl          = tipc_ioctl,
2016         .listen         = tipc_listen,
2017         .shutdown       = tipc_shutdown,
2018         .setsockopt     = tipc_setsockopt,
2019         .getsockopt     = tipc_getsockopt,
2020         .sendmsg        = tipc_send_stream,
2021         .recvmsg        = tipc_recv_stream,
2022         .mmap           = sock_no_mmap,
2023         .sendpage       = sock_no_sendpage
2024 };
2025
2026 static const struct net_proto_family tipc_family_ops = {
2027         .owner          = THIS_MODULE,
2028         .family         = AF_TIPC,
2029         .create         = tipc_sk_create
2030 };
2031
2032 static struct proto tipc_proto = {
2033         .name           = "TIPC",
2034         .owner          = THIS_MODULE,
2035         .obj_size       = sizeof(struct tipc_sock),
2036         .sysctl_rmem    = sysctl_tipc_rmem
2037 };
2038
2039 static struct proto tipc_proto_kern = {
2040         .name           = "TIPC",
2041         .obj_size       = sizeof(struct tipc_sock),
2042         .sysctl_rmem    = sysctl_tipc_rmem
2043 };
2044
2045 /**
2046  * tipc_socket_init - initialize TIPC socket interface
2047  *
2048  * Returns 0 on success, errno otherwise
2049  */
2050 int tipc_socket_init(void)
2051 {
2052         int res;
2053
2054         res = proto_register(&tipc_proto, 1);
2055         if (res) {
2056                 pr_err("Failed to register TIPC protocol type\n");
2057                 goto out;
2058         }
2059
2060         res = sock_register(&tipc_family_ops);
2061         if (res) {
2062                 pr_err("Failed to register TIPC socket type\n");
2063                 proto_unregister(&tipc_proto);
2064                 goto out;
2065         }
2066  out:
2067         return res;
2068 }
2069
2070 /**
2071  * tipc_socket_stop - stop TIPC socket interface
2072  */
2073 void tipc_socket_stop(void)
2074 {
2075         sock_unregister(tipc_family_ops.family);
2076         proto_unregister(&tipc_proto);
2077 }