measure ping latency including qos_resume delay
[cor.git] net/cor/kpacket_gen.c
blob 24c1e8e211aacf1fea3bd72dc45e0777b8e42822

/**
 * Connection oriented routing
 * Copyright (C) 2007-2020 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <asm/byteorder.h>

#include "cor.h"

/* not sent over the network - internal meaning only */
#define MSGTYPE_PONG 1
#define MSGTYPE_ACK 2
#define MSGTYPE_ACK_CONN 3
#define MSGTYPE_CONNECT 4
#define MSGTYPE_CONNECT_SUCCESS 5
#define MSGTYPE_RESET_CONN 6
#define MSGTYPE_CONNDATA 7
#define MSGTYPE_SET_MAX_CMSG_DELAY 8

#define MSGTYPE_PONG_TIMEENQUEUED 1
#define MSGTYPE_PONG_RESPDELAY 2

struct control_msg_out {
	__u8 type;
	__u32 length;
	struct kref ref;
	struct neighbor *nb;

	/* either queue or control_retrans_packet */
	struct list_head lh;

	unsigned long time_added;

	union {
		struct {
			__u32 cookie;
			__u8 type;

			ktime_t time_enqueued;
		} pong;

		struct {
			__u64 seqno;
		} ack;

		struct {
			struct conn *src_in;
			struct list_head conn_acks;
			__u32 conn_id;
			__u64 seqno;
			__u64 seqno_ooo;
			__u32 length;

			__u8 priority_seqno;
			__u8 priority;

			__u8 flags;

			__u32 ack_seqno;
		} ack_conn;

		struct {
			__u32 conn_id;
			__u64 seqno1;
			__u64 seqno2;
			struct conn *src_in;
		} connect;

		struct {
			__u32 conn_id;
			struct conn *src_in;
		} connect_success;

		struct {
			struct rb_node rbn;
			__u8 in_pending_conn_resets;
			__u32 conn_id;
		} reset_conn;

		struct {
			__u32 conn_id;
			__u64 seqno;
			__u32 datalen;
			__u8 snd_delayed_lowbuf;
			__u8 flush;
			__u8 highlatency;
			char *data_orig;
			char *data;
			struct conn_retrans *cr;
		} conn_data;

		struct {
			__u32 ack_delay;
			__u32 ackconn_delay;
			__u32 other_delay;
		} set_max_cmsg_delay;
	} msg;
};

struct control_retrans {
	struct kref ref;

	struct neighbor *nb;
	__u64 seqno;

	unsigned long timeout;

	struct list_head msgs;

	struct rb_node rbn;
	struct list_head timeout_list;
};

struct unknownconnid_matchparam {
	struct neighbor *nb;
	__u32 conn_id;
};

static struct kmem_cache *controlmsg_slab;
static struct kmem_cache *controlretrans_slab;

static atomic_t cmsg_othercnt = ATOMIC_INIT(0);

#define ADDCMSG_SRC_NEW 1
#define ADDCMSG_SRC_SPLITCONNDATA 2
#define ADDCMSG_SRC_READD 3
#define ADDCMSG_SRC_RETRANS 4

static void enqueue_control_msg(struct control_msg_out *msg, int src);

static void try_merge_ackconns(struct conn *src_in_l,
		struct control_msg_out *cm);

static void merge_or_enqueue_ackconn(struct conn *src_in_l,
		struct control_msg_out *cm, int src);

static struct control_msg_out *_alloc_control_msg(struct neighbor *nb)
{
	struct control_msg_out *cm;

	BUG_ON(nb == 0);

	cm = kmem_cache_alloc(controlmsg_slab, GFP_ATOMIC);
	if (unlikely(cm == 0))
		return 0;
	memset(cm, 0, sizeof(struct control_msg_out));
	kref_init(&(cm->ref));
	cm->nb = nb;
	return cm;
}

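/*
 * Scale a control message limit by allocation priority: low-priority
 * allocations may use half of the limit, medium-priority three quarters
 * and high-priority all of it.
 */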
static int calc_limit(int limit, int priority)
{
	if (priority == ACM_PRIORITY_LOW)
		return (limit+1)/2;
	else if (priority == ACM_PRIORITY_MED)
		return (limit * 3 + 1)/4;
	else if (priority == ACM_PRIORITY_HIGH)
		return limit;
	else
		BUG();
}

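/*
 * Allocate a control message and charge it against both the per-neighbor
 * and the global message counter. Beyond the guaranteed per-neighbor share
 * the allocation only succeeds while the priority-scaled limits are not
 * exceeded.
 */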
struct control_msg_out *alloc_control_msg(struct neighbor *nb, int priority)
{
	struct control_msg_out *cm = 0;

	long packets1;
	long packets2;

	BUG_ON(nb == 0);

	packets1 = atomic_inc_return(&(nb->cmsg_othercnt));
	packets2 = atomic_inc_return(&(cmsg_othercnt));

	BUG_ON(packets1 <= 0);
	BUG_ON(packets2 <= 0);

	if (packets1 <= calc_limit(GUARANTEED_CMSGS_PER_NEIGH, priority))
		goto alloc;

	if (unlikely(unlikely(packets1 > calc_limit(MAX_CMSGS_PER_NEIGH,
			priority)) ||
			unlikely(packets2 > calc_limit(MAX_CMSGS, priority))))
		goto full;

alloc:
	cm = _alloc_control_msg(nb);
	if (unlikely(cm == 0)) {
full:
		/* printk(KERN_ERR "alloc_control_msg failed %ld %ld", packets1, packets2); */
		atomic_dec(&(nb->cmsg_othercnt));
		atomic_dec(&(cmsg_othercnt));
	}

	return cm;
}

static void cmsg_kref_free(struct kref *ref)
{
	struct control_msg_out *cm = container_of(ref, struct control_msg_out,
			ref);
	kmem_cache_free(controlmsg_slab, cm);
}

void free_control_msg(struct control_msg_out *cm)
{
	if (likely(cm->type != MSGTYPE_PONG)) {
		atomic_dec(&(cm->nb->cmsg_othercnt));
		atomic_dec(&(cmsg_othercnt));
	}

	if (cm->type == MSGTYPE_ACK_CONN) {
		struct conn *trgt_out;
		BUG_ON(cm->msg.ack_conn.src_in == 0);
		trgt_out = cm->msg.ack_conn.src_in->reversedir;
		if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
			spin_lock_bh(&(trgt_out->rcv_lock));
			BUG_ON(trgt_out->targettype != TARGET_OUT);
			if (trgt_out->target.out.priority_send_allowed != 0) {
				trgt_out->target.out.priority_send_allowed = 1;
				spin_unlock_bh(&(trgt_out->rcv_lock));
				refresh_conn_priority(trgt_out, 0);
			} else {
				spin_unlock_bh(&(trgt_out->rcv_lock));
			}
		}
		kref_put(&(cm->msg.ack_conn.src_in->ref), free_conn);
		cm->msg.ack_conn.src_in = 0;
	} else if (cm->type == MSGTYPE_CONNECT) {
		BUG_ON(cm->msg.connect.src_in == 0);
		kref_put(&(cm->msg.connect.src_in->ref), free_conn);
		cm->msg.connect.src_in = 0;
	} else if (cm->type == MSGTYPE_CONNECT_SUCCESS) {
		BUG_ON(cm->msg.connect_success.src_in == 0);
		kref_put(&(cm->msg.connect_success.src_in->ref), free_conn);
		cm->msg.connect_success.src_in = 0;
	} else if (cm->type == MSGTYPE_RESET_CONN) {
		spin_lock_bh(&(cm->nb->cmsg_lock));
		if (cm->msg.reset_conn.in_pending_conn_resets != 0) {
			rb_erase(&(cm->msg.reset_conn.rbn),
					&(cm->nb->pending_conn_resets_rb));
			cm->msg.reset_conn.in_pending_conn_resets = 0;

			kref_put(&(cm->ref), kreffree_bug);
		}
		spin_unlock_bh(&(cm->nb->cmsg_lock));
	}

	kref_put(&(cm->ref), cmsg_kref_free);
}

static void free_control_retrans(struct kref *ref)
{
	struct control_retrans *cr = container_of(ref, struct control_retrans,
			ref);

	while (list_empty(&(cr->msgs)) == 0) {
		struct control_msg_out *cm = container_of(cr->msgs.next,
				struct control_msg_out, lh);

		if (cm->type == MSGTYPE_PONG)
			atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));

		list_del(&(cm->lh));
		free_control_msg(cm);
	}

	kmem_cache_free(controlretrans_slab, cr);
}

struct control_retrans *get_control_retrans(struct neighbor *nb_retranslocked,
		__u64 seqno)
{
	struct rb_node *n = 0;
	struct control_retrans *ret = 0;

	n = nb_retranslocked->kp_retransmits_rb.rb_node;

	while (likely(n != 0) && ret == 0) {
		struct control_retrans *cr = container_of(n,
				struct control_retrans, rbn);

		BUG_ON(cr->nb != nb_retranslocked);

		if (seqno_before(seqno, cr->seqno))
			n = n->rb_left;
		else if (seqno_after(seqno, cr->seqno))
			n = n->rb_right;
		else
			ret = cr;
	}

	if (ret != 0)
		kref_get(&(ret->ref));

	return ret;
}

/* nb->retrans_lock must be held */
void insert_control_retrans(struct control_retrans *ins)
{
	struct neighbor *nb = ins->nb;
	__u64 seqno = ins->seqno;

	struct rb_root *root;
	struct rb_node **p;
	struct rb_node *parent = 0;

	BUG_ON(nb == 0);

	root = &(nb->kp_retransmits_rb);
	p = &(root->rb_node);

	while ((*p) != 0) {
		struct control_retrans *cr = container_of(*p,
				struct control_retrans, rbn);

		BUG_ON(cr->nb != nb);

		parent = *p;
		if (unlikely(seqno_eq(seqno, cr->seqno))) {
			BUG();
		} else if (seqno_before(seqno, cr->seqno)) {
			p = &(*p)->rb_left;
		} else if (seqno_after(seqno, cr->seqno)) {
			p = &(*p)->rb_right;
		} else {
			BUG();
		}
	}

	kref_get(&(ins->ref));
	rb_link_node(&(ins->rbn), parent, p);
	rb_insert_color(&(ins->rbn), root);
}

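/*
 * Drop the out-of-order block from an ack_conn message if the in-order
 * sequence number has meanwhile caught up with the end of that block.
 */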
static void remove_connack_oooflag_ifold(struct conn *src_in_l,
		struct control_msg_out *cm)
{
	if (ooolen(cm->msg.ack_conn.flags) != 0 && seqno_before_eq(
			cm->msg.ack_conn.seqno_ooo +
			cm->msg.ack_conn.length,
			src_in_l->source.in.next_seqno)) {
		cm->msg.ack_conn.length = 0;
		cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
				(~KP_ACK_CONN_FLAGS_OOO));
	}
}

static int ackconn_prepare_requeue(struct conn *cn_l,
		struct control_msg_out *cm)
{
	if (unlikely(unlikely(cn_l->sourcetype != SOURCE_IN) ||
			unlikely(cn_l->source.in.nb != cm->nb) ||
			unlikely(cn_l->reversedir->target.out.conn_id !=
			cm->msg.ack_conn.conn_id) ||
			unlikely(cn_l->isreset != 0)))
		return 0;

	remove_connack_oooflag_ifold(cn_l, cm);

	if (!seqno_eq(cm->msg.ack_conn.ack_seqno, cn_l->source.in.ack_seqno))
		cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
				(~KP_ACK_CONN_FLAGS_SEQNO) &
				(~KP_ACK_CONN_FLAGS_WINDOW));

	if (cm->msg.ack_conn.flags == 0)
		return 0;

	cm->length = 6 + ack_conn_len(cm->msg.ack_conn.flags);

	return 1;
}

static void requeue_control_retrans(struct control_retrans *cr)
{
	atomic_inc(&(cr->nb->cmsg_bulk_readds));

	while (list_empty(&(cr->msgs)) == 0) {
		struct control_msg_out *cm = container_of(cr->msgs.prev,
				struct control_msg_out, lh);
		list_del(&(cm->lh));

		BUG_ON(cm->nb != cr->nb);

		if (cm->type == MSGTYPE_ACK_CONN) {
			struct conn *cn_l = cm->msg.ack_conn.src_in;
			spin_lock_bh(&(cn_l->rcv_lock));
			if (unlikely(ackconn_prepare_requeue(cn_l, cm) == 0)) {
				free_control_msg(cm);
			} else {
				merge_or_enqueue_ackconn(cn_l, cm,
						ADDCMSG_SRC_RETRANS);
			}
			spin_unlock_bh(&(cn_l->rcv_lock));
		} else {
			if (cm->type == MSGTYPE_PONG)
				atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
			enqueue_control_msg(cm, ADDCMSG_SRC_RETRANS);
		}
	}

	atomic_dec(&(cr->nb->cmsg_bulk_readds));

	spin_lock_bh(&(cr->nb->cmsg_lock));
	schedule_controlmsg_timer(cr->nb);
	spin_unlock_bh(&(cr->nb->cmsg_lock));
}

static void empty_retrans_queue(struct neighbor *nb_retranslocked)
{
	while (!list_empty(&(nb_retranslocked->retrans_list))) {
		struct control_retrans *cr = container_of(
				nb_retranslocked->retrans_list.next,
				struct control_retrans, timeout_list);

		BUG_ON(cr->nb != nb_retranslocked);

		list_del(&(cr->timeout_list));
		rb_erase(&(cr->rbn), &(nb_retranslocked->kp_retransmits_rb));

		kref_put(&(cr->ref), kreffree_bug); /* rb */
		kref_put(&(cr->ref), free_control_retrans); /* list */
	}
}

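/*
 * Runs when the oldest pending control retransmit times out. Depending on
 * the neighbor state it drops the queue, rearms the timer, or flags the
 * neighbor so the timed-out messages get readded to the send queues.
 */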
void retransmit_timerfunc(struct timer_list *retrans_timer)
{
	struct neighbor *nb = container_of(retrans_timer,
			struct neighbor, retrans_timer);
	int nbstate = get_neigh_state(nb);
	struct control_retrans *cr = 0;

	spin_lock_bh(&(nb->retrans_lock));

	if (list_empty(&(nb->retrans_list))) {
		spin_unlock_bh(&(nb->retrans_lock));
		kref_put(&(nb->ref), neighbor_free);
		return;
	}

	if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
		empty_retrans_queue(nb);
		spin_unlock_bh(&(nb->retrans_lock));
		kref_put(&(nb->ref), neighbor_free);
		return;
	}

	cr = container_of(nb->retrans_list.next, struct control_retrans,
			timeout_list);

	BUG_ON(cr->nb != nb);

	if (time_after(cr->timeout, jiffies)) {
		int rc = mod_timer(&(nb->retrans_timer), cr->timeout);
		spin_unlock_bh(&(nb->retrans_lock));
		if (rc != 0)
			kref_put(&(nb->ref), neighbor_free);
		return;
	}

	spin_unlock_bh(&(nb->retrans_lock));

	spin_lock_bh(&(nb->cmsg_lock));
	nb->add_retrans_needed = 1;
	schedule_controlmsg_timer(nb);
	spin_unlock_bh(&(nb->cmsg_lock));

	kref_put(&(nb->ref), neighbor_free);
}

static void schedule_retransmit(struct control_retrans *cr, struct neighbor *nb)
{
	int first;

	cr->timeout = calc_timeout(atomic_read(&(nb->latency_retrans_us)),
			atomic_read(&(nb->latency_stddev_retrans_us)),
			atomic_read(&(nb->max_remote_ack_delay_us)));

	spin_lock_bh(&(nb->retrans_lock));
	insert_control_retrans(cr);
	first = list_empty(&(nb->retrans_list));
	list_add_tail(&(cr->timeout_list), &(nb->retrans_list));

	if (first) {
		if (mod_timer(&(nb->retrans_timer), cr->timeout) == 0) {
			kref_get(&(nb->ref));
		}
	}

	spin_unlock_bh(&(nb->retrans_lock));
}

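/*
 * Called when a KP_ACK for a kernel packet seqno arrives: look up the
 * matching control_retrans, unlink it from the rb tree and the timeout
 * list and drop its references, which cancels the retransmit.
 */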
void kern_ack_rcvd(struct neighbor *nb, __u64 seqno)
{
	struct control_retrans *cr = 0;

	spin_lock_bh(&(nb->retrans_lock));

	cr = get_control_retrans(nb, seqno);

	if (cr == 0) {
		/* char *seqno_p = (char *) &seqno;
		seqno = cpu_to_be32(seqno);
		printk(KERN_ERR "bogus/duplicate ack received %d %d %d %d",
				seqno_p[0], seqno_p[1], seqno_p[2], seqno_p[3]);
		*/
		goto out;
	}

	rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));

	BUG_ON(cr->nb != nb);

	list_del(&(cr->timeout_list));

out:
	spin_unlock_bh(&(nb->retrans_lock));

	if (cr != 0) {
		kref_put(&(cr->ref), kreffree_bug); /* get_control_retrans */
		kref_put(&(cr->ref), kreffree_bug); /* rb_erase */
		kref_put(&(cr->ref), free_control_retrans); /* list */
	}
}

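/*
 * Encode the remaining receive window (window_seqnolimit - next_seqno) in
 * the 7-bit logarithmic wire format and remember the value the remote side
 * will assume after decoding it.
 */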
static __u8 get_window(struct conn *cn, struct neighbor *expectedsender,
		__u32 expected_connid)
{
	__u8 window = 0;

	spin_lock_bh(&(cn->rcv_lock));

	if (unlikely(unlikely(cn->sourcetype != SOURCE_IN) ||
			unlikely(expectedsender != 0 && (cn->source.in.nb !=
			expectedsender || cn->reversedir->target.out.conn_id !=
			expected_connid))))
		goto out;

	window = enc_log_64_7(seqno_clean(cn->source.in.window_seqnolimit -
			cn->source.in.next_seqno));

	cn->source.in.window_seqnolimit_remote = cn->source.in.next_seqno +
			dec_log_64_7(window);

out:
	spin_unlock_bh(&(cn->rcv_lock));

	return window;
}

/* static void padding(struct sk_buff *skb, __u32 length)
{
	char *dst;
	if (length <= 0)
		return;
	dst = skb_put(skb, length);
	BUG_ON(dst == 0);
	memset(dst, KP_PADDING, length);
} */

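/*
 * The add_* helpers below append one command each to the kernel packet
 * being built. Every helper returns the number of bytes written, or 0 if
 * the remaining space is too small. Commands that may need retransmitting
 * are appended to cr->msgs; plain kernel-packet acks are not retransmitted
 * and are freed right away.
 */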
static int add_init_session(struct sk_buff *skb, __be32 sessionid,
		__u32 spaceleft)
{
	char *dst;

	BUG_ON(KP_INIT_SESSION_CMDLEN != 5);

	if (unlikely(spaceleft < 5))
		return 0;

	dst = skb_put(skb, 5);
	BUG_ON(dst == 0);

	dst[0] = KP_INIT_SESSION;
	put_be32(dst + 1, sessionid);

	return 5;
}

static int add_ack(struct sk_buff *skb, struct control_retrans *cr,
		struct control_msg_out *cm, __u32 spaceleft)
{
	char *dst;

	if (unlikely(spaceleft < 7))
		return 0;

	dst = skb_put(skb, 7);
	BUG_ON(dst == 0);

	dst[0] = KP_ACK;
	put_u48(dst + 1, cm->msg.ack.seqno);

	free_control_msg(cm);

	return 7;
}

static int add_ack_conn(struct sk_buff *skb, struct control_retrans *cr,
		struct control_msg_out *cm, __u32 spaceleft)
{
	char *dst;
	int offset = 0;

	if (unlikely(spaceleft < cm->length))
		return 0;

	dst = skb_put(skb, cm->length);
	BUG_ON(dst == 0);

	dst[offset] = KP_ACK_CONN;
	offset++;
	put_u32(dst + offset, cm->msg.ack_conn.conn_id);
	offset += 4;
	dst[offset] = cm->msg.ack_conn.flags;
	offset++;

	if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0) {
		put_u48(dst + offset, cm->msg.ack_conn.seqno);
		offset += 6;

		if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_WINDOW) != 0) {
			BUG_ON(cm->msg.ack_conn.src_in == 0);
			dst[offset] = get_window(cm->msg.ack_conn.src_in,
					cm->nb, cm->msg.ack_conn.conn_id);
			offset++;
		}
	}

	if (ooolen(cm->msg.ack_conn.flags) != 0) {
		put_u48(dst + offset, cm->msg.ack_conn.seqno_ooo);
		offset += 6;
		if (ooolen(cm->msg.ack_conn.flags) == 1) {
			BUG_ON(cm->msg.ack_conn.length > 255);
			dst[offset] = cm->msg.ack_conn.length;
			offset += 1;
		} else if (ooolen(cm->msg.ack_conn.flags) == 2) {
			BUG_ON(cm->msg.ack_conn.length <= 255);
			BUG_ON(cm->msg.ack_conn.length > 65535);
			put_u16(dst + offset, cm->msg.ack_conn.length);
			offset += 2;
		} else if (ooolen(cm->msg.ack_conn.flags) == 4) {
			BUG_ON(cm->msg.ack_conn.length <= 65535);
			put_u32(dst + offset, cm->msg.ack_conn.length);
			offset += 4;
		} else {
			BUG();
		}
	}

	if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
		dst[offset] = cm->msg.ack_conn.priority_seqno;
		offset++;
		dst[offset] = cm->msg.ack_conn.priority;
		offset++;
	}

	list_add_tail(&(cm->lh), &(cr->msgs));

	BUG_ON(offset != cm->length);
	return offset;
}

static int add_ping(struct sk_buff *skb, __u32 cookie, __u32 spaceleft)
{
	char *dst;

	BUG_ON(KP_PING_CMDLEN != 5);

	if (unlikely(spaceleft < 5))
		return 0;

	dst = skb_put(skb, 5);
	BUG_ON(dst == 0);

	dst[0] = KP_PING;
	put_u32(dst + 1, cookie);

	return 5;
}

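/*
 * A pong carries the cookie of the ping it answers plus the response delay
 * in microseconds, i.e. how long the pong waited between being enqueued
 * and the start of this send run, so the sender can account for time the
 * pong spent queued when measuring the ping latency.
 */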
static int add_pong(struct sk_buff *skb, struct control_retrans *cr,
		struct control_msg_out *cm, __u32 spaceleft,
		ktime_t cmsg_send_start)
{
	__s64 respdelay;
	char *dst;

	if (unlikely(spaceleft < 9))
		return 0;

#warning todo queue delay/total delay
	if (ktime_before(cmsg_send_start, cm->msg.pong.time_enqueued))
		respdelay = 0;
	else
		respdelay = div_u64(ktime_to_ns(cmsg_send_start) -
				ktime_to_ns(cm->msg.pong.time_enqueued) + 500,
				1000);
	if (unlikely(respdelay > U32_MAX))
		respdelay = U32_MAX;
	if (unlikely(respdelay < 0))
		respdelay = 0;

	dst = skb_put(skb, 9);
	BUG_ON(dst == 0);

	dst[0] = KP_PONG;
	put_u32(dst + 1, cm->msg.pong.cookie);
	put_u32(dst + 5, (__u32) respdelay);

	list_add_tail(&(cm->lh), &(cr->msgs));

	return 9;
}

static int add_connect(struct sk_buff *skb, struct control_retrans *cr,
		struct control_msg_out *cm, __u32 spaceleft)
{
	char *dst;
	struct conn *src_in = cm->msg.connect.src_in;

	if (unlikely(spaceleft < 20))
		return 0;

	dst = skb_put(skb, 20);
	BUG_ON(dst == 0);

	dst[0] = KP_CONNECT;
	put_u32(dst + 1, cm->msg.connect.conn_id);
	put_u48(dst + 5, cm->msg.connect.seqno1);
	put_u48(dst + 11, cm->msg.connect.seqno2);
	BUG_ON(cm->msg.connect.src_in == 0);
	dst[17] = get_window(cm->msg.connect.src_in, cm->nb,
			cm->msg.connect.conn_id);

	spin_lock_bh(&(src_in->reversedir->rcv_lock));
	BUG_ON(src_in->reversedir->targettype != TARGET_OUT);

	dst[18] = src_in->reversedir->target.out.priority_seqno;
	dst[19] = src_in->reversedir->target.out.priority_last;

	spin_unlock_bh(&(src_in->reversedir->rcv_lock));

	list_add_tail(&(cm->lh), &(cr->msgs));

	return 20;
}

static int add_connect_success(struct sk_buff *skb, struct control_retrans *cr,
		struct control_msg_out *cm, __u32 spaceleft)
{
	char *dst;

	if (unlikely(spaceleft < 6))
		return 0;

	dst = skb_put(skb, 6);
	BUG_ON(dst == 0);

	dst[0] = KP_CONNECT_SUCCESS;
	put_u32(dst + 1, cm->msg.connect_success.conn_id);
	BUG_ON(cm->msg.connect_success.src_in == 0);
	dst[5] = get_window(cm->msg.connect_success.src_in, cm->nb,
			cm->msg.connect_success.conn_id);

	list_add_tail(&(cm->lh), &(cr->msgs));

	return 6;
}

static int add_reset_conn(struct sk_buff *skb, struct control_retrans *cr,
		struct control_msg_out *cm, __u32 spaceleft)
{
	char *dst;

	if (unlikely(spaceleft < 5))
		return 0;

	dst = skb_put(skb, 5);
	BUG_ON(dst == 0);

	dst[0] = KP_RESET_CONN;
	put_u32(dst + 1, cm->msg.reset_conn.conn_id);

	list_add_tail(&(cm->lh), &(cr->msgs));

	return 5;
}

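/*
 * Append as much of a CONN_DATA command as fits. If only part of the
 * payload fits, the remainder is handed back through *split_conndata and
 * *sc_sendlen so the caller can requeue it after the packet went out.
 */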
static int add_conndata(struct sk_buff *skb, struct control_retrans *cr,
		struct control_msg_out *cm, __u32 spaceleft,
		struct control_msg_out **split_conndata, __u32 *sc_sendlen)
{
	char *dst;

	__u32 totallen = cm->msg.conn_data.datalen + KP_CONN_DATA_CMDLEN;
	__u32 putlen = min(totallen, spaceleft);
	__u32 dataputlen = putlen - KP_CONN_DATA_CMDLEN;

	BUG_ON(KP_CONN_DATA_CMDLEN != 13);

	BUG_ON(putlen > 1024*1024*1024);

	BUG_ON(split_conndata == 0);
	BUG_ON(*split_conndata != 0);
	BUG_ON(sc_sendlen == 0);
	BUG_ON(*sc_sendlen != 0);

	if (putlen < KP_CONN_DATA_CMDLEN + 1)
		return 0;

	dst = skb_put(skb, putlen);
	BUG_ON(dst == 0);

	if (cm->msg.conn_data.flush != 0) {
		if (cm->msg.conn_data.snd_delayed_lowbuf == 0) {
			dst[0] = KP_CONN_DATA_FLUSH;
		} else {
			dst[0] = KP_CONN_DATA_LOWBUFDELAYED_FLUSH;
		}
	} else {
		if (cm->msg.conn_data.snd_delayed_lowbuf == 0) {
			dst[0] = KP_CONN_DATA;
		} else {
			dst[0] = KP_CONN_DATA_LOWBUFDELAYED;
		}
	}
	put_u32(dst + 1, cm->msg.conn_data.conn_id);
	put_u48(dst + 5, cm->msg.conn_data.seqno);
	put_u16(dst + 11, dataputlen);

	memcpy(dst + 13, cm->msg.conn_data.data, dataputlen);

	if (cm->msg.conn_data.datalen == dataputlen) {
		list_add_tail(&(cm->lh), &(cr->msgs));
	} else {
		*split_conndata = cm;
		*sc_sendlen = dataputlen;
	}

	return putlen;
}

static int add_set_max_cmsg_dly(struct sk_buff *skb, struct control_retrans *cr,
		struct control_msg_out *cm, __u32 spaceleft)
{
	char *dst;

	BUG_ON(KP_SET_MAX_CMSG_DELAY_CMDLEN != 13);

	if (unlikely(spaceleft < 13))
		return 0;

	dst = skb_put(skb, 13);
	BUG_ON(dst == 0);

	dst[0] = KP_SET_MAX_CMSG_DELAY;
	put_u32(dst + 1, cm->msg.set_max_cmsg_delay.ack_delay);
	put_u32(dst + 5, cm->msg.set_max_cmsg_delay.ackconn_delay);
	put_u32(dst + 9, cm->msg.set_max_cmsg_delay.other_delay);

	list_add_tail(&(cm->lh), &(cr->msgs));

	return 13;
}

static int add_message(struct sk_buff *skb, struct control_retrans *cr,
		struct control_msg_out *cm, __u32 spaceleft,
		ktime_t cmsg_send_start,
		struct control_msg_out **split_conndata, __u32 *sc_sendlen)
{
	BUG_ON(split_conndata != 0 && *split_conndata != 0);
	BUG_ON(sc_sendlen != 0 && *sc_sendlen != 0);

	switch (cm->type) {
	case MSGTYPE_ACK:
		return add_ack(skb, cr, cm, spaceleft);
	case MSGTYPE_ACK_CONN:
		return add_ack_conn(skb, cr, cm, spaceleft);
	case MSGTYPE_PONG:
		return add_pong(skb, cr, cm, spaceleft, cmsg_send_start);
	case MSGTYPE_CONNECT:
		return add_connect(skb, cr, cm, spaceleft);
	case MSGTYPE_CONNECT_SUCCESS:
		return add_connect_success(skb, cr, cm, spaceleft);
	case MSGTYPE_RESET_CONN:
		return add_reset_conn(skb, cr, cm, spaceleft);
	case MSGTYPE_CONNDATA:
		return add_conndata(skb, cr, cm, spaceleft, split_conndata,
				sc_sendlen);
	case MSGTYPE_SET_MAX_CMSG_DELAY:
		return add_set_max_cmsg_dly(skb, cr, cm, spaceleft);
	default:
		BUG();
	}

	BUG();
	return 0;
}

static __u32 __send_messages(struct neighbor *nb, struct sk_buff *skb,
		struct control_retrans *cr, struct list_head *cmsgs,
		__u32 spaceleft, int nbstate, ktime_t cmsg_send_start,
		struct control_msg_out **split_conndata, __u32 *sc_sendlen)
{
	__u32 length = 0;
	while (!list_empty(cmsgs)) {
		int rc;
		struct control_msg_out *cm = container_of(cmsgs->next,
				struct control_msg_out, lh);

		list_del(&(cm->lh));

		rc = add_message(skb, cr, cm, spaceleft - length,
				cmsg_send_start, split_conndata, sc_sendlen);
		if (rc == 0) {
			BUG();
			list_add(&(cm->lh), cmsgs);
			break;
		}

		length += rc;
	}

	return length;
}

static __u32 __send_messages_smcd(struct neighbor *nb, struct sk_buff *skb,
		struct control_retrans *cr, __u32 spaceleft,
		ktime_t cmsg_send_start)
{
	struct control_msg_out *cm;
	int rc;

	cm = alloc_control_msg(nb, ACM_PRIORITY_MED);

	if (unlikely(cm == 0))
		return 0;

	cm->type = MSGTYPE_SET_MAX_CMSG_DELAY;
	cm->msg.set_max_cmsg_delay.ack_delay =
			CMSG_MAXDELAY_ACK_MS * 1000;
	cm->msg.set_max_cmsg_delay.ackconn_delay =
			CMSG_MAXDELAY_ACKCONN_MS * 1000;
	cm->msg.set_max_cmsg_delay.other_delay =
			CMSG_MAXDELAY_OTHER_MS * 1000;
	cm->length = KP_SET_MAX_CMSG_DELAY_CMDLEN;

	rc = add_message(skb, cr, cm, spaceleft, cmsg_send_start, 0, 0);

	nb->max_cmsg_delay_sent = 1;

	return rc;
}

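/*
 * Put a message that could not be sent back onto its queue. ack_conn
 * messages are revalidated against the conn and merged with other pending
 * acks; everything else is readded at the queue head.
 */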
static void requeue_message(struct control_msg_out *cm)
{
	if (cm->type == MSGTYPE_ACK_CONN) {
		struct conn *cn_l = cm->msg.ack_conn.src_in;

		spin_lock_bh(&(cn_l->rcv_lock));
		if (unlikely(ackconn_prepare_requeue(cn_l, cm) == 0)) {
			free_control_msg(cm);
		} else {
			spin_lock_bh(&(cm->nb->cmsg_lock));

			list_add(&(cm->lh), &(cm->nb->cmsg_queue_ackconn));
			cm->nb->cmsg_otherlength += cm->length;

			list_add(&(cm->msg.ack_conn.conn_acks),
					&(cn_l->source.in.acks_pending));
			try_merge_ackconns(cn_l, cm);

			spin_unlock_bh(&(cm->nb->cmsg_lock));
		}
		spin_unlock_bh(&(cn_l->rcv_lock));
		return;
	}

	enqueue_control_msg(cm, ADDCMSG_SRC_READD);
}

static void requeue_messages(struct list_head *lh)
{
	while (list_empty(lh) == 0) {
		struct control_msg_out *cm = container_of(lh->prev,
				struct control_msg_out, lh);
		list_del(&(cm->lh));
		requeue_message(cm);
	}
}

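/*
 * Fill the skb with an optional init_session command, an optional ping and
 * the dequeued control messages, then hand it to the qos queue. On
 * NET_XMIT_DROP everything is requeued; on success the sent messages stay
 * in cr->msgs and a retransmit is scheduled for them.
 */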
static int _send_messages_send2(struct neighbor *nb, struct sk_buff *skb,
		int ping, int initsession, struct control_retrans *cr,
		struct list_head *cmsgs, __u32 spaceleft, int nbstate,
		ktime_t cmsg_send_start, int *sent)
{
	int rc;
	__u32 length = 0;
	__u32 pinglen = 0;
	__u32 pingcookie = 0;
	unsigned long last_ping_time;
	struct control_msg_out *split_conndata = 0;
	__u32 sc_sendlen = 0;

	if (ping != TIMETOSENDPING_NO) {
		int rc;

		if (unlikely(initsession)) {
			rc = add_init_session(skb, nb->sessionid,
					spaceleft - length);
			BUG_ON(rc <= 0);
			pinglen = rc;
			length += rc;
		}

		pingcookie = add_ping_req(nb, &last_ping_time);
		rc = add_ping(skb, pingcookie, spaceleft - length);
		BUG_ON(rc <= 0);
		pinglen += rc;
		length += rc;
	}

	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE) &&
			unlikely(nb->max_cmsg_delay_sent == 0))
		length += __send_messages_smcd(nb, skb, cr, spaceleft - length,
				cmsg_send_start);

	length += __send_messages(nb, skb, cr, cmsgs, spaceleft - length,
			nbstate, cmsg_send_start, &split_conndata, &sc_sendlen);

	BUG_ON(length > spaceleft);

	if (likely(ping != TIMETOSENDPING_FORCE) &&
			pinglen != 0 && unlikely(length == pinglen)) {
		unadd_ping_req(nb, pingcookie, last_ping_time, 0);
		goto drop;
	}

	if (unlikely(length == 0)) {
drop:
		kfree_skb(skb);

		BUG_ON(list_empty(&(cr->msgs)) == 0);
		kref_put(&(cr->ref), free_control_retrans);

		nb->kpacket_seqno--;
		return QOS_RESUME_DONE;
	}

	/* padding(skb, spaceleft - length); */
	BUG_ON(spaceleft - length != 0);

	rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_KPACKET);
	if (rc == NET_XMIT_SUCCESS)
		*sent = 1;

	if (rc == NET_XMIT_DROP) {
		if (ping != 0)
			unadd_ping_req(nb, pingcookie, last_ping_time, 1);

		atomic_inc(&(nb->cmsg_bulk_readds));
		if (split_conndata != 0)
			requeue_message(split_conndata);

		requeue_messages(&(cr->msgs));

		kref_put(&(cr->ref), free_control_retrans);

		atomic_dec(&(nb->cmsg_bulk_readds));

		spin_lock_bh(&(nb->cmsg_lock));
		schedule_controlmsg_timer(nb);
		spin_unlock_bh(&(nb->cmsg_lock));
	} else {
		struct list_head *curr = cr->msgs.next;

		if (pingcookie != 0)
			ping_sent(nb, pingcookie);

		while (curr != &(cr->msgs)) {
			struct control_msg_out *cm = container_of(curr,
					struct control_msg_out, lh);

			curr = curr->next;

			if (unlikely(cm->type == MSGTYPE_PONG &&
					(nbstate != NEIGHBOR_STATE_ACTIVE))) {
				list_del(&(cm->lh));
				free_control_msg(cm);
			} else if (unlikely(cm->type == MSGTYPE_PONG &&
					atomic_inc_return(
					&(nb->cmsg_pongs_retrans_cnt)) >
					MAX_PONG_CMSGS_RETRANS_PER_NEIGH)) {
				atomic_dec(&(nb->cmsg_pongs_retrans_cnt));
				list_del(&(cm->lh));
				free_control_msg(cm);
			} else if (cm->type == MSGTYPE_CONNDATA) {
				schedule_retransmit_conn(cm->msg.conn_data.cr,
						0, 0);
				kfree(cm->msg.conn_data.data_orig);
				list_del(&(cm->lh));
				free_control_msg(cm);
			}
		}

		if (split_conndata != 0) {
			BUG_ON(sc_sendlen == 0);
			BUG_ON(sc_sendlen >=
					split_conndata->msg.conn_data.datalen);

			split_conndata->msg.conn_data.seqno += sc_sendlen;
			split_conndata->msg.conn_data.data += sc_sendlen;
			split_conndata->msg.conn_data.datalen -= sc_sendlen;
			split_conndata->length = KP_CONN_DATA_CMDLEN +
					split_conndata->msg.conn_data.datalen;
			enqueue_control_msg(split_conndata,
					ADDCMSG_SRC_SPLITCONNDATA);
		}

		if (list_empty(&(cr->msgs)))
			kref_put(&(cr->ref), free_control_retrans);
		else
			schedule_retransmit(cr, nb);
	}

	return (rc == NET_XMIT_SUCCESS) ? QOS_RESUME_DONE : QOS_RESUME_CONG;
}

static int _send_messages_send(struct neighbor *nb, int ping,
		int initsession, struct list_head *cmsgs, int nbstate,
		__u32 length, __u64 seqno, ktime_t cmsg_send_start, int *sent)
{
	struct sk_buff *skb;
	struct control_retrans *cr;
	int rc;

	skb = create_packet_cmsg(nb, length, GFP_ATOMIC, seqno);
	if (unlikely(skb == 0)) {
		printk(KERN_ERR "cor: send_messages: cannot allocate skb (out of memory?)");

		requeue_messages(cmsgs);
		return QOS_RESUME_CONG;
	}

	cr = kmem_cache_alloc(controlretrans_slab, GFP_ATOMIC);
	if (unlikely(cr == 0)) {
		printk(KERN_ERR "cor: send_messages: cannot allocate control_retrans (out of memory?)");
		kfree_skb(skb);

		requeue_messages(cmsgs);
		return QOS_RESUME_CONG;
	}

	memset(cr, 0, sizeof(struct control_retrans));
	kref_init(&(cr->ref));
	cr->nb = nb;
	cr->seqno = seqno;
	INIT_LIST_HEAD(&(cr->msgs));

	rc = _send_messages_send2(nb, skb, ping, initsession, cr, cmsgs, length,
			nbstate, cmsg_send_start, sent);

	BUG_ON(!list_empty(cmsgs));

	return rc;
}

#define CMSGQUEUE_PONG 1
#define CMSGQUEUE_ACK 2
#define CMSGQUEUE_ACK_CONN 3
#define CMSGQUEUE_CONNDATA_LOWLAT 4
#define CMSGQUEUE_CONNDATA_HIGHLAT 5
#define CMSGQUEUE_OTHER 6

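/*
 * Compute the deadline (in jiffies) by which a queued message must go out.
 * Each queue has its own maximum delay; low-latency conndata is due
 * immediately.
 */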
static unsigned long get_cmsg_timeout(struct control_msg_out *cm, int queue)
{
	if (cm->type == MSGTYPE_ACK) {
		BUG_ON(queue != CMSGQUEUE_ACK);
		return cm->time_added +
				msecs_to_jiffies(CMSG_MAXDELAY_ACK_MS) - 1;
	} else if (cm->type == MSGTYPE_ACK_CONN) {
		BUG_ON(queue != CMSGQUEUE_ACK_CONN);
		return cm->time_added +
				msecs_to_jiffies(CMSG_MAXDELAY_ACKCONN_MS) - 1;
	} else if (cm->type == MSGTYPE_CONNDATA) {
		if (cm->msg.conn_data.highlatency != 0) {
			BUG_ON(queue != CMSGQUEUE_CONNDATA_HIGHLAT);
			return cm->time_added +
					msecs_to_jiffies(
					CMSG_MAXDELAY_CONNDATA_MS) - 1;
		} else {
			BUG_ON(queue != CMSGQUEUE_CONNDATA_LOWLAT);
			return cm->time_added;
		}
	} else {
		BUG_ON(cm->type == MSGTYPE_PONG && queue != CMSGQUEUE_PONG);
		BUG_ON(cm->type != MSGTYPE_PONG && queue != CMSGQUEUE_OTHER);

		return cm->time_added +
				msecs_to_jiffies(CMSG_MAXDELAY_OTHER_MS) - 1;
	}
}

static void _peek_message(struct neighbor *nb_cmsglocked, int queue,
		struct control_msg_out **currcm, unsigned long *currtimeout,
		__u32 **currlen)
{
	struct control_msg_out *cm;
	unsigned long cmtimeout;

	struct list_head *queuelh;
	if (queue == CMSGQUEUE_PONG) {
		queuelh = &(nb_cmsglocked->cmsg_queue_pong);
	} else if (queue == CMSGQUEUE_ACK) {
		queuelh = &(nb_cmsglocked->cmsg_queue_ack);
	} else if (queue == CMSGQUEUE_ACK_CONN) {
		queuelh = &(nb_cmsglocked->cmsg_queue_ackconn);
	} else if (queue == CMSGQUEUE_CONNDATA_LOWLAT) {
		queuelh = &(nb_cmsglocked->cmsg_queue_conndata_lowlat);
	} else if (queue == CMSGQUEUE_CONNDATA_HIGHLAT) {
		queuelh = &(nb_cmsglocked->cmsg_queue_conndata_highlat);
	} else if (queue == CMSGQUEUE_OTHER) {
		queuelh = &(nb_cmsglocked->cmsg_queue_other);
	} else {
		BUG();
	}

	if (list_empty(queuelh))
		return;

	cm = container_of(queuelh->next, struct control_msg_out, lh);
	cmtimeout = get_cmsg_timeout(cm, queue);

	BUG_ON(cm->nb != nb_cmsglocked);

	if (*currcm == 0 || (time_before(cmtimeout, *currtimeout) &&
			time_before(jiffies, *currtimeout))) {
		*currcm = cm;
		*currtimeout = cmtimeout;

		if (queue == CMSGQUEUE_PONG) {
			*currlen = &(nb_cmsglocked->cmsg_pongslength);
		} else {
			*currlen = &(nb_cmsglocked->cmsg_otherlength);
		}
	}
}

static void peek_message(struct neighbor *nb_cmsglocked, int nbstate,
		struct control_msg_out **cm, unsigned long *cmtimeout,
		__u32 **len, int for_timeout)
{
	_peek_message(nb_cmsglocked, CMSGQUEUE_PONG, cm, cmtimeout, len);
	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
		_peek_message(nb_cmsglocked, CMSGQUEUE_ACK, cm, cmtimeout, len);
		_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN, cm, cmtimeout,
				len);
		if (!for_timeout || atomic_read(
				&(nb_cmsglocked->cmsg_delay_conndata)) == 0) {
			_peek_message(nb_cmsglocked, CMSGQUEUE_CONNDATA_LOWLAT,
					cm, cmtimeout, len);
			_peek_message(nb_cmsglocked, CMSGQUEUE_CONNDATA_HIGHLAT,
					cm, cmtimeout, len);
		}
		_peek_message(nb_cmsglocked, CMSGQUEUE_OTHER, cm, cmtimeout, len);
	}
}

static unsigned long get_cmsg_timer_timeout(struct neighbor *nb_cmsglocked,
		int nbstate)
{
	unsigned long pingtimeout = get_next_ping_time(nb_cmsglocked);

	struct control_msg_out *cm = 0;
	unsigned long cmtimeout;
	__u32 *len;

	peek_message(nb_cmsglocked, nbstate, &cm, &cmtimeout, &len, 1);

	if (cm != 0) {
		unsigned long jiffies_tmp = jiffies;

		if (time_before(cmtimeout, jiffies_tmp))
			return jiffies_tmp;
		if (time_before(cmtimeout, pingtimeout))
			return cmtimeout;
	}

	return pingtimeout;
}

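/*
 * Move messages from the per-neighbor queues into cmsgs until targetmss is
 * reached. Only a conndata message may be taken although it does not fit
 * completely; it gets split while the packet is built.
 */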
static void _dequeue_messages(struct neighbor *nb_cmsglocked, int nbstate,
		__u32 targetmss, __u32 *length, struct list_head *cmsgs)
{
	while (1) {
		__u32 spaceleft = targetmss - *length;
		struct control_msg_out *cm = 0;
		unsigned long cmtimeout;
		__u32 *len;

		peek_message(nb_cmsglocked, nbstate, &cm, &cmtimeout, &len, 0);

		if (unlikely(cm == 0))
			break;

		BUG_ON(len == 0);

		if (cm->length > spaceleft) {
			BUG_ON(*length == 0 && cm->type != MSGTYPE_CONNDATA);
			BUG_ON(*length == 0 && cm->type == MSGTYPE_CONNDATA &&
					spaceleft < KP_CONN_DATA_CMDLEN + 1);

			if ((*length/4)*3 > targetmss)
				break;
		}

		list_del(&(cm->lh));
		*len -= cm->length;

		if (cm->type == MSGTYPE_ACK_CONN)
			list_del(&(cm->msg.ack_conn.conn_acks));
		if (unlikely(cm->type == MSGTYPE_PONG)) {
			BUG_ON(cm->nb->cmsg_pongscnt == 0);
			cm->nb->cmsg_pongscnt--;
		}

		if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
			BUG_ON(cm->msg.reset_conn.in_pending_conn_resets == 0);
			rb_erase(&(cm->msg.reset_conn.rbn),
					&(cm->nb->pending_conn_resets_rb));
			cm->msg.reset_conn.in_pending_conn_resets = 0;
			kref_put(&(cm->ref), kreffree_bug);
		}

		BUG_ON(*length + cm->length < *length);
		if (cm->length > targetmss - *length) {
			BUG_ON(cm->type != MSGTYPE_CONNDATA);
			*length = targetmss;
		} else {
			*length += cm->length;
		}

		list_add_tail(&(cm->lh), cmsgs);
	}
}

static __u32 get_total_messages_length(struct neighbor *nb, int ping,
		int initsession, int nbstate, int *extralength)
{
	__u32 length = nb->cmsg_pongslength;

	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
		length += nb->cmsg_otherlength;

		if (unlikely(nb->max_cmsg_delay_sent == 0)) {
			length += KP_SET_MAX_CMSG_DELAY_CMDLEN;
			*extralength += KP_SET_MAX_CMSG_DELAY_CMDLEN;
		}
	}
	if (ping == TIMETOSENDPING_FORCE ||
			(length > 0 && ping != TIMETOSENDPING_NO)) {
		length += KP_PING_CMDLEN;
		*extralength += KP_PING_CMDLEN;

		if (unlikely(initsession)) {
			length += KP_INIT_SESSION_CMDLEN;
			*extralength += KP_INIT_SESSION_CMDLEN;
		}
	}

	return length;
}

static int dequeue_messages(struct neighbor *nb_cmsglocked, int ping,
		int initsession, int nbstate, __u32 targetmss,
		__u32 *length, struct list_head *cmsgs)
{
	__u32 extralength = 0;
	__u32 totallength;

	int cmsgqueue_nonpong_empty = (
			list_empty(&(nb_cmsglocked->cmsg_queue_ack)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_ackconn)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_conndata_lowlat)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_conndata_highlat)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_other)));

	BUG_ON(list_empty(&(nb_cmsglocked->cmsg_queue_pong)) &&
			nb_cmsglocked->cmsg_pongslength != 0);
	BUG_ON(!list_empty(&(nb_cmsglocked->cmsg_queue_pong)) &&
			nb_cmsglocked->cmsg_pongslength == 0);
	BUG_ON(cmsgqueue_nonpong_empty &&
			nb_cmsglocked->cmsg_otherlength != 0);
	BUG_ON(!cmsgqueue_nonpong_empty &&
			nb_cmsglocked->cmsg_otherlength == 0);

	totallength = get_total_messages_length(nb_cmsglocked, ping,
			initsession, nbstate, &extralength);

	if (totallength == 0)
		return 1;

	if (totallength < targetmss && ping != TIMETOSENDPING_FORCE &&
			time_after(get_cmsg_timer_timeout(nb_cmsglocked,
			nbstate), jiffies))
		return 1;

	*length = extralength;

	_dequeue_messages(nb_cmsglocked, nbstate, targetmss, length, cmsgs);

	BUG_ON(*length == 0);
	BUG_ON(*length > targetmss);

	return 0;
}

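/*
 * Unlink all control_retrans whose timeout has expired and requeue their
 * messages for sending; rearm the retransmit timer for the first entry
 * that has not expired yet.
 */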
static void add_timeouted_retrans(struct neighbor *nb)
{
	spin_lock_bh(&(nb->retrans_lock));

	while (!list_empty(&(nb->retrans_list))) {
		struct control_retrans *cr = container_of(nb->retrans_list.next,
				struct control_retrans, timeout_list);

		BUG_ON(cr->nb != nb);

		if (time_after(cr->timeout, jiffies)) {
			if (mod_timer(&(nb->retrans_timer), cr->timeout) == 0) {
				kref_get(&(nb->ref));
			}
			break;
		}

		list_del(&(cr->timeout_list));
		rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));

		kref_put(&(cr->ref), kreffree_bug); /* rb */

		requeue_control_retrans(cr);
	}

	spin_unlock_bh(&(nb->retrans_lock));
}

static void _delete_all_cmsgs(struct list_head *cmsgs)
{
	while (!list_empty(cmsgs)) {
		struct control_msg_out *cm = container_of(cmsgs->next,
				struct control_msg_out, lh);

		list_del(&(cm->lh));

		if (cm->type == MSGTYPE_CONNDATA) {
			schedule_retransmit_conn(cm->msg.conn_data.cr, 0, 0);
			kfree(cm->msg.conn_data.data_orig);
		}

		free_control_msg(cm);
	}
}

static void delete_all_cmsgs(struct neighbor *nb)
{
	while (1) {
		struct list_head cmsgs;
		__u32 length = 0;

		INIT_LIST_HEAD(&cmsgs);

		spin_lock_bh(&(nb->cmsg_lock));
		_dequeue_messages(nb, NEIGHBOR_STATE_ACTIVE, 65536, &length,
				&cmsgs);
		spin_unlock_bh(&(nb->cmsg_lock));

		if (list_empty(&cmsgs))
			break;

		_delete_all_cmsgs(&cmsgs);
	}
}

static int reset_timeouted_conn_needed(struct neighbor *nb,
		struct conn *src_in_l)
{
	if (unlikely(src_in_l->sourcetype != SOURCE_IN ||
			src_in_l->source.in.nb != nb ||
			src_in_l->isreset != 0))
		return 0;
	else if (likely(time_after(src_in_l->source.in.jiffies_last_act +
			CONN_ACTIVITY_UPDATEINTERVAL_SEC * HZ +
			CONN_INACTIVITY_TIMEOUT_SEC * HZ, jiffies)))
		return 0;

	return 1;
}

static int reset_timeouted_conn(struct neighbor *nb, struct conn *src_in)
{
	int resetted = 0;

	if (src_in->is_client) {
		spin_lock_bh(&(src_in->rcv_lock));
		spin_lock_bh(&(src_in->reversedir->rcv_lock));
	} else {
		spin_lock_bh(&(src_in->reversedir->rcv_lock));
		spin_lock_bh(&(src_in->rcv_lock));
	}

	resetted = reset_timeouted_conn_needed(nb, src_in);
	if (unlikely(resetted == 0))
		goto unlock;

	resetted = (send_reset_conn(nb, src_in->reversedir->target.out.conn_id,
			1) == 0);
	if (unlikely(resetted == 0))
		goto unlock;

	BUG_ON(src_in->reversedir->isreset != 0);
	src_in->reversedir->isreset = 1;

unlock:
	if (src_in->is_client) {
		spin_unlock_bh(&(src_in->rcv_lock));
		spin_unlock_bh(&(src_in->reversedir->rcv_lock));
	} else {
		spin_unlock_bh(&(src_in->reversedir->rcv_lock));
		spin_unlock_bh(&(src_in->rcv_lock));
	}

	if (resetted)
		reset_conn(src_in);

	return resetted;
}

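/*
 * Walk the neighbor's receive connection list (rotating it as we go) and
 * reset connections that have been inactive for too long; stop at the
 * first connection that is still active.
 */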
static void reset_timeouted_conns(struct neighbor *nb)
{
	int i;
	for (i = 0; i < 10000; i++) {
		unsigned long iflags;
		struct list_head *lh;
		struct conn *src_in;

		int resetted = 1;

		spin_lock_irqsave(&(nb->conn_list_lock), iflags);

		if (list_empty(&(nb->rcv_conn_list))) {
			spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
			break;
		}

		lh = nb->rcv_conn_list.next;
		list_del(lh);
		list_add_tail(lh, &(nb->rcv_conn_list));

		src_in = container_of(lh, struct conn, source.in.nb_list);
		kref_get(&(src_in->ref));

		spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);

		spin_lock_bh(&(src_in->rcv_lock));
		BUG_ON(src_in->sourcetype != SOURCE_IN);
		BUG_ON(src_in->source.in.nb != nb);
		resetted = reset_timeouted_conn_needed(nb, src_in);
		spin_unlock_bh(&(src_in->rcv_lock));
		if (likely(resetted == 0))
			goto put;

		resetted = reset_timeouted_conn(nb, src_in);

put:
		kref_put(&(src_in->ref), free_conn);

		if (likely(resetted == 0))
			break;
	}
}

/*
 * may not be called by more than one thread at the same time, because
 * 1) readding control_msg_out may reorder them
 * 2) multiple pings may be sent
 */
int send_messages(struct neighbor *nb, ktime_t cmsg_send_start, int *sent)
{
	int rc = QOS_RESUME_DONE;
	int ping;
	int initsession;
	__u32 targetmss = mss_cmsg(nb);

	int nbstate = get_neigh_state(nb);

	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE))
		reset_timeouted_conns(nb);

	if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
		spin_lock_bh(&(nb->retrans_lock));
		empty_retrans_queue(nb);
		spin_unlock_bh(&(nb->retrans_lock));

		delete_all_cmsgs(nb);
		return QOS_RESUME_DONE;
	}

	ping = time_to_send_ping(nb);

	spin_lock_bh(&(nb->cmsg_lock));

	if (nb->add_retrans_needed != 0) {
		nb->add_retrans_needed = 0;
		spin_unlock_bh(&(nb->cmsg_lock));
		add_timeouted_retrans(nb);
		spin_lock_bh(&(nb->cmsg_lock));
	}

	initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) != 0);

	while (1) {
		struct list_head cmsgs;
		__u32 length = 0;
		__u64 seqno;

		INIT_LIST_HEAD(&cmsgs);

		if (dequeue_messages(nb, ping, initsession, nbstate,
				targetmss, &length, &cmsgs) != 0) {
			schedule_controlmsg_timer(nb);
			spin_unlock_bh(&(nb->cmsg_lock));
			return QOS_RESUME_DONE;
		}

		nb->kpacket_seqno++;
		seqno = nb->kpacket_seqno;

		spin_unlock_bh(&(nb->cmsg_lock));

		rc = _send_messages_send(nb, ping, initsession, &cmsgs, nbstate,
				length, seqno, cmsg_send_start, sent);

		if (rc != QOS_RESUME_DONE)
			return rc;

		ping = 0;
		initsession = 0;

		spin_lock_bh(&(nb->cmsg_lock));
	}
}

void controlmsg_timerfunc(struct timer_list *cmsg_timer)
{
	struct neighbor *nb = container_of(cmsg_timer,
			struct neighbor, cmsg_timer);
	qos_enqueue(nb->queue, &(nb->rb_kp), ktime_get(), QOS_CALLER_KPACKET);
	kref_put(&(nb->ref), neighbor_free);
}

static int cmsg_full_packet(struct neighbor *nb, int nbstate)
{
	__u32 extralength = 0;
	int ping = time_to_send_ping(nb);
	int initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) !=
			0);
	__u32 len = get_total_messages_length(nb, ping, initsession, nbstate,
			&extralength);

	if (len == 0)
		return 0;
	if (len < mss_cmsg(nb))
		return 0;

	return 1;
}

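/*
 * Decide when the control message timer has to fire: immediately if the
 * neighbor is dead, a full packet is pending or retransmits wait to be
 * readded; otherwise at the earliest message or ping deadline.
 */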
void schedule_controlmsg_timer(struct neighbor *nb_cmsglocked)
{
	unsigned long timeout;
	int nbstate = get_neigh_state(nb_cmsglocked);

	if (unlikely(nbstate == NEIGHBOR_STATE_KILLED))
		goto now;

	if (atomic_read(&(nb_cmsglocked->cmsg_bulk_readds)) != 0)
		return;

	if (cmsg_full_packet(nb_cmsglocked, nbstate))
		goto now;

	if (nb_cmsglocked->add_retrans_needed != 0)
		goto now;

	timeout = get_cmsg_timer_timeout(nb_cmsglocked, nbstate);

	if (time_before_eq(timeout, jiffies)) {
now:
		qos_enqueue(nb_cmsglocked->queue, &(nb_cmsglocked->rb_kp),
				ktime_get(), QOS_CALLER_KPACKET);
	} else {
		if (mod_timer(&(nb_cmsglocked->cmsg_timer), timeout) == 0) {
			kref_get(&(nb_cmsglocked->ref));
		}
	}
}

static int insert_pending_conn_resets(struct control_msg_out *ins)
{
	struct neighbor *nb = ins->nb;
	__u32 conn_id = ins->msg.reset_conn.conn_id;

	struct rb_root *root;
	struct rb_node **p;
	struct rb_node *parent = 0;

	BUG_ON(nb == 0);
	BUG_ON(ins->msg.reset_conn.in_pending_conn_resets != 0);

	root = &(nb->pending_conn_resets_rb);
	p = &(root->rb_node);

	while ((*p) != 0) {
		struct control_msg_out *cm = container_of(*p,
				struct control_msg_out,
				msg.reset_conn.rbn);
		__u32 cm_connid = cm->msg.reset_conn.conn_id;

		BUG_ON(cm->nb != ins->nb);
		BUG_ON(cm->type != MSGTYPE_RESET_CONN);

		parent = *p;
		if (conn_id == cm_connid) {
			return 1;
		} else if (conn_id < cm_connid) {
			p = &(*p)->rb_left;
		} else if (conn_id > cm_connid) {
			p = &(*p)->rb_right;
		} else {
			BUG();
		}
	}

	kref_get(&(ins->ref));
	rb_link_node(&(ins->msg.reset_conn.rbn), parent, p);
	rb_insert_color(&(ins->msg.reset_conn.rbn), root);
	ins->msg.reset_conn.in_pending_conn_resets = 1;

	return 0;
}

static void free_oldest_pong(struct neighbor *nb)
{
	struct control_msg_out *cm = container_of(nb->cmsg_queue_pong.next,
			struct control_msg_out, lh);

	BUG_ON(list_empty(&(nb->cmsg_queue_pong)));
	BUG_ON(unlikely(cm->type != MSGTYPE_PONG));

	list_del(&(cm->lh));
	nb->cmsg_pongslength -= cm->length;
	BUG_ON(nb->cmsg_pongscnt == 0);
	cm->nb->cmsg_pongscnt--;
	free_control_msg(cm);
}

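/*
 * Add a message to the queue matching its type. Pongs are capped per
 * neighbor (the oldest one is dropped to make room for a new one) and
 * duplicate reset_conn messages are suppressed via the pending_conn_resets
 * rb tree. Requeued messages go to the queue head, new ones to the tail.
 */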
static int _enqueue_control_msg(struct control_msg_out *cm, int src)
{
	if (unlikely(cm->type == MSGTYPE_PONG)) {
		BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA);

		if (cm->nb->cmsg_pongscnt >= MAX_PONG_CMSGS_PER_NEIGH) {
			if (src != ADDCMSG_SRC_NEW) {
				BUG_ON(cm->nb->cmsg_pongscnt == 0);
				cm->nb->cmsg_pongscnt--;
				free_control_msg(cm);
				return 1;
			} else {
				free_oldest_pong(cm->nb);
			}
		}

		cm->nb->cmsg_pongscnt++;
		cm->nb->cmsg_pongslength += cm->length;

		if (src != ADDCMSG_SRC_NEW) {
			list_add(&(cm->lh), &(cm->nb->cmsg_queue_pong));
		} else {
			list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_pong));
		}

		return 0;
	} else if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
		if (insert_pending_conn_resets(cm) != 0) {
			cm->type = 0;
			free_control_msg(cm);
			return 1;
		}
	}

	cm->nb->cmsg_otherlength += cm->length;
	if (src == ADDCMSG_SRC_NEW) {
		if (cm->type == MSGTYPE_ACK) {
			list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_ack));
		} else if (cm->type == MSGTYPE_ACK_CONN) {
			list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_ackconn));
		} else if (cm->type == MSGTYPE_CONNDATA &&
				cm->msg.conn_data.highlatency != 0) {
			list_add_tail(&(cm->lh),
					&(cm->nb->cmsg_queue_conndata_highlat));
		} else if (cm->type == MSGTYPE_CONNDATA) {
			list_add_tail(&(cm->lh),
					&(cm->nb->cmsg_queue_conndata_lowlat));
		} else {
			list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_other));
		}
	} else {
		BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA &&
				cm->type != MSGTYPE_CONNDATA);
		BUG_ON(src == ADDCMSG_SRC_READD &&
				cm->type == MSGTYPE_ACK_CONN);

		if (cm->type == MSGTYPE_ACK) {
			list_add(&(cm->lh), &(cm->nb->cmsg_queue_ack));
		} else if (cm->type == MSGTYPE_ACK_CONN) {
			list_add(&(cm->lh), &(cm->nb->cmsg_queue_ackconn));
		} else if (cm->type == MSGTYPE_CONNDATA &&
				cm->msg.conn_data.highlatency != 0) {
			list_add(&(cm->lh),
					&(cm->nb->cmsg_queue_conndata_highlat));
		} else if (cm->type == MSGTYPE_CONNDATA) {
			list_add(&(cm->lh),
					&(cm->nb->cmsg_queue_conndata_lowlat));
		} else {
			list_add(&(cm->lh), &(cm->nb->cmsg_queue_other));
		}
	}

	return 0;
}

static void enqueue_control_msg(struct control_msg_out *cm, int src)
{
	BUG_ON(cm == 0);
	BUG_ON(cm->nb == 0);

	if (src == ADDCMSG_SRC_NEW)
		cm->time_added = jiffies;

	spin_lock_bh(&(cm->nb->cmsg_lock));

	if (_enqueue_control_msg(cm, src) != 0)
		goto out;

	if (src != ADDCMSG_SRC_READD && src != ADDCMSG_SRC_RETRANS)
		schedule_controlmsg_timer(cm->nb);

out:
	spin_unlock_bh(&(cm->nb->cmsg_lock));
}

void send_pong(struct neighbor *nb, __u32 cookie)
{
	struct control_msg_out *cm = _alloc_control_msg(nb);

	if (unlikely(cm == 0))
		return;

	cm->nb = nb;
	cm->type = MSGTYPE_PONG;
	cm->msg.pong.cookie = cookie;
	cm->msg.pong.type = MSGTYPE_PONG_TIMEENQUEUED;
	cm->msg.pong.time_enqueued = ktime_get();
	cm->length = 9;
	enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
}

void send_ack(struct neighbor *nb, __u64 seqno)
{
	struct control_msg_out *cm = alloc_control_msg(nb, ACM_PRIORITY_HIGH);

	if (unlikely(cm == 0))
		return;

	cm->nb = nb;
	cm->type = MSGTYPE_ACK;
	cm->msg.ack.seqno = seqno;
	cm->length = 7;
	enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
}

static void set_ooolen_flags(struct control_msg_out *cm)
{
	cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
			(~KP_ACK_CONN_FLAGS_OOO));
	cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
			ooolen_to_flags(cm->msg.ack_conn.length));
}

/* cmsg_lock must be held */
static void remove_pending_ackconn(struct control_msg_out *cm)
{
	cm->nb->cmsg_otherlength -= cm->length;
	list_del(&(cm->lh));

	list_del(&(cm->msg.ack_conn.conn_acks));
	kref_put(&(cm->msg.ack_conn.src_in->ref), free_conn);
	cm->msg.ack_conn.src_in = 0;

	cm->type = 0;
	free_control_msg(cm);
}

/* cmsg_lock must be held */
static void recalc_scheduled_ackconn_size(struct control_msg_out *cm)
{
	cm->nb->cmsg_otherlength -= cm->length;
	cm->length = 6 + ack_conn_len(cm->msg.ack_conn.flags);
	cm->nb->cmsg_otherlength += cm->length;
}

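/*
 * cmsg_lock must be held.
 *
 * Try to fold the state carried by fromcm into tocm: overlapping
 * out-of-order ranges are unioned, the newer in-order seqno/window wins
 * and priority information is copied over. Returns 0 on success (fromcm is
 * consumed unless from_newack is set), 1 if the messages cannot be merged.
 */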
static int _try_merge_ackconn(struct conn *src_in_l,
		struct control_msg_out *fromcm, struct control_msg_out *tocm,
		int from_newack)
{
	if (ooolen(fromcm->msg.ack_conn.flags) != 0 &&
			ooolen(tocm->msg.ack_conn.flags) != 0) {
		__u64 tocmseqno = tocm->msg.ack_conn.seqno_ooo;
		__u64 tocmlength = tocm->msg.ack_conn.length;
		__u64 fromcmseqno = fromcm->msg.ack_conn.seqno_ooo;
		__u64 fromcmlength = fromcm->msg.ack_conn.length;

		if (seqno_eq(tocmseqno, fromcmseqno)) {
			if (fromcmlength > tocmlength)
				tocm->msg.ack_conn.length = fromcmlength;
		} else if (seqno_after(fromcmseqno, tocmseqno) &&
				seqno_before_eq(fromcmseqno, tocmseqno +
				tocmlength)) {
			__u64 len = seqno_clean(fromcmseqno + fromcmlength -
					tocmseqno);
			BUG_ON(len > U32_MAX);
			tocm->msg.ack_conn.length = (__u32) len;
		} else if (seqno_before(fromcmseqno, tocmseqno) &&
				seqno_after_eq(fromcmseqno + fromcmlength,
				tocmseqno)) {
			__u64 len = seqno_clean(tocmseqno + tocmlength -
					fromcmseqno);
			BUG_ON(len > U32_MAX);
			tocm->msg.ack_conn.seqno_ooo = fromcmseqno;
			tocm->msg.ack_conn.length = (__u32) len;
		} else {
			return 1;
		}
		set_ooolen_flags(tocm);
	}

	if ((fromcm->msg.ack_conn.flags &
			KP_ACK_CONN_FLAGS_SEQNO) != 0) {
		if ((tocm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) == 0)
			goto setseqno;

		BUG_ON(seqno_eq(fromcm->msg.ack_conn.ack_seqno,
				tocm->msg.ack_conn.ack_seqno));
		if (seqno_after_eq(tocm->msg.ack_conn.ack_seqno,
				fromcm->msg.ack_conn.ack_seqno)) {
			BUG_ON(seqno_after(fromcm->msg.ack_conn.seqno,
					tocm->msg.ack_conn.seqno));
			goto skipseqno;
		}

		BUG_ON(seqno_before(fromcm->msg.ack_conn.seqno,
				tocm->msg.ack_conn.seqno));

setseqno:
		tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_SEQNO);
		tocm->msg.ack_conn.seqno = fromcm->msg.ack_conn.seqno;
		tocm->msg.ack_conn.ack_seqno = fromcm->msg.ack_conn.ack_seqno;

skipseqno:
		if ((fromcm->msg.ack_conn.flags &
				KP_ACK_CONN_FLAGS_WINDOW) != 0)
			tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
					KP_ACK_CONN_FLAGS_WINDOW);
	}

	/* copy fromcm's ooo range only if tocm does not already carry one */
	if (ooolen(fromcm->msg.ack_conn.flags) != 0 &&
			ooolen(tocm->msg.ack_conn.flags) == 0) {
		tocm->msg.ack_conn.seqno_ooo = fromcm->msg.ack_conn.seqno_ooo;
		tocm->msg.ack_conn.length = fromcm->msg.ack_conn.length;
		set_ooolen_flags(tocm);
	}

	if ((fromcm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
		BUG_ON((tocm->msg.ack_conn.flags &
				KP_ACK_CONN_FLAGS_PRIORITY) != 0);
		tocm->msg.ack_conn.priority_seqno =
				fromcm->msg.ack_conn.priority_seqno;
		tocm->msg.ack_conn.priority = fromcm->msg.ack_conn.priority;
	}

	recalc_scheduled_ackconn_size(tocm);
	if (from_newack == 0)
		remove_pending_ackconn(fromcm);

	return 0;
}

/* cmsg_lock must be held */
static void try_merge_ackconns(struct conn *src_in_l,
		struct control_msg_out *cm)
{
	struct list_head *currlh = cm->msg.ack_conn.conn_acks.next;

	while (currlh != &(src_in_l->source.in.acks_pending)) {
		struct control_msg_out *currcm = container_of(currlh,
				struct control_msg_out,
				msg.ack_conn.conn_acks);
		currlh = currlh->next;
		remove_connack_oooflag_ifold(src_in_l, currcm);
		_try_merge_ackconn(src_in_l, currcm, cm, 0);
	}
}

static void merge_or_enqueue_ackconn(struct conn *src_in_l,
		struct control_msg_out *cm, int src)
{
	struct list_head *currlh;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	spin_lock_bh(&(cm->nb->cmsg_lock));

	currlh = src_in_l->source.in.acks_pending.next;

	while (currlh != &(src_in_l->source.in.acks_pending)) {
		struct control_msg_out *currcm = container_of(currlh,
				struct control_msg_out,
				msg.ack_conn.conn_acks);

		BUG_ON(currcm->nb != cm->nb);
		BUG_ON(currcm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
		BUG_ON(currcm->msg.ack_conn.conn_id !=
				cm->msg.ack_conn.conn_id);

		if (_try_merge_ackconn(src_in_l, cm, currcm, 1) == 0) {
			try_merge_ackconns(src_in_l, currcm);
			schedule_controlmsg_timer(currcm->nb);
			spin_unlock_bh(&(currcm->nb->cmsg_lock));
			/*
			 * clear flags: when calling free_control_msg here the
			 * conn may already be locked and
			 * priority_send_allowed should not be reset
			 */
			cm->msg.ack_conn.flags = 0;
			free_control_msg(cm);
			return;
		}

		currlh = currlh->next;
	}

	list_add_tail(&(cm->msg.ack_conn.conn_acks),
			&(src_in_l->source.in.acks_pending));

	spin_unlock_bh(&(cm->nb->cmsg_lock));

	enqueue_control_msg(cm, src);
}

static int try_update_ackconn_seqno(struct conn *src_in_l)
{
	int rc = 1;

	spin_lock_bh(&(src_in_l->source.in.nb->cmsg_lock));

	if (list_empty(&(src_in_l->source.in.acks_pending)) == 0) {
		struct control_msg_out *cm = container_of(
				src_in_l->source.in.acks_pending.next,
				struct control_msg_out,
				msg.ack_conn.conn_acks);
		BUG_ON(cm->nb != src_in_l->source.in.nb);
		BUG_ON(cm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
		BUG_ON(cm->msg.ack_conn.conn_id !=
				src_in_l->reversedir->target.out.conn_id);

		cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_SEQNO |
				KP_ACK_CONN_FLAGS_WINDOW);
		cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;

		src_in_l->source.in.ack_seqno++;
		cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;

		remove_connack_oooflag_ifold(src_in_l, cm);
		recalc_scheduled_ackconn_size(cm);

		try_merge_ackconns(src_in_l, cm);

		rc = 0;
	}

	spin_unlock_bh(&(src_in_l->source.in.nb->cmsg_lock));

	return rc;
}

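/*
 * Send (or fold into a pending ack) a per-connection ack. Without
 * out-of-order data an ack only goes out if one is explicitly needed or
 * the window announced to the remote side has fallen noticeably behind the
 * window we could grant.
 */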
void send_ack_conn_ifneeded(struct conn *src_in_l, __u64 seqno_ooo,
		__u32 ooo_length)
{
	struct control_msg_out *cm;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	BUG_ON(ooo_length > 0 && seqno_before_eq(seqno_ooo,
			src_in_l->source.in.next_seqno));

	update_windowlimit(src_in_l);

	if (ooo_length != 0) {
		cm = alloc_control_msg(src_in_l->source.in.nb,
				ACM_PRIORITY_LOW);
		if (cm != 0)
			goto add;
	}

	if (src_in_l->source.in.inorder_ack_needed != 0)
		goto ack_needed;

	if (seqno_clean(src_in_l->source.in.window_seqnolimit -
			src_in_l->source.in.next_seqno) < WINDOW_ENCODE_MIN)
		return;

	if (seqno_clean(src_in_l->source.in.window_seqnolimit_remote -
			src_in_l->source.in.next_seqno) >= WINDOW_ENCODE_MIN &&
			seqno_clean(src_in_l->source.in.window_seqnolimit -
			src_in_l->source.in.next_seqno) * 7 <
			seqno_clean(
			src_in_l->source.in.window_seqnolimit_remote -
			src_in_l->source.in.next_seqno) * 8)
		return;

ack_needed:
	if (try_update_ackconn_seqno(src_in_l) == 0)
		goto out;

	cm = alloc_control_msg(src_in_l->source.in.nb, ACM_PRIORITY_MED);
	if (cm == 0) {
		printk(KERN_ERR "error allocating inorder ack");
		return;
	}

add:
	cm->type = MSGTYPE_ACK_CONN;
	src_in_l->source.in.ack_seqno++;
	cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;
	kref_get(&(src_in_l->ref));
	cm->msg.ack_conn.src_in = src_in_l;
	cm->msg.ack_conn.conn_id = src_in_l->reversedir->target.out.conn_id;
	cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;
	cm->msg.ack_conn.seqno_ooo = seqno_ooo;
	cm->msg.ack_conn.length = ooo_length;
	cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_SEQNO |
			KP_ACK_CONN_FLAGS_WINDOW;
	set_ooolen_flags(cm);
	cm->length = 6 + ack_conn_len(cm->msg.ack_conn.flags);

	merge_or_enqueue_ackconn(src_in_l, cm, ADDCMSG_SRC_NEW);

out:
	src_in_l->source.in.inorder_ack_needed = 0;
	src_in_l->source.in.window_seqnolimit_remote =
			src_in_l->source.in.window_seqnolimit;
}

static int try_add_priority(struct conn *trgt_out_l, __u8 priority)
{
	int rc = 1;
	struct conn *src_in = trgt_out_l->reversedir;

	spin_lock_bh(&(trgt_out_l->target.out.nb->cmsg_lock));

	if (list_empty(&(src_in->source.in.acks_pending)) == 0) {
		struct control_msg_out *cm = container_of(
				src_in->source.in.acks_pending.next,
				struct control_msg_out,
				msg.ack_conn.conn_acks);
		BUG_ON(cm->nb != trgt_out_l->target.out.nb);
		BUG_ON(cm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != trgt_out_l->reversedir);
		BUG_ON(cm->msg.ack_conn.conn_id !=
				trgt_out_l->target.out.conn_id);

		BUG_ON((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) !=
				0);
		cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_PRIORITY);
		cm->msg.ack_conn.priority_seqno =
				trgt_out_l->target.out.priority_seqno;
		cm->msg.ack_conn.priority = priority;
		recalc_scheduled_ackconn_size(cm);

		rc = 0;
	}

	spin_unlock_bh(&(trgt_out_l->target.out.nb->cmsg_lock));

	return rc;
}

void send_priority(struct conn *trgt_out_ll, int force, __u8 priority)
{
	struct control_msg_out *cm;

	if (try_add_priority(trgt_out_ll, priority) == 0)
		goto out;

	if (force == 0)
		return;

	cm = alloc_control_msg(trgt_out_ll->target.out.nb, ACM_PRIORITY_LOW);

	if (cm == 0)
		return;

	cm->type = MSGTYPE_ACK_CONN;
	cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_PRIORITY;
	kref_get(&(trgt_out_ll->reversedir->ref));
	BUG_ON(trgt_out_ll->targettype != TARGET_OUT);
	cm->msg.ack_conn.src_in = trgt_out_ll->reversedir;
	cm->msg.ack_conn.conn_id = trgt_out_ll->target.out.conn_id;
	cm->msg.ack_conn.priority_seqno =
			trgt_out_ll->target.out.priority_seqno;
	cm->msg.ack_conn.priority = priority;

	cm->length = 6 + ack_conn_len(cm->msg.ack_conn.flags);
	merge_or_enqueue_ackconn(trgt_out_ll->reversedir, cm, ADDCMSG_SRC_NEW);

out:
	trgt_out_ll->target.out.priority_last = priority;
	trgt_out_ll->target.out.priority_seqno++;
	trgt_out_ll->target.out.priority_send_allowed = 0;
}

void free_ack_conns(struct conn *src_in_lx)
{
	int changed = 0;
	spin_lock_bh(&(src_in_lx->source.in.nb->cmsg_lock));
	while (list_empty(&(src_in_lx->source.in.acks_pending)) == 0) {
		struct list_head *currlh =
				src_in_lx->source.in.acks_pending.next;
		struct control_msg_out *currcm = container_of(currlh,
				struct control_msg_out,
				msg.ack_conn.conn_acks);

		remove_pending_ackconn(currcm);
		changed = 1;
	}
	if (changed)
		schedule_controlmsg_timer(src_in_lx->source.in.nb);
	spin_unlock_bh(&(src_in_lx->source.in.nb->cmsg_lock));
}

void send_connect_success(struct control_msg_out *cm, __u32 conn_id,
		struct conn *src_in)
{
	cm->type = MSGTYPE_CONNECT_SUCCESS;
	cm->msg.connect_success.conn_id = conn_id;
	kref_get(&(src_in->ref));
	cm->msg.connect_success.src_in = src_in;
	cm->length = 6;
	enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
}

void send_connect_nb(struct control_msg_out *cm, __u32 conn_id, __u64 seqno1,
		__u64 seqno2, struct conn *src_in_ll)
{
	cm->type = MSGTYPE_CONNECT;
	cm->msg.connect.conn_id = conn_id;
	cm->msg.connect.seqno1 = seqno1;
	cm->msg.connect.seqno2 = seqno2;
	kref_get(&(src_in_ll->ref));
	BUG_ON(src_in_ll->sourcetype != SOURCE_IN);
	cm->msg.connect.src_in = src_in_ll;
	cm->length = 20;
	enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
}

void send_conndata(struct control_msg_out *cm, __u32 conn_id, __u64 seqno,
		char *data_orig, char *data, __u32 datalen,
		__u8 snd_delayed_lowbuf, __u8 flush, __u8 highlatency,
		struct conn_retrans *cr)
{
	cm->type = MSGTYPE_CONNDATA;
	cm->msg.conn_data.conn_id = conn_id;
	cm->msg.conn_data.seqno = seqno;
	cm->msg.conn_data.data_orig = data_orig;
	cm->msg.conn_data.data = data;
	cm->msg.conn_data.datalen = datalen;
	cm->msg.conn_data.snd_delayed_lowbuf = snd_delayed_lowbuf;
	cm->msg.conn_data.flush = flush;
	cm->msg.conn_data.highlatency = highlatency;
	cm->msg.conn_data.cr = cr;
	cm->length = KP_CONN_DATA_CMDLEN + datalen;
	enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
}

int send_reset_conn(struct neighbor *nb, __u32 conn_id, int lowprio)
{
	struct control_msg_out *cm;

	if (unlikely(get_neigh_state(nb) == NEIGHBOR_STATE_KILLED))
		return 0;

	cm = alloc_control_msg(nb, lowprio ?
			ACM_PRIORITY_LOW : ACM_PRIORITY_MED);

	if (unlikely(cm == 0))
		return 1;

	cm->type = MSGTYPE_RESET_CONN;
	cm->msg.reset_conn.conn_id = conn_id;
	cm->length = 5;

	enqueue_control_msg(cm, ADDCMSG_SRC_NEW);

	return 0;
}

int __init cor_kgen_init(void)
{
	controlmsg_slab = kmem_cache_create("cor_controlmsg",
			sizeof(struct control_msg_out), 8, 0, 0);
	if (unlikely(controlmsg_slab == 0))
		return -ENOMEM;

	controlretrans_slab = kmem_cache_create("cor_controlretransmsg",
			sizeof(struct control_retrans), 8, 0, 0);
	if (unlikely(controlretrans_slab == 0))
		return -ENOMEM;

	return 0;
}

MODULE_LICENSE("GPL");