]> git.karo-electronics.de Git - karo-tx-linux.git/blob - net/tipc/link.c
Merge remote-tracking branch 'hwmon-staging/hwmon-next'
[karo-tx-linux.git] / net / tipc / link.c
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, 2012, Ericsson AB
5  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "link.h"
39 #include "port.h"
40 #include "name_distr.h"
41 #include "discover.h"
42 #include "config.h"
43
44 #include <linux/pkt_sched.h>
45
/*
 * Error message prefixes
 *
 * Both the text and the pointers are constant: these are fixed prefixes
 * that are only ever passed to the logging macros as "%s" arguments.
 */
static const char * const link_co_err = "Link changeover error, ";
static const char * const link_rst_msg = "Resetting link ";
static const char * const link_unk_evt = "Unknown link event ";
52
53 /*
54  * Out-of-range value for link session numbers
55  */
56 #define INVALID_SESSION 0x10000
57
58 /*
59  * Link state events:
60  */
61 #define  STARTING_EVT    856384768      /* link processing trigger */
62 #define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
63 #define  TIMEOUT_EVT     560817u        /* link timer expired */
64
/*
 * The following two 'message types' are really just implementation
 * data conveniently stored in the message header.
 * They must not be considered part of the protocol.
 */
70 #define OPEN_MSG   0
71 #define CLOSED_MSG 1
72
73 /*
74  * State value stored in 'exp_msg_count'
75  */
76 #define START_CHANGEOVER 100000u
77
78 /**
79  * struct tipc_link_name - deconstructed link name
80  * @addr_local: network address of node at this end
81  * @if_local: name of interface at this end
82  * @addr_peer: network address of node at far end
83  * @if_peer: name of interface at far end
84  */
85 struct tipc_link_name {
86         u32 addr_local;
87         char if_local[TIPC_MAX_IF_NAME];
88         u32 addr_peer;
89         char if_peer[TIPC_MAX_IF_NAME];
90 };
91
92 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
93                                        struct sk_buff *buf);
94 static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf);
95 static int  link_recv_changeover_msg(struct tipc_link **l_ptr,
96                                      struct sk_buff **buf);
97 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
98 static int  link_send_sections_long(struct tipc_port *sender,
99                                     struct iovec const *msg_sect,
100                                     u32 num_sect, unsigned int total_len,
101                                     u32 destnode);
102 static void link_state_event(struct tipc_link *l_ptr, u32 event);
103 static void link_reset_statistics(struct tipc_link *l_ptr);
104 static void link_print(struct tipc_link *l_ptr, const char *str);
105 static void link_start(struct tipc_link *l_ptr);
106 static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
107 static void tipc_link_send_sync(struct tipc_link *l);
108 static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
109
110 /*
111  *  Simple link routines
112  */
/* Round @i up to the next multiple of four (unsigned wrap-around applies). */
static unsigned int align(unsigned int i)
{
	const unsigned int low_bits = 3u;

	return (i + low_bits) & ~low_bits;
}
117
118 static void link_init_max_pkt(struct tipc_link *l_ptr)
119 {
120         u32 max_pkt;
121
122         max_pkt = (l_ptr->b_ptr->mtu & ~3);
123         if (max_pkt > MAX_MSG_SIZE)
124                 max_pkt = MAX_MSG_SIZE;
125
126         l_ptr->max_pkt_target = max_pkt;
127         if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
128                 l_ptr->max_pkt = l_ptr->max_pkt_target;
129         else
130                 l_ptr->max_pkt = MAX_PKT_DEFAULT;
131
132         l_ptr->max_pkt_probes = 0;
133 }
134
135 static u32 link_next_sent(struct tipc_link *l_ptr)
136 {
137         if (l_ptr->next_out)
138                 return buf_seqno(l_ptr->next_out);
139         return mod(l_ptr->next_out_no);
140 }
141
142 static u32 link_last_sent(struct tipc_link *l_ptr)
143 {
144         return mod(link_next_sent(l_ptr) - 1);
145 }
146
147 /*
148  *  Simple non-static link routines (i.e. referenced outside this file)
149  */
/*
 * tipc_link_is_up - report whether a link is in one of the working states
 *
 * Returns 1 if @l_ptr is non-NULL and is in the working-working or
 * working-unknown state, otherwise 0.
 */
int tipc_link_is_up(struct tipc_link *l_ptr)
{
	if (l_ptr &&
	    (link_working_working(l_ptr) || link_working_unknown(l_ptr)))
		return 1;
	return 0;
}
156
157 int tipc_link_is_active(struct tipc_link *l_ptr)
158 {
159         return  (l_ptr->owner->active_links[0] == l_ptr) ||
160                 (l_ptr->owner->active_links[1] == l_ptr);
161 }
162
163 /**
164  * link_name_validate - validate & (optionally) deconstruct tipc_link name
165  * @name: ptr to link name string
166  * @name_parts: ptr to area for link name components (or NULL if not needed)
167  *
168  * Returns 1 if link name is valid, otherwise 0.
169  */
170 static int link_name_validate(const char *name,
171                                 struct tipc_link_name *name_parts)
172 {
173         char name_copy[TIPC_MAX_LINK_NAME];
174         char *addr_local;
175         char *if_local;
176         char *addr_peer;
177         char *if_peer;
178         char dummy;
179         u32 z_local, c_local, n_local;
180         u32 z_peer, c_peer, n_peer;
181         u32 if_local_len;
182         u32 if_peer_len;
183
184         /* copy link name & ensure length is OK */
185         name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
186         /* need above in case non-Posix strncpy() doesn't pad with nulls */
187         strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
188         if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
189                 return 0;
190
191         /* ensure all component parts of link name are present */
192         addr_local = name_copy;
193         if_local = strchr(addr_local, ':');
194         if (if_local == NULL)
195                 return 0;
196         *(if_local++) = 0;
197         addr_peer = strchr(if_local, '-');
198         if (addr_peer == NULL)
199                 return 0;
200         *(addr_peer++) = 0;
201         if_local_len = addr_peer - if_local;
202         if_peer = strchr(addr_peer, ':');
203         if (if_peer == NULL)
204                 return 0;
205         *(if_peer++) = 0;
206         if_peer_len = strlen(if_peer) + 1;
207
208         /* validate component parts of link name */
209         if ((sscanf(addr_local, "%u.%u.%u%c",
210                     &z_local, &c_local, &n_local, &dummy) != 3) ||
211             (sscanf(addr_peer, "%u.%u.%u%c",
212                     &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
213             (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
214             (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
215             (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
216             (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME))
217                 return 0;
218
219         /* return link name components, if necessary */
220         if (name_parts) {
221                 name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
222                 strcpy(name_parts->if_local, if_local);
223                 name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
224                 strcpy(name_parts->if_peer, if_peer);
225         }
226         return 1;
227 }
228
229 /**
230  * link_timeout - handle expiration of link timer
231  * @l_ptr: pointer to link
232  *
233  * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
234  * with tipc_link_delete().  (There is no risk that the node will be deleted by
235  * another thread because tipc_link_delete() always cancels the link timer before
236  * tipc_node_delete() is called.)
237  */
238 static void link_timeout(struct tipc_link *l_ptr)
239 {
240         tipc_node_lock(l_ptr->owner);
241
242         /* update counters used in statistical profiling of send traffic */
243         l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
244         l_ptr->stats.queue_sz_counts++;
245
246         if (l_ptr->first_out) {
247                 struct tipc_msg *msg = buf_msg(l_ptr->first_out);
248                 u32 length = msg_size(msg);
249
250                 if ((msg_user(msg) == MSG_FRAGMENTER) &&
251                     (msg_type(msg) == FIRST_FRAGMENT)) {
252                         length = msg_size(msg_get_wrapped(msg));
253                 }
254                 if (length) {
255                         l_ptr->stats.msg_lengths_total += length;
256                         l_ptr->stats.msg_length_counts++;
257                         if (length <= 64)
258                                 l_ptr->stats.msg_length_profile[0]++;
259                         else if (length <= 256)
260                                 l_ptr->stats.msg_length_profile[1]++;
261                         else if (length <= 1024)
262                                 l_ptr->stats.msg_length_profile[2]++;
263                         else if (length <= 4096)
264                                 l_ptr->stats.msg_length_profile[3]++;
265                         else if (length <= 16384)
266                                 l_ptr->stats.msg_length_profile[4]++;
267                         else if (length <= 32768)
268                                 l_ptr->stats.msg_length_profile[5]++;
269                         else
270                                 l_ptr->stats.msg_length_profile[6]++;
271                 }
272         }
273
274         /* do all other link processing performed on a periodic basis */
275
276         link_state_event(l_ptr, TIMEOUT_EVT);
277
278         if (l_ptr->next_out)
279                 tipc_link_push_queue(l_ptr);
280
281         tipc_node_unlock(l_ptr->owner);
282 }
283
284 static void link_set_timer(struct tipc_link *l_ptr, u32 time)
285 {
286         k_start_timer(&l_ptr->timer, time);
287 }
288
289 /**
290  * tipc_link_create - create a new link
291  * @n_ptr: pointer to associated node
292  * @b_ptr: pointer to associated bearer
293  * @media_addr: media address to use when sending messages over link
294  *
295  * Returns pointer to link.
296  */
297 struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
298                               struct tipc_bearer *b_ptr,
299                               const struct tipc_media_addr *media_addr)
300 {
301         struct tipc_link *l_ptr;
302         struct tipc_msg *msg;
303         char *if_name;
304         char addr_string[16];
305         u32 peer = n_ptr->addr;
306
307         if (n_ptr->link_cnt >= 2) {
308                 tipc_addr_string_fill(addr_string, n_ptr->addr);
309                 pr_err("Attempt to establish third link to %s\n", addr_string);
310                 return NULL;
311         }
312
313         if (n_ptr->links[b_ptr->identity]) {
314                 tipc_addr_string_fill(addr_string, n_ptr->addr);
315                 pr_err("Attempt to establish second link on <%s> to %s\n",
316                        b_ptr->name, addr_string);
317                 return NULL;
318         }
319
320         l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
321         if (!l_ptr) {
322                 pr_warn("Link creation failed, no memory\n");
323                 return NULL;
324         }
325
326         l_ptr->addr = peer;
327         if_name = strchr(b_ptr->name, ':') + 1;
328         sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
329                 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
330                 tipc_node(tipc_own_addr),
331                 if_name,
332                 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
333                 /* note: peer i/f name is updated by reset/activate message */
334         memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
335         l_ptr->owner = n_ptr;
336         l_ptr->checkpoint = 1;
337         l_ptr->peer_session = INVALID_SESSION;
338         l_ptr->b_ptr = b_ptr;
339         link_set_supervision_props(l_ptr, b_ptr->tolerance);
340         l_ptr->state = RESET_UNKNOWN;
341
342         l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
343         msg = l_ptr->pmsg;
344         tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
345         msg_set_size(msg, sizeof(l_ptr->proto_msg));
346         msg_set_session(msg, (tipc_random & 0xffff));
347         msg_set_bearer_id(msg, b_ptr->identity);
348         strcpy((char *)msg_data(msg), if_name);
349
350         l_ptr->priority = b_ptr->priority;
351         tipc_link_set_queue_limits(l_ptr, b_ptr->window);
352
353         link_init_max_pkt(l_ptr);
354
355         l_ptr->next_out_no = 1;
356         INIT_LIST_HEAD(&l_ptr->waiting_ports);
357
358         link_reset_statistics(l_ptr);
359
360         tipc_node_attach_link(n_ptr, l_ptr);
361
362         k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
363         list_add_tail(&l_ptr->link_list, &b_ptr->links);
364         tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);
365
366         return l_ptr;
367 }
368
369 /**
370  * tipc_link_delete - delete a link
371  * @l_ptr: pointer to link
372  *
373  * Note: 'tipc_net_lock' is write_locked, bearer is locked.
374  * This routine must not grab the node lock until after link timer cancellation
375  * to avoid a potential deadlock situation.
376  */
377 void tipc_link_delete(struct tipc_link *l_ptr)
378 {
379         if (!l_ptr) {
380                 pr_err("Attempt to delete non-existent link\n");
381                 return;
382         }
383
384         k_cancel_timer(&l_ptr->timer);
385
386         tipc_node_lock(l_ptr->owner);
387         tipc_link_reset(l_ptr);
388         tipc_node_detach_link(l_ptr->owner, l_ptr);
389         tipc_link_stop(l_ptr);
390         list_del_init(&l_ptr->link_list);
391         tipc_node_unlock(l_ptr->owner);
392         k_term_timer(&l_ptr->timer);
393         kfree(l_ptr);
394 }
395
396 static void link_start(struct tipc_link *l_ptr)
397 {
398         tipc_node_lock(l_ptr->owner);
399         link_state_event(l_ptr, STARTING_EVT);
400         tipc_node_unlock(l_ptr->owner);
401 }
402
403 /**
404  * link_schedule_port - schedule port for deferred sending
405  * @l_ptr: pointer to link
406  * @origport: reference to sending port
407  * @sz: amount of data to be sent
408  *
409  * Schedules port for renewed sending of messages after link congestion
410  * has abated.
411  */
412 static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
413 {
414         struct tipc_port *p_ptr;
415
416         spin_lock_bh(&tipc_port_list_lock);
417         p_ptr = tipc_port_lock(origport);
418         if (p_ptr) {
419                 if (!p_ptr->wakeup)
420                         goto exit;
421                 if (!list_empty(&p_ptr->wait_list))
422                         goto exit;
423                 p_ptr->congested = 1;
424                 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
425                 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
426                 l_ptr->stats.link_congs++;
427 exit:
428                 tipc_port_unlock(p_ptr);
429         }
430         spin_unlock_bh(&tipc_port_list_lock);
431         return -ELINKCONG;
432 }
433
434 void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
435 {
436         struct tipc_port *p_ptr;
437         struct tipc_port *temp_p_ptr;
438         int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
439
440         if (all)
441                 win = 100000;
442         if (win <= 0)
443                 return;
444         if (!spin_trylock_bh(&tipc_port_list_lock))
445                 return;
446         if (link_congested(l_ptr))
447                 goto exit;
448         list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
449                                  wait_list) {
450                 if (win <= 0)
451                         break;
452                 list_del_init(&p_ptr->wait_list);
453                 spin_lock_bh(p_ptr->lock);
454                 p_ptr->congested = 0;
455                 p_ptr->wakeup(p_ptr);
456                 win -= p_ptr->waiting_pkts;
457                 spin_unlock_bh(p_ptr->lock);
458         }
459
460 exit:
461         spin_unlock_bh(&tipc_port_list_lock);
462 }
463
464 /**
465  * link_release_outqueue - purge link's outbound message queue
466  * @l_ptr: pointer to link
467  */
468 static void link_release_outqueue(struct tipc_link *l_ptr)
469 {
470         struct sk_buff *buf = l_ptr->first_out;
471         struct sk_buff *next;
472
473         while (buf) {
474                 next = buf->next;
475                 kfree_skb(buf);
476                 buf = next;
477         }
478         l_ptr->first_out = NULL;
479         l_ptr->out_queue_size = 0;
480 }
481
482 /**
483  * tipc_link_reset_fragments - purge link's inbound message fragments queue
484  * @l_ptr: pointer to link
485  */
486 void tipc_link_reset_fragments(struct tipc_link *l_ptr)
487 {
488         struct sk_buff *buf = l_ptr->defragm_buf;
489         struct sk_buff *next;
490
491         while (buf) {
492                 next = buf->next;
493                 kfree_skb(buf);
494                 buf = next;
495         }
496         l_ptr->defragm_buf = NULL;
497 }
498
499 /**
500  * tipc_link_stop - purge all inbound and outbound messages associated with link
501  * @l_ptr: pointer to link
502  */
503 void tipc_link_stop(struct tipc_link *l_ptr)
504 {
505         struct sk_buff *buf;
506         struct sk_buff *next;
507
508         buf = l_ptr->oldest_deferred_in;
509         while (buf) {
510                 next = buf->next;
511                 kfree_skb(buf);
512                 buf = next;
513         }
514
515         buf = l_ptr->first_out;
516         while (buf) {
517                 next = buf->next;
518                 kfree_skb(buf);
519                 buf = next;
520         }
521
522         tipc_link_reset_fragments(l_ptr);
523
524         kfree_skb(l_ptr->proto_msg_queue);
525         l_ptr->proto_msg_queue = NULL;
526 }
527
528 void tipc_link_reset(struct tipc_link *l_ptr)
529 {
530         struct sk_buff *buf;
531         u32 prev_state = l_ptr->state;
532         u32 checkpoint = l_ptr->next_in_no;
533         int was_active_link = tipc_link_is_active(l_ptr);
534
535         msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
536
537         /* Link is down, accept any session */
538         l_ptr->peer_session = INVALID_SESSION;
539
540         /* Prepare for max packet size negotiation */
541         link_init_max_pkt(l_ptr);
542
543         l_ptr->state = RESET_UNKNOWN;
544
545         if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
546                 return;
547
548         tipc_node_link_down(l_ptr->owner, l_ptr);
549         tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
550
551         if (was_active_link && tipc_node_active_links(l_ptr->owner) &&
552             l_ptr->owner->permit_changeover) {
553                 l_ptr->reset_checkpoint = checkpoint;
554                 l_ptr->exp_msg_count = START_CHANGEOVER;
555         }
556
557         /* Clean up all queues: */
558         link_release_outqueue(l_ptr);
559         kfree_skb(l_ptr->proto_msg_queue);
560         l_ptr->proto_msg_queue = NULL;
561         buf = l_ptr->oldest_deferred_in;
562         while (buf) {
563                 struct sk_buff *next = buf->next;
564                 kfree_skb(buf);
565                 buf = next;
566         }
567         if (!list_empty(&l_ptr->waiting_ports))
568                 tipc_link_wakeup_ports(l_ptr, 1);
569
570         l_ptr->retransm_queue_head = 0;
571         l_ptr->retransm_queue_size = 0;
572         l_ptr->last_out = NULL;
573         l_ptr->first_out = NULL;
574         l_ptr->next_out = NULL;
575         l_ptr->unacked_window = 0;
576         l_ptr->checkpoint = 1;
577         l_ptr->next_out_no = 1;
578         l_ptr->deferred_inqueue_sz = 0;
579         l_ptr->oldest_deferred_in = NULL;
580         l_ptr->newest_deferred_in = NULL;
581         l_ptr->fsm_msg_cnt = 0;
582         l_ptr->stale_count = 0;
583         link_reset_statistics(l_ptr);
584 }
585
586
587 static void link_activate(struct tipc_link *l_ptr)
588 {
589         l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
590         tipc_node_link_up(l_ptr->owner, l_ptr);
591         tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
592 }
593
594 /**
595  * link_state_event - link finite state machine
596  * @l_ptr: pointer to link
597  * @event: state machine event to process
598  */
599 static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
600 {
601         struct tipc_link *other;
602         u32 cont_intv = l_ptr->continuity_interval;
603
604         if (!l_ptr->started && (event != STARTING_EVT))
605                 return;         /* Not yet. */
606
607         if (link_blocked(l_ptr)) {
608                 if (event == TIMEOUT_EVT)
609                         link_set_timer(l_ptr, cont_intv);
610                 return;   /* Changeover going on */
611         }
612
613         switch (l_ptr->state) {
614         case WORKING_WORKING:
615                 switch (event) {
616                 case TRAFFIC_MSG_EVT:
617                 case ACTIVATE_MSG:
618                         break;
619                 case TIMEOUT_EVT:
620                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
621                                 l_ptr->checkpoint = l_ptr->next_in_no;
622                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
623                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
624                                                                  0, 0, 0, 0, 0);
625                                         l_ptr->fsm_msg_cnt++;
626                                 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
627                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
628                                                                  1, 0, 0, 0, 0);
629                                         l_ptr->fsm_msg_cnt++;
630                                 }
631                                 link_set_timer(l_ptr, cont_intv);
632                                 break;
633                         }
634                         l_ptr->state = WORKING_UNKNOWN;
635                         l_ptr->fsm_msg_cnt = 0;
636                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
637                         l_ptr->fsm_msg_cnt++;
638                         link_set_timer(l_ptr, cont_intv / 4);
639                         break;
640                 case RESET_MSG:
641                         pr_info("%s<%s>, requested by peer\n", link_rst_msg,
642                                 l_ptr->name);
643                         tipc_link_reset(l_ptr);
644                         l_ptr->state = RESET_RESET;
645                         l_ptr->fsm_msg_cnt = 0;
646                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
647                         l_ptr->fsm_msg_cnt++;
648                         link_set_timer(l_ptr, cont_intv);
649                         break;
650                 default:
651                         pr_err("%s%u in WW state\n", link_unk_evt, event);
652                 }
653                 break;
654         case WORKING_UNKNOWN:
655                 switch (event) {
656                 case TRAFFIC_MSG_EVT:
657                 case ACTIVATE_MSG:
658                         l_ptr->state = WORKING_WORKING;
659                         l_ptr->fsm_msg_cnt = 0;
660                         link_set_timer(l_ptr, cont_intv);
661                         break;
662                 case RESET_MSG:
663                         pr_info("%s<%s>, requested by peer while probing\n",
664                                 link_rst_msg, l_ptr->name);
665                         tipc_link_reset(l_ptr);
666                         l_ptr->state = RESET_RESET;
667                         l_ptr->fsm_msg_cnt = 0;
668                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
669                         l_ptr->fsm_msg_cnt++;
670                         link_set_timer(l_ptr, cont_intv);
671                         break;
672                 case TIMEOUT_EVT:
673                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
674                                 l_ptr->state = WORKING_WORKING;
675                                 l_ptr->fsm_msg_cnt = 0;
676                                 l_ptr->checkpoint = l_ptr->next_in_no;
677                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
678                                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
679                                                                  0, 0, 0, 0, 0);
680                                         l_ptr->fsm_msg_cnt++;
681                                 }
682                                 link_set_timer(l_ptr, cont_intv);
683                         } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
684                                 tipc_link_send_proto_msg(l_ptr, STATE_MSG,
685                                                          1, 0, 0, 0, 0);
686                                 l_ptr->fsm_msg_cnt++;
687                                 link_set_timer(l_ptr, cont_intv / 4);
688                         } else {        /* Link has failed */
689                                 pr_warn("%s<%s>, peer not responding\n",
690                                         link_rst_msg, l_ptr->name);
691                                 tipc_link_reset(l_ptr);
692                                 l_ptr->state = RESET_UNKNOWN;
693                                 l_ptr->fsm_msg_cnt = 0;
694                                 tipc_link_send_proto_msg(l_ptr, RESET_MSG,
695                                                          0, 0, 0, 0, 0);
696                                 l_ptr->fsm_msg_cnt++;
697                                 link_set_timer(l_ptr, cont_intv);
698                         }
699                         break;
700                 default:
701                         pr_err("%s%u in WU state\n", link_unk_evt, event);
702                 }
703                 break;
704         case RESET_UNKNOWN:
705                 switch (event) {
706                 case TRAFFIC_MSG_EVT:
707                         break;
708                 case ACTIVATE_MSG:
709                         other = l_ptr->owner->active_links[0];
710                         if (other && link_working_unknown(other))
711                                 break;
712                         l_ptr->state = WORKING_WORKING;
713                         l_ptr->fsm_msg_cnt = 0;
714                         link_activate(l_ptr);
715                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
716                         l_ptr->fsm_msg_cnt++;
717                         if (l_ptr->owner->working_links == 1)
718                                 tipc_link_send_sync(l_ptr);
719                         link_set_timer(l_ptr, cont_intv);
720                         break;
721                 case RESET_MSG:
722                         l_ptr->state = RESET_RESET;
723                         l_ptr->fsm_msg_cnt = 0;
724                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
725                         l_ptr->fsm_msg_cnt++;
726                         link_set_timer(l_ptr, cont_intv);
727                         break;
728                 case STARTING_EVT:
729                         l_ptr->started = 1;
730                         /* fall through */
731                 case TIMEOUT_EVT:
732                         tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
733                         l_ptr->fsm_msg_cnt++;
734                         link_set_timer(l_ptr, cont_intv);
735                         break;
736                 default:
737                         pr_err("%s%u in RU state\n", link_unk_evt, event);
738                 }
739                 break;
740         case RESET_RESET:
741                 switch (event) {
742                 case TRAFFIC_MSG_EVT:
743                 case ACTIVATE_MSG:
744                         other = l_ptr->owner->active_links[0];
745                         if (other && link_working_unknown(other))
746                                 break;
747                         l_ptr->state = WORKING_WORKING;
748                         l_ptr->fsm_msg_cnt = 0;
749                         link_activate(l_ptr);
750                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
751                         l_ptr->fsm_msg_cnt++;
752                         if (l_ptr->owner->working_links == 1)
753                                 tipc_link_send_sync(l_ptr);
754                         link_set_timer(l_ptr, cont_intv);
755                         break;
756                 case RESET_MSG:
757                         break;
758                 case TIMEOUT_EVT:
759                         tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
760                         l_ptr->fsm_msg_cnt++;
761                         link_set_timer(l_ptr, cont_intv);
762                         break;
763                 default:
764                         pr_err("%s%u in RR state\n", link_unk_evt, event);
765                 }
766                 break;
767         default:
768                 pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
769         }
770 }
771
772 /*
773  * link_bundle_buf(): Append contents of a buffer to
774  * the tail of an existing one.
775  */
776 static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
777                            struct sk_buff *buf)
778 {
779         struct tipc_msg *bundler_msg = buf_msg(bundler);
780         struct tipc_msg *msg = buf_msg(buf);
781         u32 size = msg_size(msg);
782         u32 bundle_size = msg_size(bundler_msg);
783         u32 to_pos = align(bundle_size);
784         u32 pad = to_pos - bundle_size;
785
786         if (msg_user(bundler_msg) != MSG_BUNDLER)
787                 return 0;
788         if (msg_type(bundler_msg) != OPEN_MSG)
789                 return 0;
790         if (skb_tailroom(bundler) < (pad + size))
791                 return 0;
792         if (l_ptr->max_pkt < (to_pos + size))
793                 return 0;
794
795         skb_put(bundler, pad + size);
796         skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
797         msg_set_size(bundler_msg, to_pos + size);
798         msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
799         kfree_skb(buf);
800         l_ptr->stats.sent_bundled++;
801         return 1;
802 }
803
804 static void link_add_to_outqueue(struct tipc_link *l_ptr,
805                                  struct sk_buff *buf,
806                                  struct tipc_msg *msg)
807 {
808         u32 ack = mod(l_ptr->next_in_no - 1);
809         u32 seqno = mod(l_ptr->next_out_no++);
810
811         msg_set_word(msg, 2, ((ack << 16) | seqno));
812         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
813         buf->next = NULL;
814         if (l_ptr->first_out) {
815                 l_ptr->last_out->next = buf;
816                 l_ptr->last_out = buf;
817         } else
818                 l_ptr->first_out = l_ptr->last_out = buf;
819
820         l_ptr->out_queue_size++;
821         if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
822                 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
823 }
824
825 static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
826                                        struct sk_buff *buf_chain,
827                                        u32 long_msgno)
828 {
829         struct sk_buff *buf;
830         struct tipc_msg *msg;
831
832         if (!l_ptr->next_out)
833                 l_ptr->next_out = buf_chain;
834         while (buf_chain) {
835                 buf = buf_chain;
836                 buf_chain = buf_chain->next;
837
838                 msg = buf_msg(buf);
839                 msg_set_long_msgno(msg, long_msgno);
840                 link_add_to_outqueue(l_ptr, buf, msg);
841         }
842 }
843
844 /*
845  * tipc_link_send_buf() is the 'full path' for messages, called from
846  * inside TIPC when the 'fast path' in tipc_send_buf
847  * has failed, and from link_send()
848  */
849 int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
850 {
851         struct tipc_msg *msg = buf_msg(buf);
852         u32 size = msg_size(msg);
853         u32 dsz = msg_data_sz(msg);
854         u32 queue_size = l_ptr->out_queue_size;
855         u32 imp = tipc_msg_tot_importance(msg);
856         u32 queue_limit = l_ptr->queue_limit[imp];
857         u32 max_packet = l_ptr->max_pkt;
858
859         /* Match msg importance against queue limits: */
860         if (unlikely(queue_size >= queue_limit)) {
861                 if (imp <= TIPC_CRITICAL_IMPORTANCE) {
862                         link_schedule_port(l_ptr, msg_origport(msg), size);
863                         kfree_skb(buf);
864                         return -ELINKCONG;
865                 }
866                 kfree_skb(buf);
867                 if (imp > CONN_MANAGER) {
868                         pr_warn("%s<%s>, send queue full", link_rst_msg,
869                                 l_ptr->name);
870                         tipc_link_reset(l_ptr);
871                 }
872                 return dsz;
873         }
874
875         /* Fragmentation needed ? */
876         if (size > max_packet)
877                 return link_send_long_buf(l_ptr, buf);
878
879         /* Packet can be queued or sent. */
880         if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
881                    !link_congested(l_ptr))) {
882                 link_add_to_outqueue(l_ptr, buf, msg);
883
884                 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
885                 l_ptr->unacked_window = 0;
886                 return dsz;
887         }
888         /* Congestion: can message be bundled ? */
889         if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
890             (msg_user(msg) != MSG_FRAGMENTER)) {
891
892                 /* Try adding message to an existing bundle */
893                 if (l_ptr->next_out &&
894                     link_bundle_buf(l_ptr, l_ptr->last_out, buf))
895                         return dsz;
896
897                 /* Try creating a new bundle */
898                 if (size <= max_packet * 2 / 3) {
899                         struct sk_buff *bundler = tipc_buf_acquire(max_packet);
900                         struct tipc_msg bundler_hdr;
901
902                         if (bundler) {
903                                 tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
904                                          INT_H_SIZE, l_ptr->addr);
905                                 skb_copy_to_linear_data(bundler, &bundler_hdr,
906                                                         INT_H_SIZE);
907                                 skb_trim(bundler, INT_H_SIZE);
908                                 link_bundle_buf(l_ptr, bundler, buf);
909                                 buf = bundler;
910                                 msg = buf_msg(buf);
911                                 l_ptr->stats.sent_bundles++;
912                         }
913                 }
914         }
915         if (!l_ptr->next_out)
916                 l_ptr->next_out = buf;
917         link_add_to_outqueue(l_ptr, buf, msg);
918         return dsz;
919 }
920
921 /*
922  * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
923  * not been selected yet, and the the owner node is not locked
924  * Called by TIPC internal users, e.g. the name distributor
925  */
926 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
927 {
928         struct tipc_link *l_ptr;
929         struct tipc_node *n_ptr;
930         int res = -ELINKCONG;
931
932         read_lock_bh(&tipc_net_lock);
933         n_ptr = tipc_node_find(dest);
934         if (n_ptr) {
935                 tipc_node_lock(n_ptr);
936                 l_ptr = n_ptr->active_links[selector & 1];
937                 if (l_ptr)
938                         res = tipc_link_send_buf(l_ptr, buf);
939                 else
940                         kfree_skb(buf);
941                 tipc_node_unlock(n_ptr);
942         } else {
943                 kfree_skb(buf);
944         }
945         read_unlock_bh(&tipc_net_lock);
946         return res;
947 }
948
949 /*
950  * tipc_link_send_sync - synchronize broadcast link endpoints.
951  *
952  * Give a newly added peer node the sequence number where it should
953  * start receiving and acking broadcast packets.
954  *
955  * Called with node locked
956  */
957 static void tipc_link_send_sync(struct tipc_link *l)
958 {
959         struct sk_buff *buf;
960         struct tipc_msg *msg;
961
962         buf = tipc_buf_acquire(INT_H_SIZE);
963         if (!buf)
964                 return;
965
966         msg = buf_msg(buf);
967         tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
968         msg_set_last_bcast(msg, l->owner->bclink.acked);
969         link_add_chain_to_outqueue(l, buf, 0);
970         tipc_link_push_queue(l);
971 }
972
973 /*
974  * tipc_link_recv_sync - synchronize broadcast link endpoints.
975  * Receive the sequence number where we should start receiving and
976  * acking broadcast packets from a newly added peer node, and open
977  * up for reception of such packets.
978  *
979  * Called with node locked
980  */
981 static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
982 {
983         struct tipc_msg *msg = buf_msg(buf);
984
985         n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
986         n->bclink.recv_permitted = true;
987         kfree_skb(buf);
988 }
989
990 /*
991  * tipc_link_send_names - send name table entries to new neighbor
992  *
993  * Send routine for bulk delivery of name table messages when contact
994  * with a new neighbor occurs. No link congestion checking is performed
995  * because name table messages *must* be delivered. The messages must be
996  * small enough not to require fragmentation.
997  * Called without any locks held.
998  */
999 void tipc_link_send_names(struct list_head *message_list, u32 dest)
1000 {
1001         struct tipc_node *n_ptr;
1002         struct tipc_link *l_ptr;
1003         struct sk_buff *buf;
1004         struct sk_buff *temp_buf;
1005
1006         if (list_empty(message_list))
1007                 return;
1008
1009         read_lock_bh(&tipc_net_lock);
1010         n_ptr = tipc_node_find(dest);
1011         if (n_ptr) {
1012                 tipc_node_lock(n_ptr);
1013                 l_ptr = n_ptr->active_links[0];
1014                 if (l_ptr) {
1015                         /* convert circular list to linear list */
1016                         ((struct sk_buff *)message_list->prev)->next = NULL;
1017                         link_add_chain_to_outqueue(l_ptr,
1018                                 (struct sk_buff *)message_list->next, 0);
1019                         tipc_link_push_queue(l_ptr);
1020                         INIT_LIST_HEAD(message_list);
1021                 }
1022                 tipc_node_unlock(n_ptr);
1023         }
1024         read_unlock_bh(&tipc_net_lock);
1025
1026         /* discard the messages if they couldn't be sent */
1027         list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
1028                 list_del((struct list_head *)buf);
1029                 kfree_skb(buf);
1030         }
1031 }
1032
1033 /*
1034  * link_send_buf_fast: Entry for data messages where the
1035  * destination link is known and the header is complete,
1036  * inclusive total message length. Very time critical.
1037  * Link is locked. Returns user data length.
1038  */
1039 static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
1040                               u32 *used_max_pkt)
1041 {
1042         struct tipc_msg *msg = buf_msg(buf);
1043         int res = msg_data_sz(msg);
1044
1045         if (likely(!link_congested(l_ptr))) {
1046                 if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
1047                         if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
1048                                 link_add_to_outqueue(l_ptr, buf, msg);
1049                                 tipc_bearer_send(l_ptr->b_ptr, buf,
1050                                                  &l_ptr->media_addr);
1051                                 l_ptr->unacked_window = 0;
1052                                 return res;
1053                         }
1054                 } else
1055                         *used_max_pkt = l_ptr->max_pkt;
1056         }
1057         return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
1058 }
1059
1060 /*
1061  * tipc_link_send_sections_fast: Entry for messages where the
1062  * destination processor is known and the header is complete,
1063  * except for total message length.
1064  * Returns user data length or errno.
1065  */
1066 int tipc_link_send_sections_fast(struct tipc_port *sender,
1067                                  struct iovec const *msg_sect,
1068                                  const u32 num_sect, unsigned int total_len,
1069                                  u32 destaddr)
1070 {
1071         struct tipc_msg *hdr = &sender->phdr;
1072         struct tipc_link *l_ptr;
1073         struct sk_buff *buf;
1074         struct tipc_node *node;
1075         int res;
1076         u32 selector = msg_origport(hdr) & 1;
1077
1078 again:
1079         /*
1080          * Try building message using port's max_pkt hint.
1081          * (Must not hold any locks while building message.)
1082          */
1083         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len,
1084                              sender->max_pkt, &buf);
1085         /* Exit if build request was invalid */
1086         if (unlikely(res < 0))
1087                 return res;
1088
1089         read_lock_bh(&tipc_net_lock);
1090         node = tipc_node_find(destaddr);
1091         if (likely(node)) {
1092                 tipc_node_lock(node);
1093                 l_ptr = node->active_links[selector];
1094                 if (likely(l_ptr)) {
1095                         if (likely(buf)) {
1096                                 res = link_send_buf_fast(l_ptr, buf,
1097                                                          &sender->max_pkt);
1098 exit:
1099                                 tipc_node_unlock(node);
1100                                 read_unlock_bh(&tipc_net_lock);
1101                                 return res;
1102                         }
1103
1104                         /* Exit if link (or bearer) is congested */
1105                         if (link_congested(l_ptr) ||
1106                             tipc_bearer_blocked(l_ptr->b_ptr)) {
1107                                 res = link_schedule_port(l_ptr,
1108                                                          sender->ref, res);
1109                                 goto exit;
1110                         }
1111
1112                         /*
1113                          * Message size exceeds max_pkt hint; update hint,
1114                          * then re-try fast path or fragment the message
1115                          */
1116                         sender->max_pkt = l_ptr->max_pkt;
1117                         tipc_node_unlock(node);
1118                         read_unlock_bh(&tipc_net_lock);
1119
1120
1121                         if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
1122                                 goto again;
1123
1124                         return link_send_sections_long(sender, msg_sect,
1125                                                        num_sect, total_len,
1126                                                        destaddr);
1127                 }
1128                 tipc_node_unlock(node);
1129         }
1130         read_unlock_bh(&tipc_net_lock);
1131
1132         /* Couldn't find a link to the destination node */
1133         if (buf)
1134                 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1135         if (res >= 0)
1136                 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1137                                                  total_len, TIPC_ERR_NO_NODE);
1138         return res;
1139 }
1140
1141 /*
1142  * link_send_sections_long(): Entry for long messages where the
1143  * destination node is known and the header is complete,
1144  * inclusive total message length.
1145  * Link and bearer congestion status have been checked to be ok,
1146  * and are ignored if they change.
1147  *
1148  * Note that fragments do not use the full link MTU so that they won't have
1149  * to undergo refragmentation if link changeover causes them to be sent
1150  * over another link with an additional tunnel header added as prefix.
1151  * (Refragmentation will still occur if the other link has a smaller MTU.)
1152  *
1153  * Returns user data length or errno.
1154  */
1155 static int link_send_sections_long(struct tipc_port *sender,
1156                                    struct iovec const *msg_sect,
1157                                    u32 num_sect, unsigned int total_len,
1158                                    u32 destaddr)
1159 {
1160         struct tipc_link *l_ptr;
1161         struct tipc_node *node;
1162         struct tipc_msg *hdr = &sender->phdr;
1163         u32 dsz = total_len;
1164         u32 max_pkt, fragm_sz, rest;
1165         struct tipc_msg fragm_hdr;
1166         struct sk_buff *buf, *buf_chain, *prev;
1167         u32 fragm_crs, fragm_rest, hsz, sect_rest;
1168         const unchar *sect_crs;
1169         int curr_sect;
1170         u32 fragm_no;
1171         int res = 0;
1172
1173 again:
1174         fragm_no = 1;
1175         max_pkt = sender->max_pkt - INT_H_SIZE;
1176                 /* leave room for tunnel header in case of link changeover */
1177         fragm_sz = max_pkt - INT_H_SIZE;
1178                 /* leave room for fragmentation header in each fragment */
1179         rest = dsz;
1180         fragm_crs = 0;
1181         fragm_rest = 0;
1182         sect_rest = 0;
1183         sect_crs = NULL;
1184         curr_sect = -1;
1185
1186         /* Prepare reusable fragment header */
1187         tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1188                  INT_H_SIZE, msg_destnode(hdr));
1189         msg_set_size(&fragm_hdr, max_pkt);
1190         msg_set_fragm_no(&fragm_hdr, 1);
1191
1192         /* Prepare header of first fragment */
1193         buf_chain = buf = tipc_buf_acquire(max_pkt);
1194         if (!buf)
1195                 return -ENOMEM;
1196         buf->next = NULL;
1197         skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1198         hsz = msg_hdr_sz(hdr);
1199         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1200
1201         /* Chop up message */
1202         fragm_crs = INT_H_SIZE + hsz;
1203         fragm_rest = fragm_sz - hsz;
1204
1205         do {            /* For all sections */
1206                 u32 sz;
1207
1208                 if (!sect_rest) {
1209                         sect_rest = msg_sect[++curr_sect].iov_len;
1210                         sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1211                 }
1212
1213                 if (sect_rest < fragm_rest)
1214                         sz = sect_rest;
1215                 else
1216                         sz = fragm_rest;
1217
1218                 if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1219                         res = -EFAULT;
1220 error:
1221                         for (; buf_chain; buf_chain = buf) {
1222                                 buf = buf_chain->next;
1223                                 kfree_skb(buf_chain);
1224                         }
1225                         return res;
1226                 }
1227                 sect_crs += sz;
1228                 sect_rest -= sz;
1229                 fragm_crs += sz;
1230                 fragm_rest -= sz;
1231                 rest -= sz;
1232
1233                 if (!fragm_rest && rest) {
1234
1235                         /* Initiate new fragment: */
1236                         if (rest <= fragm_sz) {
1237                                 fragm_sz = rest;
1238                                 msg_set_type(&fragm_hdr, LAST_FRAGMENT);
1239                         } else {
1240                                 msg_set_type(&fragm_hdr, FRAGMENT);
1241                         }
1242                         msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1243                         msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1244                         prev = buf;
1245                         buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
1246                         if (!buf) {
1247                                 res = -ENOMEM;
1248                                 goto error;
1249                         }
1250
1251                         buf->next = NULL;
1252                         prev->next = buf;
1253                         skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1254                         fragm_crs = INT_H_SIZE;
1255                         fragm_rest = fragm_sz;
1256                 }
1257         } while (rest > 0);
1258
1259         /*
1260          * Now we have a buffer chain. Select a link and check
1261          * that packet size is still OK
1262          */
1263         node = tipc_node_find(destaddr);
1264         if (likely(node)) {
1265                 tipc_node_lock(node);
1266                 l_ptr = node->active_links[sender->ref & 1];
1267                 if (!l_ptr) {
1268                         tipc_node_unlock(node);
1269                         goto reject;
1270                 }
1271                 if (l_ptr->max_pkt < max_pkt) {
1272                         sender->max_pkt = l_ptr->max_pkt;
1273                         tipc_node_unlock(node);
1274                         for (; buf_chain; buf_chain = buf) {
1275                                 buf = buf_chain->next;
1276                                 kfree_skb(buf_chain);
1277                         }
1278                         goto again;
1279                 }
1280         } else {
1281 reject:
1282                 for (; buf_chain; buf_chain = buf) {
1283                         buf = buf_chain->next;
1284                         kfree_skb(buf_chain);
1285                 }
1286                 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1287                                                  total_len, TIPC_ERR_NO_NODE);
1288         }
1289
1290         /* Append chain of fragments to send queue & send them */
1291         l_ptr->long_msg_seq_no++;
1292         link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
1293         l_ptr->stats.sent_fragments += fragm_no;
1294         l_ptr->stats.sent_fragmented++;
1295         tipc_link_push_queue(l_ptr);
1296         tipc_node_unlock(node);
1297         return dsz;
1298 }
1299
1300 /*
1301  * tipc_link_push_packet: Push one unsent packet to the media
1302  */
1303 u32 tipc_link_push_packet(struct tipc_link *l_ptr)
1304 {
1305         struct sk_buff *buf = l_ptr->first_out;
1306         u32 r_q_size = l_ptr->retransm_queue_size;
1307         u32 r_q_head = l_ptr->retransm_queue_head;
1308
1309         /* Step to position where retransmission failed, if any,    */
1310         /* consider that buffers may have been released in meantime */
1311         if (r_q_size && buf) {
1312                 u32 last = lesser(mod(r_q_head + r_q_size),
1313                                   link_last_sent(l_ptr));
1314                 u32 first = buf_seqno(buf);
1315
1316                 while (buf && less(first, r_q_head)) {
1317                         first = mod(first + 1);
1318                         buf = buf->next;
1319                 }
1320                 l_ptr->retransm_queue_head = r_q_head = first;
1321                 l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1322         }
1323
1324         /* Continue retransmission now, if there is anything: */
1325         if (r_q_size && buf) {
1326                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1327                 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1328                 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1329                 l_ptr->retransm_queue_head = mod(++r_q_head);
1330                 l_ptr->retransm_queue_size = --r_q_size;
1331                 l_ptr->stats.retransmitted++;
1332                 return 0;
1333         }
1334
1335         /* Send deferred protocol message, if any: */
1336         buf = l_ptr->proto_msg_queue;
1337         if (buf) {
1338                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1339                 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1340                 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1341                 l_ptr->unacked_window = 0;
1342                 kfree_skb(buf);
1343                 l_ptr->proto_msg_queue = NULL;
1344                 return 0;
1345         }
1346
1347         /* Send one deferred data message, if send window not full: */
1348         buf = l_ptr->next_out;
1349         if (buf) {
1350                 struct tipc_msg *msg = buf_msg(buf);
1351                 u32 next = msg_seqno(msg);
1352                 u32 first = buf_seqno(l_ptr->first_out);
1353
1354                 if (mod(next - first) < l_ptr->queue_limit[0]) {
1355                         msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1356                         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1357                         tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1358                         if (msg_user(msg) == MSG_BUNDLER)
1359                                 msg_set_type(msg, CLOSED_MSG);
1360                         l_ptr->next_out = buf->next;
1361                         return 0;
1362                 }
1363         }
1364         return 1;
1365 }
1366
1367 /*
1368  * push_queue(): push out the unsent messages of a link where
1369  *               congestion has abated. Node is locked
1370  */
1371 void tipc_link_push_queue(struct tipc_link *l_ptr)
1372 {
1373         u32 res;
1374
1375         if (tipc_bearer_blocked(l_ptr->b_ptr))
1376                 return;
1377
1378         do {
1379                 res = tipc_link_push_packet(l_ptr);
1380         } while (!res);
1381 }
1382
1383 static void link_reset_all(unsigned long addr)
1384 {
1385         struct tipc_node *n_ptr;
1386         char addr_string[16];
1387         u32 i;
1388
1389         read_lock_bh(&tipc_net_lock);
1390         n_ptr = tipc_node_find((u32)addr);
1391         if (!n_ptr) {
1392                 read_unlock_bh(&tipc_net_lock);
1393                 return; /* node no longer exists */
1394         }
1395
1396         tipc_node_lock(n_ptr);
1397
1398         pr_warn("Resetting all links to %s\n",
1399                 tipc_addr_string_fill(addr_string, n_ptr->addr));
1400
1401         for (i = 0; i < MAX_BEARERS; i++) {
1402                 if (n_ptr->links[i]) {
1403                         link_print(n_ptr->links[i], "Resetting link\n");
1404                         tipc_link_reset(n_ptr->links[i]);
1405                 }
1406         }
1407
1408         tipc_node_unlock(n_ptr);
1409         read_unlock_bh(&tipc_net_lock);
1410 }
1411
1412 static void link_retransmit_failure(struct tipc_link *l_ptr,
1413                                     struct sk_buff *buf)
1414 {
1415         struct tipc_msg *msg = buf_msg(buf);
1416
1417         pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
1418
1419         if (l_ptr->addr) {
1420                 /* Handle failure on standard link */
1421                 link_print(l_ptr, "Resetting link\n");
1422                 tipc_link_reset(l_ptr);
1423
1424         } else {
1425                 /* Handle failure on broadcast link */
1426                 struct tipc_node *n_ptr;
1427                 char addr_string[16];
1428
1429                 pr_info("Msg seq number: %u,  ", msg_seqno(msg));
1430                 pr_cont("Outstanding acks: %lu\n",
1431                         (unsigned long) TIPC_SKB_CB(buf)->handle);
1432
1433                 n_ptr = tipc_bclink_retransmit_to();
1434                 tipc_node_lock(n_ptr);
1435
1436                 tipc_addr_string_fill(addr_string, n_ptr->addr);
1437                 pr_info("Broadcast link info for %s\n", addr_string);
1438                 pr_info("Reception permitted: %d,  Acked: %u\n",
1439                         n_ptr->bclink.recv_permitted,
1440                         n_ptr->bclink.acked);
1441                 pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
1442                         n_ptr->bclink.last_in,
1443                         n_ptr->bclink.oos_state,
1444                         n_ptr->bclink.last_sent);
1445
1446                 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1447
1448                 tipc_node_unlock(n_ptr);
1449
1450                 l_ptr->stale_count = 0;
1451         }
1452 }
1453
1454 void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1455                           u32 retransmits)
1456 {
1457         struct tipc_msg *msg;
1458
1459         if (!buf)
1460                 return;
1461
1462         msg = buf_msg(buf);
1463
1464         if (tipc_bearer_blocked(l_ptr->b_ptr)) {
1465                 if (l_ptr->retransm_queue_size == 0) {
1466                         l_ptr->retransm_queue_head = msg_seqno(msg);
1467                         l_ptr->retransm_queue_size = retransmits;
1468                 } else {
1469                         pr_err("Unexpected retransmit on link %s (qsize=%d)\n",
1470                                l_ptr->name, l_ptr->retransm_queue_size);
1471                 }
1472                 return;
1473         } else {
1474                 /* Detect repeated retransmit failures on unblocked bearer */
1475                 if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1476                         if (++l_ptr->stale_count > 100) {
1477                                 link_retransmit_failure(l_ptr, buf);
1478                                 return;
1479                         }
1480                 } else {
1481                         l_ptr->last_retransmitted = msg_seqno(msg);
1482                         l_ptr->stale_count = 1;
1483                 }
1484         }
1485
1486         while (retransmits && (buf != l_ptr->next_out) && buf) {
1487                 msg = buf_msg(buf);
1488                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1489                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1490                 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1491                 buf = buf->next;
1492                 retransmits--;
1493                 l_ptr->stats.retransmitted++;
1494         }
1495
1496         l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1497 }
1498
1499 /**
1500  * link_insert_deferred_queue - insert deferred messages back into receive chain
1501  */
1502 static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
1503                                                   struct sk_buff *buf)
1504 {
1505         u32 seq_no;
1506
1507         if (l_ptr->oldest_deferred_in == NULL)
1508                 return buf;
1509
1510         seq_no = buf_seqno(l_ptr->oldest_deferred_in);
1511         if (seq_no == mod(l_ptr->next_in_no)) {
1512                 l_ptr->newest_deferred_in->next = buf;
1513                 buf = l_ptr->oldest_deferred_in;
1514                 l_ptr->oldest_deferred_in = NULL;
1515                 l_ptr->deferred_inqueue_sz = 0;
1516         }
1517         return buf;
1518 }
1519
1520 /**
1521  * link_recv_buf_validate - validate basic format of received message
1522  *
1523  * This routine ensures a TIPC message has an acceptable header, and at least
1524  * as much data as the header indicates it should.  The routine also ensures
1525  * that the entire message header is stored in the main fragment of the message
1526  * buffer, to simplify future access to message header fields.
1527  *
1528  * Note: Having extra info present in the message header or data areas is OK.
1529  * TIPC will ignore the excess, under the assumption that it is optional info
1530  * introduced by a later release of the protocol.
1531  */
1532 static int link_recv_buf_validate(struct sk_buff *buf)
1533 {
1534         static u32 min_data_hdr_size[8] = {
1535                 SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
1536                 MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1537                 };
1538
1539         struct tipc_msg *msg;
1540         u32 tipc_hdr[2];
1541         u32 size;
1542         u32 hdr_size;
1543         u32 min_hdr_size;
1544
1545         if (unlikely(buf->len < MIN_H_SIZE))
1546                 return 0;
1547
1548         msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1549         if (msg == NULL)
1550                 return 0;
1551
1552         if (unlikely(msg_version(msg) != TIPC_VERSION))
1553                 return 0;
1554
1555         size = msg_size(msg);
1556         hdr_size = msg_hdr_sz(msg);
1557         min_hdr_size = msg_isdata(msg) ?
1558                 min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1559
1560         if (unlikely((hdr_size < min_hdr_size) ||
1561                      (size < hdr_size) ||
1562                      (buf->len < size) ||
1563                      (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1564                 return 0;
1565
1566         return pskb_may_pull(buf, hdr_size);
1567 }
1568
1569 /**
1570  * tipc_recv_msg - process TIPC messages arriving from off-node
1571  * @head: pointer to message buffer chain
1572  * @tb_ptr: pointer to bearer message arrived on
1573  *
1574  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1575  * structure (i.e. cannot be NULL), but bearer can be inactive.
1576  */
1577 void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
1578 {
1579         read_lock_bh(&tipc_net_lock);
1580         while (head) {
1581                 struct tipc_node *n_ptr;
1582                 struct tipc_link *l_ptr;
1583                 struct sk_buff *crs;
1584                 struct sk_buff *buf = head;
1585                 struct tipc_msg *msg;
1586                 u32 seq_no;
1587                 u32 ackd;
1588                 u32 released = 0;
1589                 int type;
1590
1591                 head = head->next;
1592
1593                 /* Ensure bearer is still enabled */
1594                 if (unlikely(!b_ptr->active))
1595                         goto cont;
1596
1597                 /* Ensure message is well-formed */
1598                 if (unlikely(!link_recv_buf_validate(buf)))
1599                         goto cont;
1600
1601                 /* Ensure message data is a single contiguous unit */
1602                 if (unlikely(skb_linearize(buf)))
1603                         goto cont;
1604
1605                 /* Handle arrival of a non-unicast link message */
1606                 msg = buf_msg(buf);
1607
1608                 if (unlikely(msg_non_seq(msg))) {
1609                         if (msg_user(msg) ==  LINK_CONFIG)
1610                                 tipc_disc_recv_msg(buf, b_ptr);
1611                         else
1612                                 tipc_bclink_recv_pkt(buf);
1613                         continue;
1614                 }
1615
1616                 /* Discard unicast link messages destined for another node */
1617                 if (unlikely(!msg_short(msg) &&
1618                              (msg_destnode(msg) != tipc_own_addr)))
1619                         goto cont;
1620
1621                 /* Locate neighboring node that sent message */
1622                 n_ptr = tipc_node_find(msg_prevnode(msg));
1623                 if (unlikely(!n_ptr))
1624                         goto cont;
1625                 tipc_node_lock(n_ptr);
1626
1627                 /* Locate unicast link endpoint that should handle message */
1628                 l_ptr = n_ptr->links[b_ptr->identity];
1629                 if (unlikely(!l_ptr)) {
1630                         tipc_node_unlock(n_ptr);
1631                         goto cont;
1632                 }
1633
1634                 /* Verify that communication with node is currently allowed */
1635                 if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
1636                         msg_user(msg) == LINK_PROTOCOL &&
1637                         (msg_type(msg) == RESET_MSG ||
1638                                         msg_type(msg) == ACTIVATE_MSG) &&
1639                         !msg_redundant_link(msg))
1640                         n_ptr->block_setup &= ~WAIT_PEER_DOWN;
1641
1642                 if (n_ptr->block_setup) {
1643                         tipc_node_unlock(n_ptr);
1644                         goto cont;
1645                 }
1646
1647                 /* Validate message sequence number info */
1648                 seq_no = msg_seqno(msg);
1649                 ackd = msg_ack(msg);
1650
1651                 /* Release acked messages */
1652                 if (n_ptr->bclink.recv_permitted)
1653                         tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1654
1655                 crs = l_ptr->first_out;
1656                 while ((crs != l_ptr->next_out) &&
1657                        less_eq(buf_seqno(crs), ackd)) {
1658                         struct sk_buff *next = crs->next;
1659
1660                         kfree_skb(crs);
1661                         crs = next;
1662                         released++;
1663                 }
1664                 if (released) {
1665                         l_ptr->first_out = crs;
1666                         l_ptr->out_queue_size -= released;
1667                 }
1668
1669                 /* Try sending any messages link endpoint has pending */
1670                 if (unlikely(l_ptr->next_out))
1671                         tipc_link_push_queue(l_ptr);
1672                 if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1673                         tipc_link_wakeup_ports(l_ptr, 0);
1674                 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1675                         l_ptr->stats.sent_acks++;
1676                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1677                 }
1678
1679                 /* Now (finally!) process the incoming message */
1680 protocol_check:
1681                 if (likely(link_working_working(l_ptr))) {
1682                         if (likely(seq_no == mod(l_ptr->next_in_no))) {
1683                                 l_ptr->next_in_no++;
1684                                 if (unlikely(l_ptr->oldest_deferred_in))
1685                                         head = link_insert_deferred_queue(l_ptr,
1686                                                                           head);
1687 deliver:
1688                                 if (likely(msg_isdata(msg))) {
1689                                         tipc_node_unlock(n_ptr);
1690                                         tipc_port_recv_msg(buf);
1691                                         continue;
1692                                 }
1693                                 switch (msg_user(msg)) {
1694                                         int ret;
1695                                 case MSG_BUNDLER:
1696                                         l_ptr->stats.recv_bundles++;
1697                                         l_ptr->stats.recv_bundled +=
1698                                                 msg_msgcnt(msg);
1699                                         tipc_node_unlock(n_ptr);
1700                                         tipc_link_recv_bundle(buf);
1701                                         continue;
1702                                 case NAME_DISTRIBUTOR:
1703                                         n_ptr->bclink.recv_permitted = true;
1704                                         tipc_node_unlock(n_ptr);
1705                                         tipc_named_recv(buf);
1706                                         continue;
1707                                 case BCAST_PROTOCOL:
1708                                         tipc_link_recv_sync(n_ptr, buf);
1709                                         tipc_node_unlock(n_ptr);
1710                                         continue;
1711                                 case CONN_MANAGER:
1712                                         tipc_node_unlock(n_ptr);
1713                                         tipc_port_recv_proto_msg(buf);
1714                                         continue;
1715                                 case MSG_FRAGMENTER:
1716                                         l_ptr->stats.recv_fragments++;
1717                                         ret = tipc_link_recv_fragment(
1718                                                 &l_ptr->defragm_buf,
1719                                                 &buf, &msg);
1720                                         if (ret == 1) {
1721                                                 l_ptr->stats.recv_fragmented++;
1722                                                 goto deliver;
1723                                         }
1724                                         if (ret == -1)
1725                                                 l_ptr->next_in_no--;
1726                                         break;
1727                                 case CHANGEOVER_PROTOCOL:
1728                                         type = msg_type(msg);
1729                                         if (link_recv_changeover_msg(&l_ptr,
1730                                                                      &buf)) {
1731                                                 msg = buf_msg(buf);
1732                                                 seq_no = msg_seqno(msg);
1733                                                 if (type == ORIGINAL_MSG)
1734                                                         goto deliver;
1735                                                 goto protocol_check;
1736                                         }
1737                                         break;
1738                                 default:
1739                                         kfree_skb(buf);
1740                                         buf = NULL;
1741                                         break;
1742                                 }
1743                                 tipc_node_unlock(n_ptr);
1744                                 tipc_net_route_msg(buf);
1745                                 continue;
1746                         }
1747                         link_handle_out_of_seq_msg(l_ptr, buf);
1748                         head = link_insert_deferred_queue(l_ptr, head);
1749                         tipc_node_unlock(n_ptr);
1750                         continue;
1751                 }
1752
1753                 /* Link is not in state WORKING_WORKING */
1754                 if (msg_user(msg) == LINK_PROTOCOL) {
1755                         link_recv_proto_msg(l_ptr, buf);
1756                         head = link_insert_deferred_queue(l_ptr, head);
1757                         tipc_node_unlock(n_ptr);
1758                         continue;
1759                 }
1760
1761                 /* Traffic message. Conditionally activate link */
1762                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1763
1764                 if (link_working_working(l_ptr)) {
1765                         /* Re-insert buffer in front of queue */
1766                         buf->next = head;
1767                         head = buf;
1768                         tipc_node_unlock(n_ptr);
1769                         continue;
1770                 }
1771                 tipc_node_unlock(n_ptr);
1772 cont:
1773                 kfree_skb(buf);
1774         }
1775         read_unlock_bh(&tipc_net_lock);
1776 }
1777
1778 /**
1779  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1780  *
1781  * Returns increase in queue length (i.e. 0 or 1)
1782  */
1783 u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
1784                         struct sk_buff *buf)
1785 {
1786         struct sk_buff *queue_buf;
1787         struct sk_buff **prev;
1788         u32 seq_no = buf_seqno(buf);
1789
1790         buf->next = NULL;
1791
1792         /* Empty queue ? */
1793         if (*head == NULL) {
1794                 *head = *tail = buf;
1795                 return 1;
1796         }
1797
1798         /* Last ? */
1799         if (less(buf_seqno(*tail), seq_no)) {
1800                 (*tail)->next = buf;
1801                 *tail = buf;
1802                 return 1;
1803         }
1804
1805         /* Locate insertion point in queue, then insert; discard if duplicate */
1806         prev = head;
1807         queue_buf = *head;
1808         for (;;) {
1809                 u32 curr_seqno = buf_seqno(queue_buf);
1810
1811                 if (seq_no == curr_seqno) {
1812                         kfree_skb(buf);
1813                         return 0;
1814                 }
1815
1816                 if (less(seq_no, curr_seqno))
1817                         break;
1818
1819                 prev = &queue_buf->next;
1820                 queue_buf = queue_buf->next;
1821         }
1822
1823         buf->next = queue_buf;
1824         *prev = buf;
1825         return 1;
1826 }
1827
1828 /*
1829  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1830  */
1831 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1832                                        struct sk_buff *buf)
1833 {
1834         u32 seq_no = buf_seqno(buf);
1835
1836         if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1837                 link_recv_proto_msg(l_ptr, buf);
1838                 return;
1839         }
1840
1841         /* Record OOS packet arrival (force mismatch on next timeout) */
1842         l_ptr->checkpoint--;
1843
1844         /*
1845          * Discard packet if a duplicate; otherwise add it to deferred queue
1846          * and notify peer of gap as per protocol specification
1847          */
1848         if (less(seq_no, mod(l_ptr->next_in_no))) {
1849                 l_ptr->stats.duplicates++;
1850                 kfree_skb(buf);
1851                 return;
1852         }
1853
1854         if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
1855                                 &l_ptr->newest_deferred_in, buf)) {
1856                 l_ptr->deferred_inqueue_sz++;
1857                 l_ptr->stats.deferred_recv++;
1858                 if ((l_ptr->deferred_inqueue_sz % 16) == 1)
1859                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1860         } else
1861                 l_ptr->stats.duplicates++;
1862 }
1863
1864 /*
1865  * Send protocol message to the other endpoint.
1866  */
1867 void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
1868                               int probe_msg, u32 gap, u32 tolerance,
1869                               u32 priority, u32 ack_mtu)
1870 {
1871         struct sk_buff *buf = NULL;
1872         struct tipc_msg *msg = l_ptr->pmsg;
1873         u32 msg_size = sizeof(l_ptr->proto_msg);
1874         int r_flag;
1875
1876         /* Discard any previous message that was deferred due to congestion */
1877         if (l_ptr->proto_msg_queue) {
1878                 kfree_skb(l_ptr->proto_msg_queue);
1879                 l_ptr->proto_msg_queue = NULL;
1880         }
1881
1882         if (link_blocked(l_ptr))
1883                 return;
1884
1885         /* Abort non-RESET send if communication with node is prohibited */
1886         if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
1887                 return;
1888
1889         /* Create protocol message with "out-of-sequence" sequence number */
1890         msg_set_type(msg, msg_typ);
1891         msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
1892         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1893         msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
1894
1895         if (msg_typ == STATE_MSG) {
1896                 u32 next_sent = mod(l_ptr->next_out_no);
1897
1898                 if (!tipc_link_is_up(l_ptr))
1899                         return;
1900                 if (l_ptr->next_out)
1901                         next_sent = buf_seqno(l_ptr->next_out);
1902                 msg_set_next_sent(msg, next_sent);
1903                 if (l_ptr->oldest_deferred_in) {
1904                         u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
1905                         gap = mod(rec - mod(l_ptr->next_in_no));
1906                 }
1907                 msg_set_seq_gap(msg, gap);
1908                 if (gap)
1909                         l_ptr->stats.sent_nacks++;
1910                 msg_set_link_tolerance(msg, tolerance);
1911                 msg_set_linkprio(msg, priority);
1912                 msg_set_max_pkt(msg, ack_mtu);
1913                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1914                 msg_set_probe(msg, probe_msg != 0);
1915                 if (probe_msg) {
1916                         u32 mtu = l_ptr->max_pkt;
1917
1918                         if ((mtu < l_ptr->max_pkt_target) &&
1919                             link_working_working(l_ptr) &&
1920                             l_ptr->fsm_msg_cnt) {
1921                                 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1922                                 if (l_ptr->max_pkt_probes == 10) {
1923                                         l_ptr->max_pkt_target = (msg_size - 4);
1924                                         l_ptr->max_pkt_probes = 0;
1925                                         msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1926                                 }
1927                                 l_ptr->max_pkt_probes++;
1928                         }
1929
1930                         l_ptr->stats.sent_probes++;
1931                 }
1932                 l_ptr->stats.sent_states++;
1933         } else {                /* RESET_MSG or ACTIVATE_MSG */
1934                 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
1935                 msg_set_seq_gap(msg, 0);
1936                 msg_set_next_sent(msg, 1);
1937                 msg_set_probe(msg, 0);
1938                 msg_set_link_tolerance(msg, l_ptr->tolerance);
1939                 msg_set_linkprio(msg, l_ptr->priority);
1940                 msg_set_max_pkt(msg, l_ptr->max_pkt_target);
1941         }
1942
1943         r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
1944         msg_set_redundant_link(msg, r_flag);
1945         msg_set_linkprio(msg, l_ptr->priority);
1946         msg_set_size(msg, msg_size);
1947
1948         msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
1949
1950         buf = tipc_buf_acquire(msg_size);
1951         if (!buf)
1952                 return;
1953
1954         skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1955         buf->priority = TC_PRIO_CONTROL;
1956
1957         /* Defer message if bearer is already blocked */
1958         if (tipc_bearer_blocked(l_ptr->b_ptr)) {
1959                 l_ptr->proto_msg_queue = buf;
1960                 return;
1961         }
1962
1963         tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1964         l_ptr->unacked_window = 0;
1965         kfree_skb(buf);
1966 }
1967
1968 /*
1969  * Receive protocol message :
1970  * Note that network plane id propagates through the network, and may
1971  * change at any time. The node with lowest address rules
1972  */
1973 static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
1974 {
1975         u32 rec_gap = 0;
1976         u32 max_pkt_info;
1977         u32 max_pkt_ack;
1978         u32 msg_tol;
1979         struct tipc_msg *msg = buf_msg(buf);
1980
1981         if (link_blocked(l_ptr))
1982                 goto exit;
1983
1984         /* record unnumbered packet arrival (force mismatch on next timeout) */
1985         l_ptr->checkpoint--;
1986
1987         if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
1988                 if (tipc_own_addr > msg_prevnode(msg))
1989                         l_ptr->b_ptr->net_plane = msg_net_plane(msg);
1990
1991         l_ptr->owner->permit_changeover = msg_redundant_link(msg);
1992
1993         switch (msg_type(msg)) {
1994
1995         case RESET_MSG:
1996                 if (!link_working_unknown(l_ptr) &&
1997                     (l_ptr->peer_session != INVALID_SESSION)) {
1998                         if (less_eq(msg_session(msg), l_ptr->peer_session))
1999                                 break; /* duplicate or old reset: ignore */
2000                 }
2001
2002                 if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
2003                                 link_working_unknown(l_ptr))) {
2004                         /*
2005                          * peer has lost contact -- don't allow peer's links
2006                          * to reactivate before we recognize loss & clean up
2007                          */
2008                         l_ptr->owner->block_setup = WAIT_NODE_DOWN;
2009                 }
2010
2011                 link_state_event(l_ptr, RESET_MSG);
2012
2013                 /* fall thru' */
2014         case ACTIVATE_MSG:
2015                 /* Update link settings according other endpoint's values */
2016                 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2017
2018                 msg_tol = msg_link_tolerance(msg);
2019                 if (msg_tol > l_ptr->tolerance)
2020                         link_set_supervision_props(l_ptr, msg_tol);
2021
2022                 if (msg_linkprio(msg) > l_ptr->priority)
2023                         l_ptr->priority = msg_linkprio(msg);
2024
2025                 max_pkt_info = msg_max_pkt(msg);
2026                 if (max_pkt_info) {
2027                         if (max_pkt_info < l_ptr->max_pkt_target)
2028                                 l_ptr->max_pkt_target = max_pkt_info;
2029                         if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2030                                 l_ptr->max_pkt = l_ptr->max_pkt_target;
2031                 } else {
2032                         l_ptr->max_pkt = l_ptr->max_pkt_target;
2033                 }
2034
2035                 /* Synchronize broadcast link info, if not done previously */
2036                 if (!tipc_node_is_up(l_ptr->owner)) {
2037                         l_ptr->owner->bclink.last_sent =
2038                                 l_ptr->owner->bclink.last_in =
2039                                 msg_last_bcast(msg);
2040                         l_ptr->owner->bclink.oos_state = 0;
2041                 }
2042
2043                 l_ptr->peer_session = msg_session(msg);
2044                 l_ptr->peer_bearer_id = msg_bearer_id(msg);
2045
2046                 if (msg_type(msg) == ACTIVATE_MSG)
2047                         link_state_event(l_ptr, ACTIVATE_MSG);
2048                 break;
2049         case STATE_MSG:
2050
2051                 msg_tol = msg_link_tolerance(msg);
2052                 if (msg_tol)
2053                         link_set_supervision_props(l_ptr, msg_tol);
2054
2055                 if (msg_linkprio(msg) &&
2056                     (msg_linkprio(msg) != l_ptr->priority)) {
2057                         pr_warn("%s<%s>, priority change %u->%u\n",
2058                                 link_rst_msg, l_ptr->name, l_ptr->priority,
2059                                 msg_linkprio(msg));
2060                         l_ptr->priority = msg_linkprio(msg);
2061                         tipc_link_reset(l_ptr); /* Enforce change to take effect */
2062                         break;
2063                 }
2064                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2065                 l_ptr->stats.recv_states++;
2066                 if (link_reset_unknown(l_ptr))
2067                         break;
2068
2069                 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2070                         rec_gap = mod(msg_next_sent(msg) -
2071                                       mod(l_ptr->next_in_no));
2072                 }
2073
2074                 max_pkt_ack = msg_max_pkt(msg);
2075                 if (max_pkt_ack > l_ptr->max_pkt) {
2076                         l_ptr->max_pkt = max_pkt_ack;
2077                         l_ptr->max_pkt_probes = 0;
2078                 }
2079
2080                 max_pkt_ack = 0;
2081                 if (msg_probe(msg)) {
2082                         l_ptr->stats.recv_probes++;
2083                         if (msg_size(msg) > sizeof(l_ptr->proto_msg))
2084                                 max_pkt_ack = msg_size(msg);
2085                 }
2086
2087                 /* Protocol message before retransmits, reduce loss risk */
2088                 if (l_ptr->owner->bclink.recv_permitted)
2089                         tipc_bclink_update_link_state(l_ptr->owner,
2090                                                       msg_last_bcast(msg));
2091
2092                 if (rec_gap || (msg_probe(msg))) {
2093                         tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2094                                                  0, rec_gap, 0, 0, max_pkt_ack);
2095                 }
2096                 if (msg_seq_gap(msg)) {
2097                         l_ptr->stats.recv_nacks++;
2098                         tipc_link_retransmit(l_ptr, l_ptr->first_out,
2099                                              msg_seq_gap(msg));
2100                 }
2101                 break;
2102         }
2103 exit:
2104         kfree_skb(buf);
2105 }
2106
2107
2108 /*
2109  * tipc_link_tunnel(): Send one message via a link belonging to
2110  * another bearer. Owner node is locked.
2111  */
2112 static void tipc_link_tunnel(struct tipc_link *l_ptr,
2113                              struct tipc_msg *tunnel_hdr, struct tipc_msg *msg,
2114                              u32 selector)
2115 {
2116         struct tipc_link *tunnel;
2117         struct sk_buff *buf;
2118         u32 length = msg_size(msg);
2119
2120         tunnel = l_ptr->owner->active_links[selector & 1];
2121         if (!tipc_link_is_up(tunnel)) {
2122                 pr_warn("%stunnel link no longer available\n", link_co_err);
2123                 return;
2124         }
2125         msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2126         buf = tipc_buf_acquire(length + INT_H_SIZE);
2127         if (!buf) {
2128                 pr_warn("%sunable to send tunnel msg\n", link_co_err);
2129                 return;
2130         }
2131         skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2132         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2133         tipc_link_send_buf(tunnel, buf);
2134 }
2135
2136
2137
2138 /*
2139  * changeover(): Send whole message queue via the remaining link
2140  *               Owner node is locked.
2141  */
2142 void tipc_link_changeover(struct tipc_link *l_ptr)
2143 {
2144         u32 msgcount = l_ptr->out_queue_size;
2145         struct sk_buff *crs = l_ptr->first_out;
2146         struct tipc_link *tunnel = l_ptr->owner->active_links[0];
2147         struct tipc_msg tunnel_hdr;
2148         int split_bundles;
2149
2150         if (!tunnel)
2151                 return;
2152
2153         if (!l_ptr->owner->permit_changeover) {
2154                 pr_warn("%speer did not permit changeover\n", link_co_err);
2155                 return;
2156         }
2157
2158         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2159                  ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2160         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2161         msg_set_msgcnt(&tunnel_hdr, msgcount);
2162
2163         if (!l_ptr->first_out) {
2164                 struct sk_buff *buf;
2165
2166                 buf = tipc_buf_acquire(INT_H_SIZE);
2167                 if (buf) {
2168                         skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2169                         msg_set_size(&tunnel_hdr, INT_H_SIZE);
2170                         tipc_link_send_buf(tunnel, buf);
2171                 } else {
2172                         pr_warn("%sunable to send changeover msg\n",
2173                                 link_co_err);
2174                 }
2175                 return;
2176         }
2177
2178         split_bundles = (l_ptr->owner->active_links[0] !=
2179                          l_ptr->owner->active_links[1]);
2180
2181         while (crs) {
2182                 struct tipc_msg *msg = buf_msg(crs);
2183
2184                 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2185                         struct tipc_msg *m = msg_get_wrapped(msg);
2186                         unchar *pos = (unchar *)m;
2187
2188                         msgcount = msg_msgcnt(msg);
2189                         while (msgcount--) {
2190                                 msg_set_seqno(m, msg_seqno(msg));
2191                                 tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2192                                                  msg_link_selector(m));
2193                                 pos += align(msg_size(m));
2194                                 m = (struct tipc_msg *)pos;
2195                         }
2196                 } else {
2197                         tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2198                                          msg_link_selector(msg));
2199                 }
2200                 crs = crs->next;
2201         }
2202 }
2203
2204 void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel)
2205 {
2206         struct sk_buff *iter;
2207         struct tipc_msg tunnel_hdr;
2208
2209         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2210                  DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
2211         msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2212         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2213         iter = l_ptr->first_out;
2214         while (iter) {
2215                 struct sk_buff *outbuf;
2216                 struct tipc_msg *msg = buf_msg(iter);
2217                 u32 length = msg_size(msg);
2218
2219                 if (msg_user(msg) == MSG_BUNDLER)
2220                         msg_set_type(msg, CLOSED_MSG);
2221                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
2222                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2223                 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2224                 outbuf = tipc_buf_acquire(length + INT_H_SIZE);
2225                 if (outbuf == NULL) {
2226                         pr_warn("%sunable to send duplicate msg\n",
2227                                 link_co_err);
2228                         return;
2229                 }
2230                 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2231                 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2232                                                length);
2233                 tipc_link_send_buf(tunnel, outbuf);
2234                 if (!tipc_link_is_up(l_ptr))
2235                         return;
2236                 iter = iter->next;
2237         }
2238 }
2239
2240 /**
2241  * buf_extract - extracts embedded TIPC message from another message
2242  * @skb: encapsulating message buffer
2243  * @from_pos: offset to extract from
2244  *
2245  * Returns a new message buffer containing an embedded message.  The
2246  * encapsulating message itself is left unchanged.
2247  */
2248 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2249 {
2250         struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2251         u32 size = msg_size(msg);
2252         struct sk_buff *eb;
2253
2254         eb = tipc_buf_acquire(size);
2255         if (eb)
2256                 skb_copy_to_linear_data(eb, msg, size);
2257         return eb;
2258 }
2259
2260 /*
2261  *  link_recv_changeover_msg(): Receive tunneled packet sent
2262  *  via other link. Node is locked. Return extracted buffer.
2263  */
2264 static int link_recv_changeover_msg(struct tipc_link **l_ptr,
2265                                     struct sk_buff **buf)
2266 {
2267         struct sk_buff *tunnel_buf = *buf;
2268         struct tipc_link *dest_link;
2269         struct tipc_msg *msg;
2270         struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2271         u32 msg_typ = msg_type(tunnel_msg);
2272         u32 msg_count = msg_msgcnt(tunnel_msg);
2273         u32 bearer_id = msg_bearer_id(tunnel_msg);
2274
2275         if (bearer_id >= MAX_BEARERS)
2276                 goto exit;
2277         dest_link = (*l_ptr)->owner->links[bearer_id];
2278         if (!dest_link)
2279                 goto exit;
2280         if (dest_link == *l_ptr) {
2281                 pr_err("Unexpected changeover message on link <%s>\n",
2282                        (*l_ptr)->name);
2283                 goto exit;
2284         }
2285         *l_ptr = dest_link;
2286         msg = msg_get_wrapped(tunnel_msg);
2287
2288         if (msg_typ == DUPLICATE_MSG) {
2289                 if (less(msg_seqno(msg), mod(dest_link->next_in_no)))
2290                         goto exit;
2291                 *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2292                 if (*buf == NULL) {
2293                         pr_warn("%sduplicate msg dropped\n", link_co_err);
2294                         goto exit;
2295                 }
2296                 kfree_skb(tunnel_buf);
2297                 return 1;
2298         }
2299
2300         /* First original message ?: */
2301         if (tipc_link_is_up(dest_link)) {
2302                 pr_info("%s<%s>, changeover initiated by peer\n", link_rst_msg,
2303                         dest_link->name);
2304                 tipc_link_reset(dest_link);
2305                 dest_link->exp_msg_count = msg_count;
2306                 if (!msg_count)
2307                         goto exit;
2308         } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2309                 dest_link->exp_msg_count = msg_count;
2310                 if (!msg_count)
2311                         goto exit;
2312         }
2313
2314         /* Receive original message */
2315         if (dest_link->exp_msg_count == 0) {
2316                 pr_warn("%sgot too many tunnelled messages\n", link_co_err);
2317                 goto exit;
2318         }
2319         dest_link->exp_msg_count--;
2320         if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2321                 goto exit;
2322         } else {
2323                 *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2324                 if (*buf != NULL) {
2325                         kfree_skb(tunnel_buf);
2326                         return 1;
2327                 } else {
2328                         pr_warn("%soriginal msg dropped\n", link_co_err);
2329                 }
2330         }
2331 exit:
2332         *buf = NULL;
2333         kfree_skb(tunnel_buf);
2334         return 0;
2335 }
2336
2337 /*
2338  *  Bundler functionality:
2339  */
2340 void tipc_link_recv_bundle(struct sk_buff *buf)
2341 {
2342         u32 msgcount = msg_msgcnt(buf_msg(buf));
2343         u32 pos = INT_H_SIZE;
2344         struct sk_buff *obuf;
2345
2346         while (msgcount--) {
2347                 obuf = buf_extract(buf, pos);
2348                 if (obuf == NULL) {
2349                         pr_warn("Link unable to unbundle message(s)\n");
2350                         break;
2351                 }
2352                 pos += align(msg_size(buf_msg(obuf)));
2353                 tipc_net_route_msg(obuf);
2354         }
2355         kfree_skb(buf);
2356 }
2357
2358 /*
2359  *  Fragmentation/defragmentation:
2360  */
2361
2362 /*
2363  * link_send_long_buf: Entry for buffers needing fragmentation.
2364  * The buffer is complete, inclusive total message length.
2365  * Returns user data length.
2366  */
2367 static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
2368 {
2369         struct sk_buff *buf_chain = NULL;
2370         struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
2371         struct tipc_msg *inmsg = buf_msg(buf);
2372         struct tipc_msg fragm_hdr;
2373         u32 insize = msg_size(inmsg);
2374         u32 dsz = msg_data_sz(inmsg);
2375         unchar *crs = buf->data;
2376         u32 rest = insize;
2377         u32 pack_sz = l_ptr->max_pkt;
2378         u32 fragm_sz = pack_sz - INT_H_SIZE;
2379         u32 fragm_no = 0;
2380         u32 destaddr;
2381
2382         if (msg_short(inmsg))
2383                 destaddr = l_ptr->addr;
2384         else
2385                 destaddr = msg_destnode(inmsg);
2386
2387         /* Prepare reusable fragment header: */
2388         tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2389                  INT_H_SIZE, destaddr);
2390
2391         /* Chop up message: */
2392         while (rest > 0) {
2393                 struct sk_buff *fragm;
2394
2395                 if (rest <= fragm_sz) {
2396                         fragm_sz = rest;
2397                         msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2398                 }
2399                 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2400                 if (fragm == NULL) {
2401                         kfree_skb(buf);
2402                         while (buf_chain) {
2403                                 buf = buf_chain;
2404                                 buf_chain = buf_chain->next;
2405                                 kfree_skb(buf);
2406                         }
2407                         return -ENOMEM;
2408                 }
2409                 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2410                 fragm_no++;
2411                 msg_set_fragm_no(&fragm_hdr, fragm_no);
2412                 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2413                 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2414                                                fragm_sz);
2415                 buf_chain_tail->next = fragm;
2416                 buf_chain_tail = fragm;
2417
2418                 rest -= fragm_sz;
2419                 crs += fragm_sz;
2420                 msg_set_type(&fragm_hdr, FRAGMENT);
2421         }
2422         kfree_skb(buf);
2423
2424         /* Append chain of fragments to send queue & send them */
2425         l_ptr->long_msg_seq_no++;
2426         link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
2427         l_ptr->stats.sent_fragments += fragm_no;
2428         l_ptr->stats.sent_fragmented++;
2429         tipc_link_push_queue(l_ptr);
2430
2431         return dsz;
2432 }
2433
2434 /*
2435  * A pending message being re-assembled must store certain values
2436  * to handle subsequent fragments correctly. The following functions
2437  * help storing these values in unused, available fields in the
2438  * pending message. This makes dynamic memory allocation unnecessary.
2439  */
2440 static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2441 {
2442         msg_set_seqno(buf_msg(buf), seqno);
2443 }
2444
2445 static u32 get_fragm_size(struct sk_buff *buf)
2446 {
2447         return msg_ack(buf_msg(buf));
2448 }
2449
2450 static void set_fragm_size(struct sk_buff *buf, u32 sz)
2451 {
2452         msg_set_ack(buf_msg(buf), sz);
2453 }
2454
2455 static u32 get_expected_frags(struct sk_buff *buf)
2456 {
2457         return msg_bcast_ack(buf_msg(buf));
2458 }
2459
2460 static void set_expected_frags(struct sk_buff *buf, u32 exp)
2461 {
2462         msg_set_bcast_ack(buf_msg(buf), exp);
2463 }
2464
2465 /*
2466  * tipc_link_recv_fragment(): Called with node lock on. Returns
2467  * the reassembled buffer if message is complete.
2468  */
2469 int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2470                             struct tipc_msg **m)
2471 {
2472         struct sk_buff *prev = NULL;
2473         struct sk_buff *fbuf = *fb;
2474         struct tipc_msg *fragm = buf_msg(fbuf);
2475         struct sk_buff *pbuf = *pending;
2476         u32 long_msg_seq_no = msg_long_msgno(fragm);
2477
2478         *fb = NULL;
2479
2480         /* Is there an incomplete message waiting for this fragment? */
2481         while (pbuf && ((buf_seqno(pbuf) != long_msg_seq_no) ||
2482                         (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2483                 prev = pbuf;
2484                 pbuf = pbuf->next;
2485         }
2486
2487         if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2488                 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2489                 u32 msg_sz = msg_size(imsg);
2490                 u32 fragm_sz = msg_data_sz(fragm);
2491                 u32 exp_fragm_cnt;
2492                 u32 max =  TIPC_MAX_USER_MSG_SIZE + NAMED_H_SIZE;
2493
2494                 if (msg_type(imsg) == TIPC_MCAST_MSG)
2495                         max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2496                 if (fragm_sz == 0 || msg_size(imsg) > max) {
2497                         kfree_skb(fbuf);
2498                         return 0;
2499                 }
2500                 exp_fragm_cnt = msg_sz / fragm_sz + !!(msg_sz % fragm_sz);
2501                 pbuf = tipc_buf_acquire(msg_size(imsg));
2502                 if (pbuf != NULL) {
2503                         pbuf->next = *pending;
2504                         *pending = pbuf;
2505                         skb_copy_to_linear_data(pbuf, imsg,
2506                                                 msg_data_sz(fragm));
2507                         /*  Prepare buffer for subsequent fragments. */
2508                         set_long_msg_seqno(pbuf, long_msg_seq_no);
2509                         set_fragm_size(pbuf, fragm_sz);
2510                         set_expected_frags(pbuf, exp_fragm_cnt - 1);
2511                 } else {
2512                         pr_debug("Link unable to reassemble fragmented message\n");
2513                         kfree_skb(fbuf);
2514                         return -1;
2515                 }
2516                 kfree_skb(fbuf);
2517                 return 0;
2518         } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2519                 u32 dsz = msg_data_sz(fragm);
2520                 u32 fsz = get_fragm_size(pbuf);
2521                 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2522                 u32 exp_frags = get_expected_frags(pbuf) - 1;
2523                 skb_copy_to_linear_data_offset(pbuf, crs,
2524                                                msg_data(fragm), dsz);
2525                 kfree_skb(fbuf);
2526
2527                 /* Is message complete? */
2528                 if (exp_frags == 0) {
2529                         if (prev)
2530                                 prev->next = pbuf->next;
2531                         else
2532                                 *pending = pbuf->next;
2533                         msg_reset_reroute_cnt(buf_msg(pbuf));
2534                         *fb = pbuf;
2535                         *m = buf_msg(pbuf);
2536                         return 1;
2537                 }
2538                 set_expected_frags(pbuf, exp_frags);
2539                 return 0;
2540         }
2541         kfree_skb(fbuf);
2542         return 0;
2543 }
2544
2545 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
2546 {
2547         if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
2548                 return;
2549
2550         l_ptr->tolerance = tolerance;
2551         l_ptr->continuity_interval =
2552                 ((tolerance / 4) > 500) ? 500 : tolerance / 4;
2553         l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2554 }
2555
2556 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
2557 {
2558         /* Data messages from this node, inclusive FIRST_FRAGM */
2559         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2560         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2561         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2562         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2563         /* Transiting data messages,inclusive FIRST_FRAGM */
2564         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2565         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2566         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2567         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2568         l_ptr->queue_limit[CONN_MANAGER] = 1200;
2569         l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2570         l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2571         /* FRAGMENT and LAST_FRAGMENT packets */
2572         l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2573 }
2574
2575 /**
2576  * link_find_link - locate link by name
2577  * @name: ptr to link name string
2578  * @node: ptr to area to be filled with ptr to associated node
2579  *
2580  * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2581  * this also prevents link deletion.
2582  *
2583  * Returns pointer to link (or 0 if invalid link name).
2584  */
2585 static struct tipc_link *link_find_link(const char *name,
2586                                         struct tipc_node **node)
2587 {
2588         struct tipc_link_name link_name_parts;
2589         struct tipc_bearer *b_ptr;
2590         struct tipc_link *l_ptr;
2591
2592         if (!link_name_validate(name, &link_name_parts))
2593                 return NULL;
2594
2595         b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2596         if (!b_ptr)
2597                 return NULL;
2598
2599         *node = tipc_node_find(link_name_parts.addr_peer);
2600         if (!*node)
2601                 return NULL;
2602
2603         l_ptr = (*node)->links[b_ptr->identity];
2604         if (!l_ptr || strcmp(l_ptr->name, name))
2605                 return NULL;
2606
2607         return l_ptr;
2608 }
2609
2610 /**
2611  * link_value_is_valid -- validate proposed link tolerance/priority/window
2612  *
2613  * @cmd: value type (TIPC_CMD_SET_LINK_*)
2614  * @new_value: the new value
2615  *
2616  * Returns 1 if value is within range, 0 if not.
2617  */
2618 static int link_value_is_valid(u16 cmd, u32 new_value)
2619 {
2620         switch (cmd) {
2621         case TIPC_CMD_SET_LINK_TOL:
2622                 return (new_value >= TIPC_MIN_LINK_TOL) &&
2623                         (new_value <= TIPC_MAX_LINK_TOL);
2624         case TIPC_CMD_SET_LINK_PRI:
2625                 return (new_value <= TIPC_MAX_LINK_PRI);
2626         case TIPC_CMD_SET_LINK_WINDOW:
2627                 return (new_value >= TIPC_MIN_LINK_WIN) &&
2628                         (new_value <= TIPC_MAX_LINK_WIN);
2629         }
2630         return 0;
2631 }
2632
2633 /**
2634  * link_cmd_set_value - change priority/tolerance/window for link/bearer/media
2635  * @name: ptr to link, bearer, or media name
2636  * @new_value: new value of link, bearer, or media setting
2637  * @cmd: which link, bearer, or media attribute to set (TIPC_CMD_SET_LINK_*)
2638  *
2639  * Caller must hold 'tipc_net_lock' to ensure link/bearer/media is not deleted.
2640  *
2641  * Returns 0 if value updated and negative value on error.
2642  */
2643 static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
2644 {
2645         struct tipc_node *node;
2646         struct tipc_link *l_ptr;
2647         struct tipc_bearer *b_ptr;
2648         struct tipc_media *m_ptr;
2649
2650         l_ptr = link_find_link(name, &node);
2651         if (l_ptr) {
2652                 /*
2653                  * acquire node lock for tipc_link_send_proto_msg().
2654                  * see "TIPC locking policy" in net.c.
2655                  */
2656                 tipc_node_lock(node);
2657                 switch (cmd) {
2658                 case TIPC_CMD_SET_LINK_TOL:
2659                         link_set_supervision_props(l_ptr, new_value);
2660                         tipc_link_send_proto_msg(l_ptr,
2661                                 STATE_MSG, 0, 0, new_value, 0, 0);
2662                         break;
2663                 case TIPC_CMD_SET_LINK_PRI:
2664                         l_ptr->priority = new_value;
2665                         tipc_link_send_proto_msg(l_ptr,
2666                                 STATE_MSG, 0, 0, 0, new_value, 0);
2667                         break;
2668                 case TIPC_CMD_SET_LINK_WINDOW:
2669                         tipc_link_set_queue_limits(l_ptr, new_value);
2670                         break;
2671                 }
2672                 tipc_node_unlock(node);
2673                 return 0;
2674         }
2675
2676         b_ptr = tipc_bearer_find(name);
2677         if (b_ptr) {
2678                 switch (cmd) {
2679                 case TIPC_CMD_SET_LINK_TOL:
2680                         b_ptr->tolerance = new_value;
2681                         return 0;
2682                 case TIPC_CMD_SET_LINK_PRI:
2683                         b_ptr->priority = new_value;
2684                         return 0;
2685                 case TIPC_CMD_SET_LINK_WINDOW:
2686                         b_ptr->window = new_value;
2687                         return 0;
2688                 }
2689                 return -EINVAL;
2690         }
2691
2692         m_ptr = tipc_media_find(name);
2693         if (!m_ptr)
2694                 return -ENODEV;
2695         switch (cmd) {
2696         case TIPC_CMD_SET_LINK_TOL:
2697                 m_ptr->tolerance = new_value;
2698                 return 0;
2699         case TIPC_CMD_SET_LINK_PRI:
2700                 m_ptr->priority = new_value;
2701                 return 0;
2702         case TIPC_CMD_SET_LINK_WINDOW:
2703                 m_ptr->window = new_value;
2704                 return 0;
2705         }
2706         return -EINVAL;
2707 }
2708
2709 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2710                                      u16 cmd)
2711 {
2712         struct tipc_link_config *args;
2713         u32 new_value;
2714         int res;
2715
2716         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2717                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2718
2719         args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2720         new_value = ntohl(args->value);
2721
2722         if (!link_value_is_valid(cmd, new_value))
2723                 return tipc_cfg_reply_error_string(
2724                         "cannot change, value invalid");
2725
2726         if (!strcmp(args->name, tipc_bclink_name)) {
2727                 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2728                     (tipc_bclink_set_queue_limits(new_value) == 0))
2729                         return tipc_cfg_reply_none();
2730                 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2731                                                    " (cannot change setting on broadcast link)");
2732         }
2733
2734         read_lock_bh(&tipc_net_lock);
2735         res = link_cmd_set_value(args->name, new_value, cmd);
2736         read_unlock_bh(&tipc_net_lock);
2737         if (res)
2738                 return tipc_cfg_reply_error_string("cannot change link setting");
2739
2740         return tipc_cfg_reply_none();
2741 }
2742
2743 /**
2744  * link_reset_statistics - reset link statistics
2745  * @l_ptr: pointer to link
2746  */
2747 static void link_reset_statistics(struct tipc_link *l_ptr)
2748 {
2749         memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2750         l_ptr->stats.sent_info = l_ptr->next_out_no;
2751         l_ptr->stats.recv_info = l_ptr->next_in_no;
2752 }
2753
2754 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2755 {
2756         char *link_name;
2757         struct tipc_link *l_ptr;
2758         struct tipc_node *node;
2759
2760         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2761                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2762
2763         link_name = (char *)TLV_DATA(req_tlv_area);
2764         if (!strcmp(link_name, tipc_bclink_name)) {
2765                 if (tipc_bclink_reset_stats())
2766                         return tipc_cfg_reply_error_string("link not found");
2767                 return tipc_cfg_reply_none();
2768         }
2769
2770         read_lock_bh(&tipc_net_lock);
2771         l_ptr = link_find_link(link_name, &node);
2772         if (!l_ptr) {
2773                 read_unlock_bh(&tipc_net_lock);
2774                 return tipc_cfg_reply_error_string("link not found");
2775         }
2776
2777         tipc_node_lock(node);
2778         link_reset_statistics(l_ptr);
2779         tipc_node_unlock(node);
2780         read_unlock_bh(&tipc_net_lock);
2781         return tipc_cfg_reply_none();
2782 }
2783
2784 /**
2785  * percent - convert count to a percentage of total (rounding up or down)
2786  */
2787 static u32 percent(u32 count, u32 total)
2788 {
2789         return (count * 100 + (total / 2)) / total;
2790 }
2791
2792 /**
2793  * tipc_link_stats - print link statistics
2794  * @name: link name
2795  * @buf: print buffer area
2796  * @buf_size: size of print buffer area
2797  *
2798  * Returns length of print buffer data string (or 0 if error)
2799  */
2800 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
2801 {
2802         struct tipc_link *l;
2803         struct tipc_stats *s;
2804         struct tipc_node *node;
2805         char *status;
2806         u32 profile_total = 0;
2807         int ret;
2808
2809         if (!strcmp(name, tipc_bclink_name))
2810                 return tipc_bclink_stats(buf, buf_size);
2811
2812         read_lock_bh(&tipc_net_lock);
2813         l = link_find_link(name, &node);
2814         if (!l) {
2815                 read_unlock_bh(&tipc_net_lock);
2816                 return 0;
2817         }
2818         tipc_node_lock(node);
2819         s = &l->stats;
2820
2821         if (tipc_link_is_active(l))
2822                 status = "ACTIVE";
2823         else if (tipc_link_is_up(l))
2824                 status = "STANDBY";
2825         else
2826                 status = "DEFUNCT";
2827
2828         ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
2829                             "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
2830                             "  Window:%u packets\n",
2831                             l->name, status, l->max_pkt, l->priority,
2832                             l->tolerance, l->queue_limit[0]);
2833
2834         ret += tipc_snprintf(buf + ret, buf_size - ret,
2835                              "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
2836                              l->next_in_no - s->recv_info, s->recv_fragments,
2837                              s->recv_fragmented, s->recv_bundles,
2838                              s->recv_bundled);
2839
2840         ret += tipc_snprintf(buf + ret, buf_size - ret,
2841                              "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
2842                              l->next_out_no - s->sent_info, s->sent_fragments,
2843                              s->sent_fragmented, s->sent_bundles,
2844                              s->sent_bundled);
2845
2846         profile_total = s->msg_length_counts;
2847         if (!profile_total)
2848                 profile_total = 1;
2849
2850         ret += tipc_snprintf(buf + ret, buf_size - ret,
2851                              "  TX profile sample:%u packets  average:%u octets\n"
2852                              "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
2853                              "-16384:%u%% -32768:%u%% -66000:%u%%\n",
2854                              s->msg_length_counts,
2855                              s->msg_lengths_total / profile_total,
2856                              percent(s->msg_length_profile[0], profile_total),
2857                              percent(s->msg_length_profile[1], profile_total),
2858                              percent(s->msg_length_profile[2], profile_total),
2859                              percent(s->msg_length_profile[3], profile_total),
2860                              percent(s->msg_length_profile[4], profile_total),
2861                              percent(s->msg_length_profile[5], profile_total),
2862                              percent(s->msg_length_profile[6], profile_total));
2863
2864         ret += tipc_snprintf(buf + ret, buf_size - ret,
2865                              "  RX states:%u probes:%u naks:%u defs:%u"
2866                              " dups:%u\n", s->recv_states, s->recv_probes,
2867                              s->recv_nacks, s->deferred_recv, s->duplicates);
2868
2869         ret += tipc_snprintf(buf + ret, buf_size - ret,
2870                              "  TX states:%u probes:%u naks:%u acks:%u"
2871                              " dups:%u\n", s->sent_states, s->sent_probes,
2872                              s->sent_nacks, s->sent_acks, s->retransmitted);
2873
2874         ret += tipc_snprintf(buf + ret, buf_size - ret,
2875                              "  Congestion link:%u  Send queue"
2876                              " max:%u avg:%u\n", s->link_congs,
2877                              s->max_queue_sz, s->queue_sz_counts ?
2878                              (s->accu_queue_sz / s->queue_sz_counts) : 0);
2879
2880         tipc_node_unlock(node);
2881         read_unlock_bh(&tipc_net_lock);
2882         return ret;
2883 }
2884
2885 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
2886 {
2887         struct sk_buff *buf;
2888         struct tlv_desc *rep_tlv;
2889         int str_len;
2890         int pb_len;
2891         char *pb;
2892
2893         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2894                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2895
2896         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2897         if (!buf)
2898                 return NULL;
2899
2900         rep_tlv = (struct tlv_desc *)buf->data;
2901         pb = TLV_DATA(rep_tlv);
2902         pb_len = ULTRA_STRING_MAX_LEN;
2903         str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
2904                                   pb, pb_len);
2905         if (!str_len) {
2906                 kfree_skb(buf);
2907                 return tipc_cfg_reply_error_string("link not found");
2908         }
2909         str_len += 1;   /* for "\0" */
2910         skb_put(buf, TLV_SPACE(str_len));
2911         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2912
2913         return buf;
2914 }
2915
2916 /**
2917  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
2918  * @dest: network address of destination node
2919  * @selector: used to select from set of active links
2920  *
2921  * If no active link can be found, uses default maximum packet size.
2922  */
2923 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
2924 {
2925         struct tipc_node *n_ptr;
2926         struct tipc_link *l_ptr;
2927         u32 res = MAX_PKT_DEFAULT;
2928
2929         if (dest == tipc_own_addr)
2930                 return MAX_MSG_SIZE;
2931
2932         read_lock_bh(&tipc_net_lock);
2933         n_ptr = tipc_node_find(dest);
2934         if (n_ptr) {
2935                 tipc_node_lock(n_ptr);
2936                 l_ptr = n_ptr->active_links[selector & 1];
2937                 if (l_ptr)
2938                         res = l_ptr->max_pkt;
2939                 tipc_node_unlock(n_ptr);
2940         }
2941         read_unlock_bh(&tipc_net_lock);
2942         return res;
2943 }
2944
2945 static void link_print(struct tipc_link *l_ptr, const char *str)
2946 {
2947         pr_info("%s Link %x<%s>:", str, l_ptr->addr, l_ptr->b_ptr->name);
2948
2949         if (link_working_unknown(l_ptr))
2950                 pr_cont(":WU\n");
2951         else if (link_reset_reset(l_ptr))
2952                 pr_cont(":RR\n");
2953         else if (link_reset_unknown(l_ptr))
2954                 pr_cont(":RU\n");
2955         else if (link_working_working(l_ptr))
2956                 pr_cont(":WW\n");
2957         else
2958                 pr_cont("\n");
2959 }