convert rcv_conn_list to snd_conn_list
[cor.git] / net / cor / neigh_snd.c
blobf39bff25fa8af4b96dd642f34c99951293c15ea1
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2021 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <asm/byteorder.h>
23 #include "cor.h"
/* not sent over the network - internal meaning only */
#define MSGTYPE_PONG 1
#define MSGTYPE_ACK 2
#define MSGTYPE_ACK_CONN 3
#define MSGTYPE_CONNECT 4
#define MSGTYPE_CONNECT_SUCCESS 5
#define MSGTYPE_RESET_CONN 6
#define MSGTYPE_CONNDATA 7
#define MSGTYPE_SET_MAX_CMSG_DELAY 8

/* sub-codes for pong delay accounting; presumably select which timestamp
 * base the pong delay is measured from -- TODO confirm against users */
#define MSGTYPE_PONG_TIMEENQUEUED 1
#define MSGTYPE_PONG_RESPDELAY 2
/*
 * One queued outgoing control command for a neighbor. Lives either on a
 * per-neighbor send queue or on a cor_control_retrans msg list (see lh).
 * Refcounted; freed through cor_free_control_msg()/cor_cmsg_kref_free().
 */
struct cor_control_msg_out {
    __u8 type; /* MSGTYPE_* */
    __u32 length; /* encoded on-wire length of this command */
    struct kref ref;
    struct cor_neighbor *nb;

    /* either queue or control_retrans_packet */
    struct list_head lh;

    unsigned long time_added; /* jiffies when enqueued; used for delay calc */

    union {
        struct {
            __u32 cookie;
            __u8 type; /* MSGTYPE_PONG_* -- not referenced in this chunk */

            ktime_t ping_rcvtime;
            ktime_t time_enqueued;
        } pong;

        struct {
            __u64 seqno;
            __u8 fast; /* selects fast vs slow retransmit list */
        } ack;

        struct {
            struct cor_conn *src_in; /* holds a conn kref */
            struct list_head conn_acks; /* on src_in->source.in.acks_pending */
            __u32 conn_id;
            __u64 seqno;
            __u64 seqno_ooo;
            __u32 length; /* ooo range length (0 when no OOO flag) */

            __u8 flags; /* KP_ACK_CONN_FLAGS_* */

            __u8 bufsize_changerate;

            __u16 priority; /* <= 4095 */
            __u8 priority_seqno; /* <= 15 */

            __u8 is_highlatency;
            __u8 queue; /* CMSGQUEUE_ACK_CONN_* this cm belongs to */

            __u32 ack_seqno;
        } ack_conn;

        struct {
            __u32 conn_id;
            __u64 seqno1;
            __u64 seqno2;
            struct cor_conn *src_in; /* holds a conn kref */
        } connect;

        struct {
            __u32 conn_id;
            struct cor_conn *src_in; /* holds a conn kref */
        } connect_success;

        struct {
            struct rb_node rbn; /* in nb->pending_conn_resets_rb */
            __u8 in_pending_conn_resets; /* rbn linked? (guards rb_erase) */
            __u32 conn_id;
        } reset_conn;

        struct {
            __u32 conn_id;
            __u64 seqno;
            __u32 datalen;
            __u8 windowused;
            __u8 flush;
            __u8 highlatency;
            char *data_orig; /* base of allocation; data may point inside */
            char *data;
            struct cor_conn_retrans *cr;
        } conn_data;

        struct {
            /* all delays in microseconds (see ___cor_send_messages_smcd) */
            __u32 ack_fast_delay;
            __u32 ack_slow_delay;
            __u32 ackconn_lowlatency_delay;
            __u32 ackconn_highlatency_delay;
            __u32 pong_delay;
        } set_max_cmsg_delay;
    } msg;
};
/*
 * One sent kernel packet awaiting acknowledgment. Indexed by seqno in
 * nb->kp_retransmits_rb and kept in timeout order on one of the neighbor's
 * retrans_fast_list/retrans_slow_list. Owns the cor_control_msg_out list
 * in msgs until acked (then freed) or timed out (then requeued).
 */
struct cor_control_retrans {
    struct kref ref;

    struct cor_neighbor *nb;
    __u64 seqno;

    unsigned long timeout; /* jiffies deadline for retransmission */

    struct list_head msgs;

    struct rb_node rbn; /* in nb->kp_retransmits_rb */
    struct list_head timeout_list; /* retrans_fast_list or retrans_slow_list */
};
static struct kmem_cache *cor_controlmsg_slab;
static struct kmem_cache *cor_controlretrans_slab;

/* global count of queued non-pong control messages, across all neighbors */
static atomic_t cor_cmsg_othercnt = ATOMIC_INIT(0);
/* origin of a message handed to cor_enqueue_control_msg() /
 * cor_merge_or_enqueue_ackconn(); affects queue position and accounting */
#define ADDCMSG_SRC_NEW 1
#define ADDCMSG_SRC_SPLITCONNDATA 2
#define ADDCMSG_SRC_READD 3
#define ADDCMSG_SRC_RETRANS 4
/* forward declarations (defined later in this file) */
static void cor_enqueue_control_msg(struct cor_control_msg_out *msg, int src);

static void cor_try_merge_ackconns(struct cor_conn *src_in_l,
        struct cor_control_msg_out *cm);

static void cor_merge_or_enqueue_ackconn(struct cor_conn *src_in_l,
        struct cor_control_msg_out *cm, int src);
157 static struct cor_control_msg_out *_cor_alloc_control_msg(
158 struct cor_neighbor *nb)
160 struct cor_control_msg_out *cm;
162 BUG_ON(nb == 0);
164 cm = kmem_cache_alloc(cor_controlmsg_slab, GFP_ATOMIC);
165 if (unlikely(cm == 0))
166 return 0;
167 memset(cm, 0, sizeof(struct cor_control_msg_out));
168 kref_init(&(cm->ref));
169 cm->nb = nb;
170 return cm;
173 static int cor_calc_limit(int limit, int priority)
175 if (priority == ACM_PRIORITY_LOW)
176 return (limit+1)/2;
177 else if (priority == ACM_PRIORITY_MED)
178 return (limit * 3 + 1)/4;
179 else if (priority == ACM_PRIORITY_HIGH)
180 return limit;
181 else
182 BUG();
/*
 * Allocate a control message, enforcing per-neighbor and global queue
 * limits scaled by priority. The counters are incremented optimistically
 * and rolled back on the "full" / allocation-failure path. Returns 0 when
 * over limit or out of memory.
 */
struct cor_control_msg_out *cor_alloc_control_msg(struct cor_neighbor *nb,
        int priority)
{
    struct cor_control_msg_out *cm = 0;

    long packets1;
    long packets2;

    BUG_ON(nb == 0);

    packets1 = atomic_inc_return(&(nb->cmsg_othercnt));
    packets2 = atomic_inc_return(&(cor_cmsg_othercnt));

    BUG_ON(packets1 <= 0);
    BUG_ON(packets2 <= 0);

    /* under the guaranteed per-neighbor share: always allowed */
    if (packets1 <= cor_calc_limit(GUARANTEED_CMSGS_PER_NEIGH, priority))
        goto alloc;

    if (unlikely(unlikely(packets1 > cor_calc_limit(MAX_CMSGS_PER_NEIGH,
            priority)) ||
            unlikely(packets2 > cor_calc_limit(MAX_CMSGS,
            priority))))
        goto full;

alloc:
    cm = _cor_alloc_control_msg(nb);
    if (unlikely(cm == 0)) {
full:
        /* deliberate jump into the if-body: both the over-limit and the
         * allocation-failure case must undo the counter increments */
        /* printk(KERN_ERR "cor_alloc_control_msg failed %ld %ld\n",
           packets1, packets2); */
        atomic_dec(&(nb->cmsg_othercnt));
        atomic_dec(&(cor_cmsg_othercnt));
    }

    return cm;
}
223 static void cor_cmsg_kref_free(struct kref *ref)
225 struct cor_control_msg_out *cm = container_of(ref,
226 struct cor_control_msg_out, ref);
227 kmem_cache_free(cor_controlmsg_slab, cm);
/*
 * Drop one reference to a control message, releasing the per-type
 * resources it pins (conn krefs, pending-reset rb-tree node) and the
 * queue accounting taken in cor_alloc_control_msg(). Pongs are counted
 * separately, so only non-pong messages decrement the "other" counters.
 */
void cor_free_control_msg(struct cor_control_msg_out *cm)
{
    if (likely(cm->type != MSGTYPE_PONG)) {
        atomic_dec(&(cm->nb->cmsg_othercnt));
        atomic_dec(&(cor_cmsg_othercnt));
    }

    if (cm->type == MSGTYPE_ACK_CONN) {
        BUG_ON(cm->msg.ack_conn.src_in == 0);
        if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
            struct cor_conn *trgt_out = cor_get_conn_reversedir(
                    cm->msg.ack_conn.src_in);

            spin_lock_bh(&(trgt_out->rcv_lock));
            BUG_ON(trgt_out->targettype != TARGET_OUT);
            /* NOTE(review): testing != 0 and then assigning 1 is a no-op
             * unless priority_send_allowed takes values other than 0/1;
             * possibly "== 0" was intended -- verify against the writers
             * of priority_send_allowed before changing */
            if (trgt_out->target.out.priority_send_allowed != 0) {
                trgt_out->target.out.priority_send_allowed = 1;
                spin_unlock_bh(&(trgt_out->rcv_lock));
                cor_conn_refresh_priority(trgt_out, 0);
            } else {
                spin_unlock_bh(&(trgt_out->rcv_lock));
            }
        }

        cor_conn_kref_put(cm->msg.ack_conn.src_in,
                "cor_control_msg_out ack_conn");
        cm->msg.ack_conn.src_in = 0;
    } else if (cm->type == MSGTYPE_CONNECT) {
        BUG_ON(cm->msg.connect.src_in == 0);
        cor_conn_kref_put(cm->msg.connect.src_in,
                "cor_control_msg_out connect");
        cm->msg.connect.src_in = 0;
    } else if (cm->type == MSGTYPE_CONNECT_SUCCESS) {
        BUG_ON(cm->msg.connect_success.src_in == 0);
        cor_conn_kref_put(cm->msg.connect_success.src_in,
                "cor_control_msg_out connect_success");
        cm->msg.connect_success.src_in = 0;
    } else if (cm->type == MSGTYPE_RESET_CONN) {
        spin_lock_bh(&(cm->nb->cmsg_lock));
        if (cm->msg.reset_conn.in_pending_conn_resets != 0) {
            rb_erase(&(cm->msg.reset_conn.rbn),
                    &(cm->nb->pending_conn_resets_rb));
            cm->msg.reset_conn.in_pending_conn_resets = 0;

            /* drop the reference held by the rb-tree */
            kref_put(&(cm->ref), cor_kreffree_bug);
        }
        spin_unlock_bh(&(cm->nb->cmsg_lock));
    }

    kref_put(&(cm->ref), cor_cmsg_kref_free);
}
280 static void cor_free_control_retrans(struct kref *ref)
282 struct cor_control_retrans *cr = container_of(ref,
283 struct cor_control_retrans, ref);
285 while (list_empty(&(cr->msgs)) == 0) {
286 struct cor_control_msg_out *cm = container_of(cr->msgs.next,
287 struct cor_control_msg_out, lh);
289 if (cm->type == MSGTYPE_PONG)
290 atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
292 list_del(&(cm->lh));
293 cor_free_control_msg(cm);
296 kmem_cache_free(cor_controlretrans_slab, cr);
299 struct cor_control_retrans *cor_get_control_retrans(
300 struct cor_neighbor *nb_retranslocked, __u64 seqno)
302 struct rb_node *n = 0;
303 struct cor_control_retrans *ret = 0;
305 n = nb_retranslocked->kp_retransmits_rb.rb_node;
307 while (likely(n != 0) && ret == 0) {
308 struct cor_control_retrans *cr = container_of(n,
309 struct cor_control_retrans, rbn);
311 BUG_ON(cr->nb != nb_retranslocked);
313 if (cor_seqno_before(seqno, cr->seqno))
314 n = n->rb_left;
315 else if (cor_seqno_after(seqno, cr->seqno))
316 n = n->rb_right;
317 else
318 ret = cr;
321 if (ret != 0)
322 kref_get(&(ret->ref));
324 return ret;
327 /* nb->retrans_lock must be held */
328 void cor_insert_control_retrans(struct cor_control_retrans *ins)
330 struct cor_neighbor *nb = ins->nb;
331 __u64 seqno = ins->seqno;
333 struct rb_root *root;
334 struct rb_node **p;
335 struct rb_node *parent = 0;
337 BUG_ON(nb == 0);
339 root = &(nb->kp_retransmits_rb);
340 p = &(root->rb_node);
342 while ((*p) != 0) {
343 struct cor_control_retrans *cr = container_of(*p,
344 struct cor_control_retrans, rbn);
346 BUG_ON(cr->nb != nb);
348 parent = *p;
349 if (unlikely(cor_seqno_eq(seqno, cr->seqno))) {
350 BUG();
351 } else if (cor_seqno_before(seqno, cr->seqno)) {
352 p = &(*p)->rb_left;
353 } else if (cor_seqno_after(seqno, cr->seqno)) {
354 p = &(*p)->rb_right;
355 } else {
356 BUG();
360 kref_get(&(ins->ref));
361 rb_link_node(&(ins->rbn), parent, p);
362 rb_insert_color(&(ins->rbn), root);
365 static void cor_remove_connack_oooflag_ifold(struct cor_conn *src_in_l,
366 struct cor_control_msg_out *cm)
368 if (cor_ooolen(cm->msg.ack_conn.flags) != 0 && cor_seqno_before_eq(
369 cm->msg.ack_conn.seqno_ooo +
370 cm->msg.ack_conn.length,
371 src_in_l->source.in.next_seqno)) {
372 cm->msg.ack_conn.length = 0;
373 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
374 (~KP_ACK_CONN_FLAGS_OOO));
378 static int cor_ackconn_prepare_requeue(struct cor_conn *cn_l,
379 struct cor_control_msg_out *cm)
381 if (unlikely(unlikely(cn_l->sourcetype != SOURCE_IN) ||
382 unlikely(cn_l->source.in.nb != cm->nb) ||
383 unlikely(
384 cor_get_connid_reverse(cn_l->source.in.conn_id) !=
385 cm->msg.ack_conn.conn_id) ||
386 unlikely(cn_l->isreset != 0)))
387 return 0;
389 cor_remove_connack_oooflag_ifold(cn_l, cm);
391 if (!cor_seqno_eq(cm->msg.ack_conn.ack_seqno,
392 cn_l->source.in.ack_seqno))
393 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
394 (~KP_ACK_CONN_FLAGS_SEQNO) &
395 (~KP_ACK_CONN_FLAGS_WINDOW));
397 if (cm->msg.ack_conn.flags == 0)
398 return 0;
400 cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);
402 return 1;
/*
 * A retransmit timed out: move every message from the retrans record back
 * onto the neighbor's send queues. Iterates from the tail (cr->msgs.prev)
 * so the original send order is preserved when re-adding at queue heads.
 * cmsg_bulk_readds suppresses intermediate timer reprogramming until the
 * final cor_schedule_controlmsg_timer().
 */
static void cor_requeue_control_retrans(struct cor_control_retrans *cr)
{
    atomic_inc(&(cr->nb->cmsg_bulk_readds));

    while (list_empty(&(cr->msgs)) == 0) {
        struct cor_control_msg_out *cm = container_of(cr->msgs.prev,
                struct cor_control_msg_out, lh);

        list_del(&(cm->lh));

        BUG_ON(cm->nb != cr->nb);

        if (cm->type == MSGTYPE_ACK_CONN) {
            struct cor_conn *cn_l = cm->msg.ack_conn.src_in;

            spin_lock_bh(&(cn_l->rcv_lock));
            if (unlikely(cor_ackconn_prepare_requeue(cn_l,
                    cm) == 0)) {
                cor_free_control_msg(cm);
            } else {
                cor_merge_or_enqueue_ackconn(cn_l, cm,
                        ADDCMSG_SRC_RETRANS);
            }
            spin_unlock_bh(&(cn_l->rcv_lock));
        } else {
            if (cm->type == MSGTYPE_PONG)
                atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
            cor_enqueue_control_msg(cm, ADDCMSG_SRC_RETRANS);
        }
    }

    atomic_dec(&(cr->nb->cmsg_bulk_readds));

    spin_lock_bh(&(cr->nb->cmsg_lock));
    cor_schedule_controlmsg_timer(cr->nb);
    spin_unlock_bh(&(cr->nb->cmsg_lock));
}
442 static void _cor_empty_retrans_queue(struct cor_neighbor *nb_retranslocked,
443 struct list_head *retrans_list)
445 while (!list_empty(retrans_list)) {
446 struct cor_control_retrans *cr = container_of(
447 retrans_list->next, struct cor_control_retrans,
448 timeout_list);
450 BUG_ON(cr->nb != nb_retranslocked);
452 list_del(&(cr->timeout_list));
453 rb_erase(&(cr->rbn), &(nb_retranslocked->kp_retransmits_rb));
455 kref_put(&(cr->ref), cor_kreffree_bug); /* rb */
456 kref_put(&(cr->ref), cor_free_control_retrans); /* list */
460 static void cor_empty_retrans_queue(struct cor_neighbor *nb_retranslocked)
462 _cor_empty_retrans_queue(nb_retranslocked,
463 &(nb_retranslocked->retrans_fast_list));
464 _cor_empty_retrans_queue(nb_retranslocked,
465 &(nb_retranslocked->retrans_slow_list));
468 static unsigned long cor_get_retransmit_timeout(
469 struct cor_neighbor *nb_retranslocked)
471 struct cor_control_retrans *cr1 = 0;
472 struct cor_control_retrans *cr2 = 0;
473 struct cor_control_retrans *cr = 0;
475 if (list_empty(&(nb_retranslocked->retrans_fast_list)) == 0) {
476 cr1 = container_of(nb_retranslocked->retrans_fast_list.next,
477 struct cor_control_retrans, timeout_list);
478 BUG_ON(cr1->nb != nb_retranslocked);
481 if (list_empty(&(nb_retranslocked->retrans_slow_list)) == 0) {
482 cr2 = container_of(nb_retranslocked->retrans_slow_list.next,
483 struct cor_control_retrans, timeout_list);
484 BUG_ON(cr2->nb != nb_retranslocked);
487 if (cr1 == 0)
488 cr = cr2;
489 else if (cr2 == 0)
490 cr = cr1;
491 else
492 cr = (time_after(cr1->timeout, cr2->timeout) ? cr2 : cr1);
494 BUG_ON(cr == 0);
496 return cr->timeout;
/*
 * Retransmit timer handler. Holds one nb kref ("retransmit_timer") taken
 * when the timer was armed; every exit path must either re-arm (keeping
 * the kref via mod_timer's pending-return) or drop it. If a retransmit is
 * due, it is not sent from here: the control-message worker is poked via
 * add_retrans_needed instead.
 */
void cor_retransmit_timerfunc(struct timer_list *retrans_timer)
{
    struct cor_neighbor *nb = container_of(retrans_timer,
            struct cor_neighbor, retrans_timer);
    int nbstate = cor_get_neigh_state(nb);
    unsigned long timeout;

    spin_lock_bh(&(nb->retrans_lock));

    if (list_empty(&(nb->retrans_fast_list)) &&
            list_empty(&(nb->retrans_slow_list))) {
        spin_unlock_bh(&(nb->retrans_lock));
        cor_nb_kref_put(nb, "retransmit_timer");
        return;
    }

    if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
        cor_empty_retrans_queue(nb);
        spin_unlock_bh(&(nb->retrans_lock));
        cor_nb_kref_put(nb, "retransmit_timer");
        return;
    }

    timeout = cor_get_retransmit_timeout(nb);

    if (time_after(timeout, jiffies)) {
        /* not due yet: re-arm; mod_timer returning nonzero means the
         * timer was already pending, so its kref must not be kept twice */
        int rc = mod_timer(&(nb->retrans_timer), timeout);

        spin_unlock_bh(&(nb->retrans_lock));
        if (rc != 0)
            cor_nb_kref_put(nb, "retransmit_timer");
        return;
    }

    spin_unlock_bh(&(nb->retrans_lock));

    spin_lock_bh(&(nb->cmsg_lock));
    nb->add_retrans_needed = 1;
    cor_schedule_controlmsg_timer(nb);
    spin_unlock_bh(&(nb->cmsg_lock));

    cor_nb_kref_put(nb, "retransmit_timer");
}
542 static void cor_schedule_retransmit(struct cor_control_retrans *cr,
543 struct cor_neighbor *nb, int fastack)
545 int first;
547 cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
548 atomic_read(&(nb->latency_stddev_retrans_us)),
549 fastack ?
550 atomic_read(&(nb->max_remote_ack_fast_delay_us)) :
551 atomic_read(&(nb->max_remote_ack_slow_delay_us)));
553 spin_lock_bh(&(nb->retrans_lock));
555 cor_insert_control_retrans(cr);
556 if (fastack) {
557 first = list_empty(&(nb->retrans_fast_list));
558 list_add_tail(&(cr->timeout_list), &(nb->retrans_fast_list));
559 } else {
560 first = list_empty(&(nb->retrans_slow_list));
561 list_add_tail(&(cr->timeout_list), &(nb->retrans_slow_list));
564 if (first) {
565 if (mod_timer(&(nb->retrans_timer),
566 cor_get_retransmit_timeout(nb)) == 0) {
567 cor_nb_kref_get(nb, "retransmit_timer");
571 spin_unlock_bh(&(nb->retrans_lock));
/*
 * A kernel-packet ack arrived: remove the matching retransmit record.
 * Drops three references when found: the one taken by
 * cor_get_control_retrans(), the rb-tree's, and the timeout list's
 * (the last one frees the record and its messages).
 */
void cor_kern_ack_rcvd(struct cor_neighbor *nb, __u64 seqno)
{
    struct cor_control_retrans *cr = 0;

    spin_lock_bh(&(nb->retrans_lock));

    cr = cor_get_control_retrans(nb, seqno);

    if (cr == 0) {
        /* char *seqno_p = (char *) &seqno;
        seqno = cpu_to_be32(seqno);
        printk(KERN_ERR "bogus/duplicate ack received %d %d %d %d\n",
                seqno_p[0], seqno_p[1], seqno_p[2], seqno_p[3]); */
        goto out;
    }

    rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));

    BUG_ON(cr->nb != nb);

    list_del(&(cr->timeout_list));

out:
    spin_unlock_bh(&(nb->retrans_lock));

    if (cr != 0) {
        /* cor_get_control_retrans */
        kref_put(&(cr->ref), cor_kreffree_bug);

        kref_put(&(cr->ref), cor_kreffree_bug); /* rb_erase */
        kref_put(&(cr->ref), cor_free_control_retrans); /* list */
    }
}
609 static __u16 cor_get_window(struct cor_conn *cn,
610 struct cor_neighbor *expectedsender, __u32 expected_connid)
612 __u16 window = 0;
614 BUG_ON(expectedsender == 0);
616 spin_lock_bh(&(cn->rcv_lock));
618 if (cor_is_conn_in(cn, expectedsender, expected_connid) == 0)
619 goto out;
621 window = cor_enc_window(cor_seqno_clean(
622 cn->source.in.window_seqnolimit -
623 cn->source.in.next_seqno));
625 cn->source.in.window_seqnolimit_remote = cn->source.in.next_seqno +
626 cor_dec_window(window);
628 out:
629 spin_unlock_bh(&(cn->rcv_lock));
631 return window;
634 /* static void padding(struct sk_buff *skb, __u32 length)
636 char *dst;
637 if (length <= 0)
638 return;
639 dst = skb_put(skb, length);
640 BUG_ON(dst == 0);
641 memset(dst, KP_PADDING, length);
642 } */
645 static __u32 cor_add_init_session(struct sk_buff *skb, __be32 sessionid,
646 __u32 spaceleft)
648 char *dst;
650 BUG_ON(KP_MISC_INIT_SESSION_CMDLEN != 5);
652 if (unlikely(spaceleft < 5))
653 return 0;
655 dst = skb_put(skb, 5);
656 BUG_ON(dst == 0);
658 dst[0] = get_kp_code(KP_MISC, KP_MISC_INIT_SESSION);
659 cor_put_be32(dst + 1, sessionid);
661 return 5;
664 static __u32 cor_add_ack(struct sk_buff *skb, struct cor_control_retrans *cr,
665 struct cor_control_msg_out *cm, __u32 spaceleft)
667 char *dst;
669 BUG_ON(cm->length != 7);
671 if (unlikely(spaceleft < 7))
672 return 0;
674 dst = skb_put(skb, 7);
675 BUG_ON(dst == 0);
677 dst[0] = get_kp_code(KP_MISC, KP_MISC_ACK);
678 cor_put_u48(dst + 1, cm->msg.ack.seqno);
680 list_add_tail(&(cm->lh), &(cr->msgs));
682 return 7;
685 static inline __u8 cor_add_ack_conn_get_delayremaining(
686 struct cor_control_msg_out *cm, unsigned long cmsg_send_start_j)
688 __u32 maxdelay_ms = 0;
689 unsigned long jiffies_timeout;
690 if (cm->msg.ack_conn.is_highlatency) {
691 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_HIGHLATENCY_MS;
692 } else {
693 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS;
696 jiffies_timeout = cm->time_added + msecs_to_jiffies(maxdelay_ms);
698 if (time_before_eq(cmsg_send_start_j, cm->time_added)) {
699 return 255;
700 } else if (time_after_eq(cmsg_send_start_j, jiffies_timeout)) {
701 return 0;
702 } else {
703 __u64 delay_remaining = jiffies_timeout - cmsg_send_start_j;
705 BUG_ON(delay_remaining > U32_MAX);
706 BUG_ON(delay_remaining > msecs_to_jiffies(maxdelay_ms));
708 return (__u8) div64_u64(255 * delay_remaining +
709 msecs_to_jiffies(maxdelay_ms)/2,
710 msecs_to_jiffies(maxdelay_ms));
714 static __u32 cor_add_ack_conn(struct sk_buff *skb,
715 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
716 __u32 spaceleft, unsigned long cmsg_send_start_j,
717 int *ackneeded)
719 char *dst;
720 __u32 offset = 0;
722 if (unlikely(spaceleft < cm->length))
723 return 0;
725 dst = skb_put(skb, cm->length);
726 BUG_ON(dst == 0);
728 dst[offset] = get_kp_code(KP_ACK_CONN, cm->msg.ack_conn.flags);
729 offset++;
730 cor_put_u32(dst + offset, cm->msg.ack_conn.conn_id);
731 offset += 4;
733 if (likely((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0 ||
734 cor_ooolen(cm->msg.ack_conn.flags) != 0)) {
735 dst[offset] = cor_add_ack_conn_get_delayremaining(cm,
736 cmsg_send_start_j);
737 offset++;
740 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0) {
741 cor_put_u48(dst + offset, cm->msg.ack_conn.seqno);
742 offset += 6;
744 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_WINDOW) != 0) {
745 BUG_ON(cm->msg.ack_conn.src_in == 0);
746 cor_put_u16(dst + offset, cor_get_window(
747 cm->msg.ack_conn.src_in,
748 cm->nb, cor_get_connid_reverse(
749 cm->msg.ack_conn.conn_id)));
750 offset += 2;
751 dst[offset] = cm->msg.ack_conn.bufsize_changerate;
752 offset++;
756 if (cor_ooolen(cm->msg.ack_conn.flags) != 0) {
757 cor_put_u48(dst + offset, cm->msg.ack_conn.seqno_ooo);
758 offset += 6;
759 if (cor_ooolen(cm->msg.ack_conn.flags) == 1) {
760 BUG_ON(cm->msg.ack_conn.length > 255);
761 dst[offset] = cm->msg.ack_conn.length;
762 offset += 1;
763 } else if (cor_ooolen(cm->msg.ack_conn.flags) == 2) {
764 BUG_ON(cm->msg.ack_conn.length <= 255);
765 BUG_ON(cm->msg.ack_conn.length > 65535);
766 cor_put_u16(dst + offset, cm->msg.ack_conn.length);
767 offset += 2;
768 } else if (cor_ooolen(cm->msg.ack_conn.flags) == 4) {
769 BUG_ON(cm->msg.ack_conn.length <= 65535);
770 cor_put_u32(dst + offset, cm->msg.ack_conn.length);
771 offset += 4;
772 } else {
773 BUG();
777 if (unlikely((cm->msg.ack_conn.flags &
778 KP_ACK_CONN_FLAGS_PRIORITY) != 0)) {
779 __u16 priority = (cm->msg.ack_conn.priority_seqno << 12) &
780 cm->msg.ack_conn.priority;
781 BUG_ON(cm->msg.ack_conn.priority_seqno > 15);
782 BUG_ON(cm->msg.ack_conn.priority > 4095);
784 cor_put_u16(dst + offset, priority);
785 offset+=2;
788 list_add_tail(&(cm->lh), &(cr->msgs));
789 if (likely((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0 ||
790 cor_ooolen(cm->msg.ack_conn.flags) != 0) &&
791 cm->msg.ack_conn.is_highlatency == 0) {
792 *ackneeded = ACK_NEEDED_FAST;
793 } else if (*ackneeded != ACK_NEEDED_FAST) {
794 *ackneeded = ACK_NEEDED_SLOW;
797 BUG_ON(offset != cm->length);
798 return offset;
801 static __u32 cor_add_ping(struct sk_buff *skb, __u32 cookie, __u32 spaceleft)
803 char *dst;
805 BUG_ON(KP_MISC_PING_CMDLEN != 5);
807 if (unlikely(spaceleft < 5))
808 return 0;
810 dst = skb_put(skb, 5);
811 BUG_ON(dst == 0);
813 dst[0] = get_kp_code(KP_MISC, KP_MISC_PING);
814 cor_put_u32(dst + 1, cookie);
816 return 5;
819 static __u32 cor_calc_respdelay(ktime_t time_pong_enqueued, ktime_t time_end)
821 if (unlikely(ktime_before(time_end, time_pong_enqueued))) {
822 return 0;
823 } else {
824 __s64 respdelay = div_u64(ktime_to_ns(time_end) -
825 ktime_to_ns(time_pong_enqueued) + 500,
826 1000);
828 if (unlikely(respdelay > U32_MAX))
829 return U32_MAX;
830 else if (unlikely(respdelay < 0))
831 return 0;
832 else
833 return (__u32) respdelay;
837 static __u32 cor_add_pong(struct sk_buff *skb, struct cor_control_retrans *cr,
838 struct cor_control_msg_out *cm, __u32 spaceleft, int nbstate,
839 ktime_t cmsg_send_start, int *ackneeded)
841 __u32 respdelay_full;
842 __u32 respdelay_netonly;
843 char *dst;
845 BUG_ON(cm->length != 13);
847 if (unlikely(spaceleft < 13))
848 return 0;
850 respdelay_full = cor_calc_respdelay(cm->msg.pong.time_enqueued,
851 cmsg_send_start);
852 respdelay_netonly = cor_calc_respdelay(cm->msg.pong.ping_rcvtime,
853 ktime_get());
855 dst = skb_put(skb, 13);
856 BUG_ON(dst == 0);
858 dst[0] = get_kp_code(KP_MISC, KP_MISC_PONG);
859 cor_put_u32(dst + 1, cm->msg.pong.cookie);
860 cor_put_u32(dst + 5, (__u32) respdelay_full);
861 cor_put_u32(dst + 9, (__u32) respdelay_netonly);
863 list_add_tail(&(cm->lh), &(cr->msgs));
864 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE) &&
865 *ackneeded != ACK_NEEDED_FAST)
866 *ackneeded = ACK_NEEDED_SLOW;
868 return 13;
871 static __u32 cor_add_connect(struct sk_buff *skb,
872 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
873 __u32 spaceleft, int *ackneeded)
875 char *dst;
876 struct cor_conn *src_in = cm->msg.connect.src_in;
877 struct cor_conn *trgt_out = cor_get_conn_reversedir(src_in);
878 __u16 priority;
880 BUG_ON(cm->length != 22);
882 if (unlikely(spaceleft < 22))
883 return 0;
885 dst = skb_put(skb, 22);
886 BUG_ON(dst == 0);
888 dst[0] = get_kp_code(KP_MISC, KP_MISC_CONNECT);
889 cor_put_u32(dst + 1, cm->msg.connect.conn_id);
890 cor_put_u48(dst + 5, cm->msg.connect.seqno1);
891 cor_put_u48(dst + 11, cm->msg.connect.seqno2);
892 BUG_ON(cm->msg.connect.src_in == 0);
893 cor_put_u16(dst + 17, cor_get_window(cm->msg.connect.src_in, cm->nb,
894 cor_get_connid_reverse(cm->msg.connect.conn_id)));
896 spin_lock_bh(&(trgt_out->rcv_lock));
897 BUG_ON(trgt_out->targettype != TARGET_OUT);
899 priority = (trgt_out->target.out.priority_seqno << 12) &
900 trgt_out->target.out.priority_last;
901 BUG_ON(trgt_out->target.out.priority_seqno > 15);
902 BUG_ON(trgt_out->target.out.priority_last > 4095);
903 cor_put_u16(dst + 19, priority);
905 if (src_in->is_highlatency == 0)
906 dst[21] = 0;
907 else
908 dst[21] = 1;
910 spin_unlock_bh(&(trgt_out->rcv_lock));
912 list_add_tail(&(cm->lh), &(cr->msgs));
913 if (*ackneeded != ACK_NEEDED_FAST)
914 *ackneeded = ACK_NEEDED_SLOW;
916 return 22;
919 static __u32 cor_add_connect_success(struct sk_buff *skb,
920 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
921 __u32 spaceleft, int *ackneeded)
923 char *dst;
925 BUG_ON(cm->length != 7);
927 if (unlikely(spaceleft < 7))
928 return 0;
930 dst = skb_put(skb, 7);
931 BUG_ON(dst == 0);
933 dst[0] = get_kp_code(KP_MISC, KP_MISC_CONNECT_SUCCESS);
934 cor_put_u32(dst + 1, cm->msg.connect_success.conn_id);
935 BUG_ON(cm->msg.connect_success.src_in == 0);
936 cor_put_u16(dst + 5, cor_get_window(
937 cm->msg.connect_success.src_in, cm->nb,
938 cor_get_connid_reverse(
939 cm->msg.connect_success.conn_id)));
941 list_add_tail(&(cm->lh), &(cr->msgs));
942 if (*ackneeded != ACK_NEEDED_FAST)
943 *ackneeded = ACK_NEEDED_SLOW;
945 return 7;
948 static __u32 cor_add_reset_conn(struct sk_buff *skb,
949 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
950 __u32 spaceleft, int *ackneeded)
952 char *dst;
954 BUG_ON(cm->length != 5);
956 if (unlikely(spaceleft < 5))
957 return 0;
959 dst = skb_put(skb, 5);
960 BUG_ON(dst == 0);
962 dst[0] = get_kp_code(KP_MISC, KP_MISC_RESET_CONN);
963 cor_put_u32(dst + 1, cm->msg.reset_conn.conn_id);
965 list_add_tail(&(cm->lh), &(cr->msgs));
966 if (*ackneeded != ACK_NEEDED_FAST)
967 *ackneeded = ACK_NEEDED_SLOW;
969 return 5;
972 static __u32 cor_add_conndata(struct sk_buff *skb,
973 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
974 __u32 spaceleft, struct cor_control_msg_out **split_conndata,
975 __u32 *sc_sendlen)
977 char *dst;
978 __u32 offset = 0;
980 __u32 totallen = get_kp_conn_data_length(cm->msg.conn_data.datalen);
981 __u32 putlen = totallen;
982 __u32 dataputlen = cm->msg.conn_data.datalen;
983 __u8 code_min = 0;
985 BUILD_BUG_ON(KP_CONN_DATA_MAXLEN != 128+32767);
986 BUG_ON(cm->msg.conn_data.datalen > KP_CONN_DATA_MAXLEN);
988 BUG_ON(cm->length != totallen);
990 BUG_ON(putlen > 1024*1024*1024);
992 BUG_ON(split_conndata == 0);
993 BUG_ON(*split_conndata != 0);
994 BUG_ON(sc_sendlen == 0);
995 BUG_ON(*sc_sendlen != 0);
997 if (putlen > spaceleft) {
998 if (spaceleft < get_kp_conn_data_length(1))
999 return 0;
1001 BUG_ON(spaceleft < 13);
1003 if (spaceleft <= 127 + 12) {
1004 dataputlen = spaceleft - 12;
1005 putlen = spaceleft;
1006 } else if (spaceleft == 127 - 12 + 1) {
1007 dataputlen = spaceleft - 12 - 1;
1008 putlen = spaceleft - 1;
1009 } else {
1010 dataputlen = spaceleft - 13;
1011 putlen = spaceleft;
1014 BUG_ON(putlen != get_kp_conn_data_length(dataputlen));
1017 dst = skb_put(skb, putlen);
1018 BUG_ON(dst == 0);
1020 BUG_ON((cm->msg.conn_data.windowused &
1021 (~KP_CONN_DATA_FLAGS_WINDOWUSED)) != 0);
1022 code_min = 0;
1023 if (cm->msg.conn_data.flush != 0)
1024 code_min |= KP_CONN_DATA_FLAGS_FLUSH;
1025 code_min |= cm->msg.conn_data.windowused;
1027 dst[0] = get_kp_code(KP_CONN_DATA, code_min);
1028 offset++;
1029 cor_put_u32(dst + offset, cm->msg.conn_data.conn_id);
1030 offset += 4;
1031 cor_put_u48(dst + offset, cm->msg.conn_data.seqno);
1032 offset += 6;
1034 if (dataputlen < 128) {
1035 dst[offset] = (__u8) dataputlen;
1036 offset++;
1037 } else {
1038 __u8 high = (__u8) (128 + ((dataputlen - 128) / 256));
1039 __u8 low = (__u8) ((dataputlen - 128) % 256);
1040 BUG_ON(((dataputlen - 128) / 256) > 127);
1041 dst[offset] = high;
1042 dst[offset+1] = low;
1043 offset += 2;
1046 BUG_ON(offset > putlen);
1047 BUG_ON(putlen - offset != dataputlen);
1048 memcpy(dst + offset, cm->msg.conn_data.data, dataputlen);
1049 offset += dataputlen;
1051 if (cm->msg.conn_data.datalen == dataputlen) {
1052 BUG_ON(cm->length != putlen);
1053 list_add_tail(&(cm->lh), &(cr->msgs));
1054 } else {
1055 *split_conndata = cm;
1056 *sc_sendlen = dataputlen;
1059 return putlen;
1062 static __u32 cor_add_set_max_cmsg_dly(struct sk_buff *skb,
1063 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
1064 __u32 spaceleft, int *ackneeded)
1066 char *dst;
1068 BUG_ON(KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN != 21);
1069 BUG_ON(cm->length != KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN);
1071 if (unlikely(spaceleft < 21))
1072 return 0;
1074 dst = skb_put(skb, 21);
1075 BUG_ON(dst == 0);
1077 dst[0] = get_kp_code(KP_MISC, KP_MISC_SET_MAX_CMSG_DELAY);
1078 cor_put_u32(dst + 1, cm->msg.set_max_cmsg_delay.ack_fast_delay);
1079 cor_put_u32(dst + 5, cm->msg.set_max_cmsg_delay.ack_slow_delay);
1080 cor_put_u32(dst + 9,
1081 cm->msg.set_max_cmsg_delay.ackconn_lowlatency_delay);
1082 cor_put_u32(dst + 13,
1083 cm->msg.set_max_cmsg_delay.ackconn_highlatency_delay);
1084 cor_put_u32(dst + 17, cm->msg.set_max_cmsg_delay.pong_delay);
1086 list_add_tail(&(cm->lh), &(cr->msgs));
1087 if (*ackneeded != ACK_NEEDED_FAST)
1088 *ackneeded = ACK_NEEDED_SLOW;
1090 return 21;
1093 static __u32 cor_add_message(struct sk_buff *skb,
1094 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
1095 __u32 spaceleft, int nbstate, unsigned long cmsg_send_start_j,
1096 ktime_t cmsg_send_start_kt,
1097 struct cor_control_msg_out **split_conndata, __u32 *sc_sendlen,
1098 int *ackneeded)
1100 BUG_ON(split_conndata != 0 && *split_conndata != 0);
1101 BUG_ON(sc_sendlen != 0 && *sc_sendlen != 0);
1103 switch (cm->type) {
1104 case MSGTYPE_ACK:
1105 return cor_add_ack(skb, cr, cm, spaceleft);
1106 case MSGTYPE_ACK_CONN:
1107 return cor_add_ack_conn(skb, cr, cm, spaceleft,
1108 cmsg_send_start_j, ackneeded);
1109 case MSGTYPE_PONG:
1110 return cor_add_pong(skb, cr, cm, spaceleft, nbstate,
1111 cmsg_send_start_kt, ackneeded);
1112 case MSGTYPE_CONNECT:
1113 return cor_add_connect(skb, cr, cm, spaceleft, ackneeded);
1114 case MSGTYPE_CONNECT_SUCCESS:
1115 return cor_add_connect_success(skb, cr, cm, spaceleft,
1116 ackneeded);
1117 case MSGTYPE_RESET_CONN:
1118 return cor_add_reset_conn(skb, cr, cm, spaceleft, ackneeded);
1119 case MSGTYPE_CONNDATA:
1120 return cor_add_conndata(skb, cr, cm, spaceleft, split_conndata,
1121 sc_sendlen);
1122 case MSGTYPE_SET_MAX_CMSG_DELAY:
1123 return cor_add_set_max_cmsg_dly(skb, cr, cm, spaceleft,
1124 ackneeded);
1125 default:
1126 BUG();
1128 BUG();
1129 return 0;
/*
 * Encode messages from cmsgs into skb until the list is empty or a
 * message no longer fits. Returns total bytes written.
 *
 * NOTE(review): the rc == 0 branch calls BUG() before the re-queue and
 * break, making them dead code; the caller apparently sizes cmsgs so
 * that everything is guaranteed to fit -- confirm before relying on the
 * re-queue path.
 */
static __u32 ___cor_send_messages(struct cor_neighbor *nb, struct sk_buff *skb,
        struct cor_control_retrans *cr, struct list_head *cmsgs,
        __u32 spaceleft, int nbstate, unsigned long cmsg_send_start_j,
        ktime_t cmsg_send_start_kt,
        struct cor_control_msg_out **split_conndata, __u32 *sc_sendlen,
        int *ackneeded)
{
    __u32 length = 0;

    while (!list_empty(cmsgs)) {
        __u32 rc;
        struct cor_control_msg_out *cm = container_of(cmsgs->next,
                struct cor_control_msg_out, lh);

        list_del(&(cm->lh));

        rc = cor_add_message(skb, cr, cm, spaceleft - length, nbstate,
                cmsg_send_start_j, cmsg_send_start_kt,
                split_conndata, sc_sendlen, ackneeded);
        if (rc == 0) {
            BUG();
            list_add(&(cm->lh), cmsgs);
            break;
        }

        /* only conndata may be encoded shorter than cm->length (split) */
        BUG_ON(rc != cm->length && cm->type != MSGTYPE_CONNDATA);

        length += rc;
    }

    return length;
}
1164 static __u32 ___cor_send_messages_smcd(struct cor_neighbor *nb,
1165 struct sk_buff *skb, struct cor_control_retrans *cr,
1166 __u32 spaceleft, int nbstate, unsigned long cmsg_send_start_j,
1167 ktime_t cmsg_send_start_kt, int *ackneeded)
1169 struct cor_control_msg_out *cm;
1170 __u32 rc;
1172 cm = cor_alloc_control_msg(nb, ACM_PRIORITY_MED);
1174 if (unlikely(cm == 0))
1175 return 0;
1177 cm->type = MSGTYPE_SET_MAX_CMSG_DELAY;
1178 cm->msg.set_max_cmsg_delay.ack_fast_delay =
1179 CMSG_MAXDELAY_ACK_FAST_MS * 1000;
1180 cm->msg.set_max_cmsg_delay.ack_slow_delay =
1181 CMSG_MAXDELAY_ACK_SLOW_MS * 1000;
1182 cm->msg.set_max_cmsg_delay.ackconn_lowlatency_delay =
1183 CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS * 1000;
1184 cm->msg.set_max_cmsg_delay.ackconn_highlatency_delay =
1185 CMSG_MAXDELAY_ACKCONN_HIGHLATENCY_MS * 1000;
1186 cm->msg.set_max_cmsg_delay.pong_delay =
1187 CMSG_MAXDELAY_OTHER_MS * 1000;
1188 cm->length = KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN;
1190 rc = cor_add_message(skb, cr, cm, spaceleft, nbstate, cmsg_send_start_j,
1191 cmsg_send_start_kt, 0, 0, ackneeded);
1193 nb->max_cmsg_delay_sent = 1;
1195 return rc;
/* which per-neighbor send queue a control message sits on */
#define CMSGQUEUE_PONG 1
#define CMSGQUEUE_ACK_FAST 2
#define CMSGQUEUE_ACK_SLOW 3
#define CMSGQUEUE_ACK_CONN_URGENT 4
#define CMSGQUEUE_ACK_CONN_LOWLAT 5
#define CMSGQUEUE_ACK_CONN_HIGHLAT 6
#define CMSGQUEUE_CONNDATA_LOWLAT 7
#define CMSGQUEUE_CONNDATA_HIGHLAT 8
#define CMSGQUEUE_OTHER 9
/*
 * Put a message that was taken off its queue (e.g. because the packet
 * was dropped) back onto the head of the matching per-neighbor queue.
 * ACK_CONN messages need extra care: they are re-linked into the conn's
 * pending ack list and may be merged with newer acks.
 */
static void cor_requeue_message(struct cor_control_msg_out *cm)
	if (cm->type == MSGTYPE_ACK_CONN) {
		struct cor_conn *cn_l = cm->msg.ack_conn.src_in;
		/* lock order: conn rcv_lock first, then neighbor cmsg_lock */
		spin_lock_bh(&(cn_l->rcv_lock));
		/* drop the ack if requeueing is no longer possible */
		if (unlikely(cor_ackconn_prepare_requeue(cn_l, cm) == 0)) {
			cor_free_control_msg(cm);
		} else {
			spin_lock_bh(&(cm->nb->cmsg_lock));
			/* list_add (not _tail): requeued messages go to the
			 * front so they are sent first */
			if (unlikely(cm->msg.ack_conn.queue ==
					CMSGQUEUE_ACK_CONN_URGENT)) {
				list_add(&(cm->lh), &(cm->nb->
						cmsg_queue_ackconn_urgent));
			} else if (cm->msg.ack_conn.queue ==
					CMSGQUEUE_ACK_CONN_LOWLAT) {
				list_add(&(cm->lh), &(cm->nb->
						cmsg_queue_ackconn_lowlat));
			} else if (cm->msg.ack_conn.queue ==
					CMSGQUEUE_ACK_CONN_HIGHLAT) {
				list_add(&(cm->lh), &(cm->nb->
						cmsg_queue_ackconn_highlat));
			} else {
				BUG();
			cm->nb->cmsg_otherlength += cm->length;
			list_add(&(cm->msg.ack_conn.conn_acks),
					&(cn_l->source.in.acks_pending));
			cor_try_merge_ackconns(cn_l, cm);
			spin_unlock_bh(&(cm->nb->cmsg_lock));
		spin_unlock_bh(&(cn_l->rcv_lock));
		return;
	cor_enqueue_control_msg(cm, ADDCMSG_SRC_READD);
1250 static void cor_requeue_messages(struct list_head *lh)
1252 while (list_empty(lh) == 0) {
1253 struct cor_control_msg_out *cm = container_of(lh->prev,
1254 struct cor_control_msg_out, lh);
1255 list_del(&(cm->lh));
1256 cor_requeue_message(cm);
/*
 * Fill skb with ping/init_session/control messages and transmit it.
 * On NET_XMIT_DROP everything is requeued for a later retry; otherwise
 * messages that need reliable delivery stay in cr->msgs and a kernel
 * packet retransmit is scheduled.
 * Returns QOS_RESUME_DONE or QOS_RESUME_CONG.
 */
static int __cor_send_messages_send(struct cor_neighbor *nb,
		struct sk_buff *skb, char *packet_type, int ping,
		int initsession, struct cor_control_retrans *cr,
		struct list_head *cmsgs, __u32 spaceleft, int nbstate,
		unsigned long cmsg_send_start_j, ktime_t cmsg_send_start_kt,
		int *sent)
	int rc;
	int ackneeded = ACK_NEEDED_NO;
	__u32 length = 0;
	__u32 pinglen = 0;
	__u32 pingcookie = 0;
	unsigned long last_ping_time;
	struct cor_control_msg_out *split_conndata = 0;
	__u32 sc_sendlen = 0;
	if (ping != TIMETOSENDPING_NO) {
		__u32 rc;
		/* the first packet of a session carries the session id */
		if (unlikely(initsession)) {
			rc = cor_add_init_session(skb, nb->sessionid,
					spaceleft - length);
			BUG_ON(rc <= 0);
			pinglen = rc;
			length += rc;
		pingcookie = cor_add_ping_req(nb, &last_ping_time);
		rc = cor_add_ping(skb, pingcookie, spaceleft - length);
		BUG_ON(rc <= 0);
		pinglen += rc;
		length += rc;
	/* announce our max cmsg delays once (see max_cmsg_delay_sent) */
	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE) &&
			unlikely(nb->max_cmsg_delay_sent == 0))
		length += ___cor_send_messages_smcd(nb, skb, cr,
				spaceleft - length, nbstate, cmsg_send_start_j,
				cmsg_send_start_kt, &ackneeded);
	length += ___cor_send_messages(nb, skb, cr, cmsgs, spaceleft - length,
			nbstate, cmsg_send_start_j, cmsg_send_start_kt,
			&split_conndata, &sc_sendlen, &ackneeded);
	BUG_ON(length > spaceleft);
	/* a packet containing nothing but a non-forced ping (+init session)
	 * is not worth sending */
	if (likely(ping != TIMETOSENDPING_FORCE) &&
			pinglen != 0 && unlikely(length == pinglen)) {
		cor_unadd_ping_req(nb, pingcookie, last_ping_time, 0);
		goto drop;
	if (unlikely(length == 0)) {
drop:
		kfree_skb(skb);
		BUG_ON(list_empty(&(cr->msgs)) == 0);
		kref_put(&(cr->ref), cor_free_control_retrans);
		/* undo the seqno increment done by cor_send_messages() */
		nb->kpacket_seqno--;
		return QOS_RESUME_DONE;
	//padding(skb, spaceleft - length);
	/* the packet is either filled completely, or exactly 1 byte is left
	 * over after splitting conndata */
	BUG_ON(spaceleft - length != 0 &&
			(split_conndata == 0 || spaceleft - length != 1));
	if (ackneeded == ACK_NEEDED_NO) {
		*packet_type = PACKET_TYPE_CMSG_NOACK;
	} else if (ackneeded == ACK_NEEDED_SLOW) {
		*packet_type = PACKET_TYPE_CMSG_ACKSLOW;
	} else if (ackneeded == ACK_NEEDED_FAST) {
		*packet_type = PACKET_TYPE_CMSG_ACKFAST;
	} else {
		BUG();
	rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_KPACKET);
	if (rc == NET_XMIT_SUCCESS)
		*sent = 1;
	if (rc == NET_XMIT_DROP) {
		/* packet dropped by the qos queue - put everything back */
		if (ping != 0)
			cor_unadd_ping_req(nb, pingcookie, last_ping_time, 1);
		/* holds off cor_schedule_controlmsg_timer() while requeueing */
		atomic_inc(&(nb->cmsg_bulk_readds));
		if (split_conndata != 0)
			cor_requeue_message(split_conndata);
		cor_requeue_messages(&(cr->msgs));
		kref_put(&(cr->ref), cor_free_control_retrans);
		atomic_dec(&(nb->cmsg_bulk_readds));
		spin_lock_bh(&(nb->cmsg_lock));
		cor_schedule_controlmsg_timer(nb);
		spin_unlock_bh(&(nb->cmsg_lock));
	} else {
		struct list_head *curr = cr->msgs.next;
		if (pingcookie != 0)
			cor_ping_sent(nb, pingcookie);
		/* strip messages that must not be retransmitted as kernel
		 * packet cmsgs from the retrans record */
		while (curr != &(cr->msgs)) {
			struct cor_control_msg_out *cm = container_of(curr,
					struct cor_control_msg_out, lh);
			curr = curr->next;
			/* acks are never retransmitted; pongs to not-yet
			 * active neighbors are not retransmitted either */
			if (cm->type == MSGTYPE_ACK || unlikely(
					cm->type == MSGTYPE_PONG &&
					(nbstate != NEIGHBOR_STATE_ACTIVE))) {
				list_del(&(cm->lh));
				cor_free_control_msg(cm);
			} else if (unlikely(cm->type == MSGTYPE_PONG &&
					atomic_inc_return(
					&(nb->cmsg_pongs_retrans_cnt)) >
					MAX_PONG_CMSGS_RETRANS_PER_NEIGH)) {
				/* too many pongs pending retransmit - drop */
				atomic_dec(&(nb->cmsg_pongs_retrans_cnt));
				list_del(&(cm->lh));
				cor_free_control_msg(cm);
			} else if (cm->type == MSGTYPE_CONNDATA) {
				/* conndata retransmits are handled by the
				 * conn retransmit logic instead */
				cor_schedule_retransmit_conn(
						cm->msg.conn_data.cr, 0, 0);
				kref_put(&(cm->msg.conn_data.cr->ref),
						cor_free_connretrans);
				cm->msg.conn_data.cr = 0;
				kfree(cm->msg.conn_data.data_orig);
				list_del(&(cm->lh));
				cor_free_control_msg(cm);
		/* requeue the unsent remainder of split conndata */
		if (split_conndata != 0) {
			BUG_ON(sc_sendlen == 0);
			BUG_ON(sc_sendlen >=
					split_conndata->msg.conn_data.datalen);
			split_conndata->msg.conn_data.seqno += sc_sendlen;
			split_conndata->msg.conn_data.data += sc_sendlen;
			split_conndata->msg.conn_data.datalen -= sc_sendlen;
			split_conndata->length = get_kp_conn_data_length(
					split_conndata->msg.conn_data.datalen);
			cor_enqueue_control_msg(split_conndata,
					ADDCMSG_SRC_SPLITCONNDATA);
	if (list_empty(&(cr->msgs))) {
		/* nothing needs a retransmit - release the record */
		kref_put(&(cr->ref), cor_free_control_retrans);
	} else {
		int fastack = (ackneeded == ACK_NEEDED_FAST);
		BUG_ON(ackneeded != ACK_NEEDED_FAST &&
				ackneeded != ACK_NEEDED_SLOW);
		cor_schedule_retransmit(cr, nb, fastack);
	return (rc == NET_XMIT_SUCCESS) ? QOS_RESUME_DONE : QOS_RESUME_CONG;
1422 static int _cor_send_messages_send(struct cor_neighbor *nb, int ping,
1423 int initsession, struct list_head *cmsgs, int nbstate,
1424 __u32 length, __u64 seqno, unsigned long cmsg_send_start_j,
1425 ktime_t cmsg_send_start_kt, int *sent)
1427 struct sk_buff *skb;
1428 struct cor_control_retrans *cr;
1429 char *dst;
1430 int rc;
1432 BUG_ON(length > cor_mss_cmsg(nb));
1433 skb = cor_create_packet(nb, length + 7, GFP_ATOMIC);
1434 if (unlikely(skb == 0)) {
1435 printk(KERN_ERR "cor_send_messages(): cannot allocate skb (out of memory?)\n");
1437 cor_requeue_messages(cmsgs);
1438 return QOS_RESUME_CONG;
1441 cr = kmem_cache_alloc(cor_controlretrans_slab, GFP_ATOMIC);
1442 if (unlikely(cr == 0)) {
1443 printk(KERN_ERR "cor_send_messages(): cannot allocate cor_control_retrans (out of memory?)\n");
1444 kfree_skb(skb);
1446 cor_requeue_messages(cmsgs);
1447 return QOS_RESUME_CONG;
1450 memset(cr, 0, sizeof(struct cor_control_retrans));
1451 kref_init(&(cr->ref));
1452 cr->nb = nb;
1453 cr->seqno = seqno;
1454 INIT_LIST_HEAD(&(cr->msgs));
1457 dst = skb_put(skb, 7);
1458 BUG_ON(dst == 0);
1460 dst[0] = PACKET_TYPE_NONE;
1461 cor_put_u48(dst + 1, seqno);
1463 rc = __cor_send_messages_send(nb, skb, &(dst[0]), ping, initsession, cr,
1464 cmsgs, length, nbstate, cmsg_send_start_j,
1465 cmsg_send_start_kt, sent);
1467 BUG_ON(!list_empty(cmsgs));
1469 return rc;
1472 static unsigned long cor_get_cmsg_timeout(struct cor_control_msg_out *cm,
1473 int queue)
1475 if (cm->type == MSGTYPE_ACK) {
1476 if (cm->msg.ack.fast != 0) {
1477 BUG_ON(queue != CMSGQUEUE_ACK_FAST);
1478 return cm->time_added + msecs_to_jiffies(
1479 CMSG_MAXDELAY_ACK_FAST_MS);
1480 } else {
1481 BUG_ON(queue != CMSGQUEUE_ACK_SLOW);
1482 return cm->time_added + msecs_to_jiffies(
1483 CMSG_MAXDELAY_ACK_SLOW_MS);
1485 } else if (cm->type == MSGTYPE_ACK_CONN) {
1486 __u32 maxdelay_ms = 0;
1487 if (unlikely(queue == CMSGQUEUE_ACK_CONN_URGENT)) {
1488 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_URGENT_MS;
1489 } else if (queue == CMSGQUEUE_ACK_CONN_LOWLAT) {
1490 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS;
1491 } else if (queue == CMSGQUEUE_ACK_CONN_HIGHLAT) {
1492 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_HIGHLATENCY_MS;
1493 } else {
1494 BUG();
1496 return cm->time_added + msecs_to_jiffies(maxdelay_ms);
1497 } else if (cm->type == MSGTYPE_CONNDATA) {
1498 if (cm->msg.conn_data.highlatency != 0) {
1499 BUG_ON(queue != CMSGQUEUE_CONNDATA_HIGHLAT);
1500 return cm->time_added +
1501 msecs_to_jiffies(
1502 CMSG_MAXDELAY_CONNDATA_MS);
1503 } else {
1504 BUG_ON(queue != CMSGQUEUE_CONNDATA_LOWLAT);
1505 return cm->time_added;
1507 } else {
1508 BUG_ON(cm->type == MSGTYPE_PONG && queue != CMSGQUEUE_PONG);
1509 BUG_ON(cm->type != MSGTYPE_PONG && queue != CMSGQUEUE_OTHER);
1511 return cm->time_added +
1512 msecs_to_jiffies(CMSG_MAXDELAY_OTHER_MS);
1516 static void _cor_peek_message(struct cor_neighbor *nb_cmsglocked, int queue,
1517 struct cor_control_msg_out **currcm, unsigned long *currtimeout,
1518 __u32 **currlen)
1520 struct cor_control_msg_out *cm;
1521 unsigned long cmtimeout;
1523 struct list_head *queuelh;
1524 if (queue == CMSGQUEUE_PONG) {
1525 queuelh = &(nb_cmsglocked->cmsg_queue_pong);
1526 } else if (queue == CMSGQUEUE_ACK_FAST) {
1527 queuelh = &(nb_cmsglocked->cmsg_queue_ack_fast);
1528 } else if (queue == CMSGQUEUE_ACK_SLOW) {
1529 queuelh = &(nb_cmsglocked->cmsg_queue_ack_slow);
1530 } else if (queue == CMSGQUEUE_ACK_CONN_URGENT) {
1531 queuelh = &(nb_cmsglocked->cmsg_queue_ackconn_urgent);
1532 } else if (queue == CMSGQUEUE_ACK_CONN_LOWLAT) {
1533 queuelh = &(nb_cmsglocked->cmsg_queue_ackconn_lowlat);
1534 } else if (queue == CMSGQUEUE_ACK_CONN_HIGHLAT) {
1535 queuelh = &(nb_cmsglocked->cmsg_queue_ackconn_highlat);
1536 } else if (queue == CMSGQUEUE_CONNDATA_LOWLAT) {
1537 queuelh = &(nb_cmsglocked->cmsg_queue_conndata_lowlat);
1538 } else if (queue == CMSGQUEUE_CONNDATA_HIGHLAT) {
1539 queuelh = &(nb_cmsglocked->cmsg_queue_conndata_highlat);
1540 } else if (queue == CMSGQUEUE_OTHER) {
1541 queuelh = &(nb_cmsglocked->cmsg_queue_other);
1542 } else {
1543 BUG();
1546 if (list_empty(queuelh))
1547 return;
1549 cm = container_of(queuelh->next, struct cor_control_msg_out, lh);
1550 cmtimeout = cor_get_cmsg_timeout(cm, queue);
1552 BUG_ON(cm->nb != nb_cmsglocked);
1554 if (*currcm == 0 || (time_before(cmtimeout, *currtimeout) &&
1555 time_before(jiffies, *currtimeout))) {
1556 *currcm = cm;
1557 *currtimeout = cmtimeout;
1559 if (queue == CMSGQUEUE_PONG) {
1560 *currlen = &(nb_cmsglocked->cmsg_pongslength);
1561 } else {
1562 *currlen = &(nb_cmsglocked->cmsg_otherlength);
/*
 * Report the queued control message with the most urgent deadline.
 * Only pongs are considered while the neighbor is not ACTIVE.  When
 * computing the timer deadline (for_timeout != 0), conndata queues are
 * skipped while cmsg_delay_conndata is set.  cmsg_lock must be held.
 */
static void cor_peek_message(struct cor_neighbor *nb_cmsglocked, int nbstate,
		struct cor_control_msg_out **cm, unsigned long *cmtimeout,
		__u32 **len, int for_timeout)
	_cor_peek_message(nb_cmsglocked, CMSGQUEUE_PONG, cm, cmtimeout, len);
	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
		_cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_FAST, cm,
				cmtimeout, len);
		_cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_SLOW, cm,
				cmtimeout, len);
		_cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN_URGENT, cm,
				cmtimeout, len);
		_cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN_LOWLAT, cm,
				cmtimeout, len);
		_cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN_HIGHLAT, cm,
				cmtimeout, len);
		if (!for_timeout || atomic_read(
				&(nb_cmsglocked->cmsg_delay_conndata)) == 0) {
			_cor_peek_message(nb_cmsglocked,
					CMSGQUEUE_CONNDATA_LOWLAT,
					cm, cmtimeout, len);
			_cor_peek_message(nb_cmsglocked,
					CMSGQUEUE_CONNDATA_HIGHLAT,
					cm, cmtimeout, len);
		_cor_peek_message(nb_cmsglocked, CMSGQUEUE_OTHER, cm, cmtimeout,
				len);
1597 static unsigned long cor_get_cmsg_timer_timeout(
1598 struct cor_neighbor *nb_cmsglocked, int nbstate)
1600 unsigned long pingtimeout = cor_get_next_ping_time(nb_cmsglocked);
1602 struct cor_control_msg_out *cm = 0;
1603 unsigned long cmtimeout;
1604 __u32 *len;
1606 cor_peek_message(nb_cmsglocked, nbstate, &cm, &cmtimeout, &len, 1);
1608 if (cm != 0) {
1609 unsigned long jiffies_tmp = jiffies;
1611 if (time_before(cmtimeout, jiffies_tmp))
1612 return jiffies_tmp;
1613 if (time_before(cmtimeout, pingtimeout))
1614 return cmtimeout;
1617 return pingtimeout;
/*
 * Move pending control messages onto cmsgs until the packet (targetmss
 * bytes) is full, updating *length with the payload taken.  Conndata
 * may be taken even if it does not fit completely - it is split at
 * serialization time and the packet is accounted as full.
 * cmsg_lock must be held.
 */
static void _cor_dequeue_messages(struct cor_neighbor *nb_cmsglocked,
		int nbstate, __u32 targetmss, __u32 *length,
		struct list_head *cmsgs)
	while (1) {
		__u32 spaceleft = targetmss - *length;
		struct cor_control_msg_out *cm = 0;
		unsigned long cmtimeout;
		__u32 *len;
		cor_peek_message(nb_cmsglocked, nbstate, &cm, &cmtimeout, &len,
		if (unlikely(cm == 0))
			break;
		BUG_ON(len == 0);
		if (cm->length > spaceleft) {
			/* conndata is splittable: take it unless there is not
			 * even room for a minimal conndata command or the
			 * packet is already more than 3/4 full */
			if (cm->type == MSGTYPE_CONNDATA) {
				BUG_ON(*length == 0 && spaceleft <
						get_kp_conn_data_length(1));
				if (spaceleft < get_kp_conn_data_length(1) ||
						*length > (targetmss/4)*3)
					break;
			} else {
				/* a non-splittable message must always fit
				 * into an empty packet */
				BUG_ON(*length == 0);
				break;
		list_del(&(cm->lh));
		*len -= cm->length;
		if (cm->type == MSGTYPE_ACK_CONN)
			list_del(&(cm->msg.ack_conn.conn_acks));
		if (unlikely(cm->type == MSGTYPE_PONG)) {
			BUG_ON(cm->nb->cmsg_pongscnt == 0);
			cm->nb->cmsg_pongscnt--;
		if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
			/* no longer pending - drop it from the dedup tree
			 * and the tree's reference */
			BUG_ON(cm->msg.reset_conn.in_pending_conn_resets == 0);
			rb_erase(&(cm->msg.reset_conn.rbn),
					&(cm->nb->pending_conn_resets_rb));
			cm->msg.reset_conn.in_pending_conn_resets = 0;
			kref_put(&(cm->ref), cor_kreffree_bug);
		BUG_ON(*length + cm->length < *length);
		if (cm->length > targetmss - *length) {
			/* split conndata fills the rest of the packet */
			BUG_ON(*length >= targetmss);
			BUG_ON(cm->type != MSGTYPE_CONNDATA);
			*length = targetmss;
		} else {
			*length += cm->length;
		list_add_tail(&(cm->lh), cmsgs);
/*
 * Total pending kernel packet payload bytes for this neighbor.  Bytes
 * not tracked by the per-queue counters (set_max_cmsg_delay, ping,
 * init_session) are additionally accumulated into *extralength.
 * cmsg_lock must be held.
 */
static __u32 cor_get_total_messages_length(struct cor_neighbor *nb, int ping,
		int initsession, int nbstate, int *extralength)
	__u32 length = nb->cmsg_pongslength;
	/* only pongs count while the neighbor is not active */
	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
		length += nb->cmsg_otherlength;
		if (unlikely(nb->max_cmsg_delay_sent == 0)) {
			length += KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN;
			*extralength += KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN;
	/* a ping is added when forced, or piggybacked on other payload */
	if (ping == TIMETOSENDPING_FORCE ||
			(length > 0 && ping != TIMETOSENDPING_NO)) {
		length += KP_MISC_PING_CMDLEN;
		*extralength += KP_MISC_PING_CMDLEN;
	if (unlikely(initsession)) {
		length += KP_MISC_INIT_SESSION_CMDLEN;
		*extralength += KP_MISC_INIT_SESSION_CMDLEN;
	return length;
/*
 * Decide whether a kernel packet should be built now.  Returns 1 when
 * nothing is to be sent (nothing pending, or the packet would not be
 * full and no send deadline has expired and no ping is forced);
 * returns 0 with the messages moved onto cmsgs and *length set.
 * cmsg_lock must be held.
 */
static int cor_dequeue_messages(struct cor_neighbor *nb_cmsglocked, int ping,
		int initsession, int nbstate, __u32 targetmss,
		__u32 *length, struct list_head *cmsgs)
	__u32 extralength = 0;
	__u32 totallength;
	int cmsgqueue_nonpong_empty = (
			list_empty(&(nb_cmsglocked->cmsg_queue_ack_fast)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_ack_slow)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_ackconn_urgent)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_ackconn_lowlat)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_ackconn_highlat)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_conndata_lowlat)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_conndata_highlat)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_other)));
	/* the length counters must agree with the queue states */
	BUG_ON(list_empty(&(nb_cmsglocked->cmsg_queue_pong)) &&
			nb_cmsglocked->cmsg_pongslength != 0);
	BUG_ON(!list_empty(&(nb_cmsglocked->cmsg_queue_pong)) &&
			nb_cmsglocked->cmsg_pongslength == 0);
	BUG_ON(cmsgqueue_nonpong_empty &&
			nb_cmsglocked->cmsg_otherlength != 0);
	BUG_ON(!cmsgqueue_nonpong_empty &&
			nb_cmsglocked->cmsg_otherlength == 0);
	totallength = cor_get_total_messages_length(nb_cmsglocked, ping,
			initsession, nbstate, &extralength);
	if (totallength == 0)
		return 1;
	/* wait for more payload unless the packet is full, a ping is
	 * forced, or a message deadline has already expired */
	if (totallength < targetmss && ping != TIMETOSENDPING_FORCE &&
			time_after(cor_get_cmsg_timer_timeout(nb_cmsglocked,
			nbstate), jiffies))
		return 1;
	/* start with the bytes for ping/init_session/set_max_cmsg_delay */
	*length = extralength;
	_cor_dequeue_messages(nb_cmsglocked, nbstate, targetmss, length, cmsgs);
	BUG_ON(*length == 0);
	BUG_ON(*length > targetmss);
	return 0;
1757 static struct cor_control_retrans *cor_get_next_timeouted_retrans(
1758 struct cor_neighbor *nb_retranslocked)
1760 if (list_empty(&(nb_retranslocked->retrans_fast_list)) == 0) {
1761 struct cor_control_retrans *cr = container_of(
1762 nb_retranslocked->retrans_fast_list.next,
1763 struct cor_control_retrans, timeout_list);
1764 BUG_ON(cr->nb != nb_retranslocked);
1766 if (time_before_eq(cr->timeout, jiffies)) {
1767 return cr;
1771 if (list_empty(&(nb_retranslocked->retrans_slow_list)) == 0) {
1772 struct cor_control_retrans *cr = container_of(
1773 nb_retranslocked->retrans_slow_list.next,
1774 struct cor_control_retrans, timeout_list);
1775 BUG_ON(cr->nb != nb_retranslocked);
1777 if (time_before_eq(cr->timeout, jiffies)) {
1778 return cr;
1782 return 0;
/*
 * Requeue all kernel packet retransmits whose timeout has expired and
 * rearm the retransmit timer if any retransmits remain pending.
 */
static void cor_add_timeouted_retrans(struct cor_neighbor *nb)
	spin_lock_bh(&(nb->retrans_lock));
	while (1) {
		struct cor_control_retrans *cr =
				cor_get_next_timeouted_retrans(nb);
		if (cr == 0)
			break;
		list_del(&(cr->timeout_list));
		rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));
		cor_requeue_control_retrans(cr);
		/* drop the references held by the timeout list and the
		 * rbtree */
		kref_put(&(cr->ref), cor_kreffree_bug); /* list_del */
		kref_put(&(cr->ref), cor_free_control_retrans); /* rb */
	if (list_empty(&(nb->retrans_fast_list)) == 0 ||
			list_empty(&(nb->retrans_slow_list)) == 0) {
		/* mod_timer() == 0: timer was not pending, so the newly
		 * armed timer takes a neighbor reference */
		if (mod_timer(&(nb->retrans_timer),
				cor_get_retransmit_timeout(nb)) == 0) {
			cor_nb_kref_get(nb, "retransmit_timer");
	spin_unlock_bh(&(nb->retrans_lock));
/*
 * Free a list of dequeued control messages without sending them.
 * Conndata is handed back to the conn retransmit logic first so the
 * payload is not lost.
 */
static void _cor_delete_all_cmsgs(struct list_head *cmsgs)
	while (!list_empty(cmsgs)) {
		struct cor_control_msg_out *cm = container_of(cmsgs->next,
				struct cor_control_msg_out, lh);
		list_del(&(cm->lh));
		if (cm->type == MSGTYPE_CONNDATA) {
			cor_schedule_retransmit_conn(cm->msg.conn_data.cr, 0,
			kfree(cm->msg.conn_data.data_orig);
		cor_free_control_msg(cm);
1834 static void cor_delete_all_cmsgs(struct cor_neighbor *nb)
1836 while (1) {
1837 struct list_head cmsgs;
1838 __u32 length = 0;
1840 INIT_LIST_HEAD(&cmsgs);
1842 spin_lock_bh(&(nb->cmsg_lock));
1843 _cor_dequeue_messages(nb, NEIGHBOR_STATE_ACTIVE, 65536, &length,
1844 &cmsgs);
1845 spin_unlock_bh(&(nb->cmsg_lock));
1847 if (list_empty(&cmsgs))
1848 break;
1850 _cor_delete_all_cmsgs(&cmsgs);
/*
 * Reset an outgoing conn that has shown no activity for longer than
 * CONN_ACTIVITY_UPDATEINTERVAL_SEC + CONN_INACTIVITY_TIMEOUT_SEC.
 * Returns 1 if the conn was reset, 0 if it is still active, already
 * reset, or the reset message could not be sent.
 */
static int cor_reset_timeouted_conn(struct cor_neighbor *nb,
		struct cor_conn *trgt_out)
	struct cor_conn_bidir *cnb = cor_get_conn_bidir(trgt_out);
	struct cor_conn *src_in = cor_get_conn_reversedir(trgt_out);
	int resetted = 0;
	/* lock both directions, client side first */
	spin_lock_bh(&(cnb->cli.rcv_lock));
	spin_lock_bh(&(cnb->srv.rcv_lock));
	BUG_ON(trgt_out->targettype != TARGET_OUT);
	BUG_ON(trgt_out->target.out.nb != nb);
	if (unlikely(trgt_out->isreset != 0) || likely(time_before(jiffies,
			trgt_out->target.out.jiffies_last_act +
			CONN_ACTIVITY_UPDATEINTERVAL_SEC * HZ +
			CONN_INACTIVITY_TIMEOUT_SEC * HZ)))
		goto unlock;
	/* tell the neighbor first; only reset locally if that succeeded */
	resetted = (cor_send_reset_conn(nb, cor_get_connid_reverse(
			src_in->source.in.conn_id), 1) == 0);
	if (unlikely(resetted == 0))
		goto unlock;
	BUG_ON(trgt_out->isreset != 0);
	trgt_out->isreset = 1;
	cor_reset_conn_locked(cnb);
unlock:
	spin_unlock_bh(&(cnb->srv.rcv_lock));
	spin_unlock_bh(&(cnb->cli.rcv_lock));
	return resetted;
1891 static void cor_reset_timeouted_conns(struct cor_neighbor *nb)
1893 int i;
1894 for (i=0;i<10000;i++) {
1895 unsigned long iflags;
1896 struct cor_conn *trgt_out;
1898 int resetted;
1900 spin_lock_irqsave(&(nb->conn_list_lock), iflags);
1902 if (list_empty(&(nb->snd_conn_list))) {
1903 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
1904 break;
1907 trgt_out = container_of(nb->snd_conn_list.next, struct cor_conn,
1908 target.out.nb_list);
1909 cor_conn_kref_get(trgt_out, "stack");
1911 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
1913 resetted = cor_reset_timeouted_conn(nb, trgt_out);
1915 cor_conn_kref_put(trgt_out, "stack");
1917 if (likely(resetted == 0))
1918 break;
/**
 * cor_send_messages - build and transmit kernel packets for a neighbor
 *
 * may not be called by more than one thread at the same time, because
 * 1) readding cor_control_msg_out may reorder them
 * 2) multiple pings may be sent
 *
 * Returns a QOS_RESUME_* code; *sent is set when at least one packet
 * was handed to the device queue.
 */
int cor_send_messages(struct cor_neighbor *nb, unsigned long cmsg_send_start_j,
		ktime_t cmsg_send_start_kt, int *sent)
	int rc = QOS_RESUME_DONE;
	int ping;
	int initsession;
	__u32 targetmss = cor_mss_cmsg(nb);
	int nbstate = cor_get_neigh_state(nb);
	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE))
		cor_reset_timeouted_conns(nb);
	/* neighbor is gone - drop all pending retransmits and messages */
	if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
		spin_lock_bh(&(nb->retrans_lock));
		cor_empty_retrans_queue(nb);
		spin_unlock_bh(&(nb->retrans_lock));
		cor_delete_all_cmsgs(nb);
		return QOS_RESUME_DONE;
	ping = cor_time_to_send_ping(nb);
	spin_lock_bh(&(nb->cmsg_lock));
	/* collect timed-out retransmits first (needs cmsg_lock dropped) */
	if (nb->add_retrans_needed != 0) {
		nb->add_retrans_needed = 0;
		spin_unlock_bh(&(nb->cmsg_lock));
		cor_add_timeouted_retrans(nb);
		spin_lock_bh(&(nb->cmsg_lock));
	initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) != 0);
	/* send one packet per iteration until nothing is ready */
	while (1) {
		struct list_head cmsgs;
		__u32 length = 0;
		__u64 seqno;
		INIT_LIST_HEAD(&cmsgs);
		if (cor_dequeue_messages(nb, ping, initsession, nbstate,
				targetmss, &length, &cmsgs) != 0) {
			cor_schedule_controlmsg_timer(nb);
			spin_unlock_bh(&(nb->cmsg_lock));
			return QOS_RESUME_DONE;
		nb->kpacket_seqno++;
		seqno = nb->kpacket_seqno;
		spin_unlock_bh(&(nb->cmsg_lock));
		rc = _cor_send_messages_send(nb, ping, initsession, &cmsgs,
				nbstate, length, seqno, cmsg_send_start_j,
				cmsg_send_start_kt, sent);
		if (rc != QOS_RESUME_DONE)
			return rc;
		/* ping and init session go into the first packet only */
		ping = 0;
		initsession = 0;
		spin_lock_bh(&(nb->cmsg_lock));
1995 static unsigned long cor_calc_cmsg_send_start_j(unsigned long cmsg_timer_timeout)
1997 unsigned long jiffies_tmp = jiffies;
1998 if (unlikely(time_after(cmsg_timer_timeout, jiffies_tmp)))
1999 return jiffies_tmp;
2000 else
2001 return cmsg_timer_timeout;
2004 static ktime_t cor_calc_cmsg_send_start_kt(unsigned long cmsg_timer_timeout)
2006 ktime_t now = ktime_get();
2007 unsigned long jiffies_tmp = jiffies;
2009 unsigned long jiffies_delayed;
2010 if (unlikely(time_after(cmsg_timer_timeout, jiffies_tmp))) {
2011 jiffies_delayed = 0;
2012 } else {
2013 jiffies_delayed = jiffies_tmp - cmsg_timer_timeout;
2014 if (unlikely(jiffies_delayed > HZ/10)) {
2015 jiffies_delayed = HZ/10;
2019 return ns_to_ktime(ktime_to_ns(now) -
2020 1000LL * jiffies_to_usecs(jiffies_delayed));
/*
 * cmsg timer expired: hand the neighbor to the qos queue for kernel
 * packet sending and drop the reference the armed timer held (taken in
 * cor_schedule_controlmsg_timer()).
 */
void cor_controlmsg_timerfunc(struct timer_list *cmsg_timer)
	struct cor_neighbor *nb = container_of(cmsg_timer,
			struct cor_neighbor, cmsg_timer);
	/* read back the deadline stored when the timer was armed */
	unsigned long cmsg_timer_timeout = (unsigned long)
			atomic64_read(&(nb->cmsg_timer_timeout));
	unsigned long cmsg_send_start_j = cor_calc_cmsg_send_start_j(
			cmsg_timer_timeout);
	ktime_t cmsg_send_start_kt = cor_calc_cmsg_send_start_kt(
			cmsg_timer_timeout);
	cor_qos_enqueue(nb->queue, &(nb->rb_kp), cmsg_send_start_j,
			cmsg_send_start_kt, QOS_CALLER_KPACKET, 0);
	cor_nb_kref_put(nb, "controlmsg_timer");
2039 static int cor_cmsg_full_packet(struct cor_neighbor *nb, int nbstate)
2041 __u32 extralength = 0;
2042 int ping = cor_time_to_send_ping(nb);
2043 int initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) !=
2045 __u32 len = cor_get_total_messages_length(nb, ping, initsession,
2046 nbstate, &extralength);
2048 if (len == 0)
2049 return 0;
2050 if (len < cor_mss_cmsg(nb))
2051 return 0;
2053 return 1;
/*
 * (Re)schedule control message sending: enqueue for immediate sending
 * when a full packet is pending, the neighbor is killed, or retransmit
 * processing is needed; otherwise arm the cmsg timer for the earliest
 * message/ping deadline.  cmsg_lock must be held.
 */
void cor_schedule_controlmsg_timer(struct cor_neighbor *nb_cmsglocked)
	unsigned long timeout;
	int nbstate = cor_get_neigh_state(nb_cmsglocked);
	if (unlikely(nbstate == NEIGHBOR_STATE_KILLED))
		goto now;
	/* bulk requeue in progress - the requeueing code reschedules when
	 * it is done (see __cor_send_messages_send()) */
	if (atomic_read(&(nb_cmsglocked->cmsg_bulk_readds)) != 0)
		return;
	if (cor_cmsg_full_packet(nb_cmsglocked, nbstate))
		goto now;
	if (nb_cmsglocked->add_retrans_needed != 0)
		goto now;
	timeout = cor_get_cmsg_timer_timeout(nb_cmsglocked, nbstate);
	/* the if (0) only exists to place the "now" label */
	if (0) {
now:
		cor_qos_enqueue(nb_cmsglocked->queue, &(nb_cmsglocked->rb_kp),
				jiffies, ktime_get(), QOS_CALLER_KPACKET, 0);
	} else if (time_before_eq(timeout, jiffies)) {
		/* deadline already passed - enqueue directly, reporting the
		 * originally intended start time */
		unsigned long cmsg_send_start_j = cor_calc_cmsg_send_start_j(
				timeout);
		ktime_t cmsg_send_start_kt = cor_calc_cmsg_send_start_kt(
				timeout);
		cor_qos_enqueue(nb_cmsglocked->queue, &(nb_cmsglocked->rb_kp),
				cmsg_send_start_j, cmsg_send_start_kt,
				QOS_CALLER_KPACKET, 0);
	} else {
		/* store the deadline before arming the timer - the
		 * timerfunc reads it back */
		atomic64_set(&(nb_cmsglocked->cmsg_timer_timeout), timeout);
		barrier();
		/* mod_timer() == 0: timer was idle, take a reference for it */
		if (mod_timer(&(nb_cmsglocked->cmsg_timer), timeout) == 0) {
			cor_nb_kref_get(nb_cmsglocked, "controlmsg_timer");
/*
 * Insert a RESET_CONN message into the per-neighbor rbtree used to
 * deduplicate pending resets, keyed by conn_id.  Returns 1 without
 * inserting when a reset for the same conn_id is already pending
 * (the caller then frees ins), 0 on success - the tree then holds a
 * reference on ins.  cmsg_lock must be held.
 */
static int cor_insert_pending_conn_resets(struct cor_control_msg_out *ins)
	struct cor_neighbor *nb = ins->nb;
	__u32 conn_id = ins->msg.reset_conn.conn_id;
	struct rb_root *root;
	struct rb_node **p;
	struct rb_node *parent = 0;
	BUG_ON(nb == 0);
	BUG_ON(ins->msg.reset_conn.in_pending_conn_resets != 0);
	root = &(nb->pending_conn_resets_rb);
	p = &(root->rb_node);
	while ((*p) != 0) {
		struct cor_control_msg_out *cm = container_of(*p,
				struct cor_control_msg_out,
				msg.reset_conn.rbn);
		__u32 cm_connid = cm->msg.reset_conn.conn_id;
		BUG_ON(cm->nb != ins->nb);
		BUG_ON(cm->type != MSGTYPE_RESET_CONN);
		parent = *p;
		if (conn_id == cm_connid) {
			/* duplicate reset - reject */
			return 1;
		} else if (conn_id < cm_connid) {
			p = &(*p)->rb_left;
		} else if (conn_id > cm_connid) {
			p = &(*p)->rb_right;
		} else {
			/* unreachable - the three cases above are total */
			BUG();
	/* reference held by the rbtree, dropped on rb_erase */
	kref_get(&(ins->ref));
	rb_link_node(&(ins->msg.reset_conn.rbn), parent, p);
	rb_insert_color(&(ins->msg.reset_conn.rbn), root);
	ins->msg.reset_conn.in_pending_conn_resets = 1;
	return 0;
2140 static void cor_free_oldest_pong(struct cor_neighbor *nb)
2142 struct cor_control_msg_out *cm = container_of(nb->cmsg_queue_pong.next,
2143 struct cor_control_msg_out, lh);
2145 BUG_ON(list_empty(&(nb->cmsg_queue_pong)));
2146 BUG_ON(unlikely(cm->type != MSGTYPE_PONG));
2148 list_del(&(cm->lh));
2149 nb->cmsg_pongslength -= cm->length;
2150 BUG_ON(nb->cmsg_pongscnt == 0);
2151 cm->nb->cmsg_pongscnt--;
2152 cor_free_control_msg(cm);
2155 static struct list_head * _cor_enqueue_control_msg_getqueue(
2156 struct cor_control_msg_out *cm)
2158 if (cm->type == MSGTYPE_ACK) {
2159 if (cm->msg.ack.fast != 0) {
2160 return &(cm->nb->cmsg_queue_ack_fast);
2161 } else {
2162 return &(cm->nb->cmsg_queue_ack_slow);
2164 } else if (cm->type == MSGTYPE_ACK_CONN) {
2165 if (unlikely(cm->msg.ack_conn.queue == CMSGQUEUE_ACK_CONN_URGENT)) {
2166 return &(cm->nb->cmsg_queue_ackconn_urgent);
2167 } else if (cm->msg.ack_conn.queue == CMSGQUEUE_ACK_CONN_LOWLAT) {
2168 return &(cm->nb->cmsg_queue_ackconn_lowlat);
2169 } else if (cm->msg.ack_conn.queue == CMSGQUEUE_ACK_CONN_HIGHLAT) {
2170 return &(cm->nb->cmsg_queue_ackconn_highlat);
2171 } else {
2172 BUG();
2174 } else if (cm->type == MSGTYPE_CONNDATA) {
2175 if (cm->msg.conn_data.highlatency != 0) {
2176 return &(cm->nb->cmsg_queue_conndata_highlat);
2177 } else {
2178 return &(cm->nb->cmsg_queue_conndata_lowlat);
2180 } else {
2181 return &(cm->nb->cmsg_queue_other);
/*
 * Put cm onto the queue matching its type and update the length/count
 * accounting.  Returns 1 if cm was consumed (freed) instead of queued,
 * 0 otherwise.  cmsg_lock must be held.
 */
static int _cor_enqueue_control_msg(struct cor_control_msg_out *cm, int src)
	if (unlikely(cm->type == MSGTYPE_PONG)) {
		BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA);
		/* cap queued pongs: when requeueing, drop this pong;
		 * when adding a new one, drop the oldest queued pong */
		if (cm->nb->cmsg_pongscnt >= MAX_PONG_CMSGS_PER_NEIGH) {
			if (src != ADDCMSG_SRC_NEW) {
				BUG_ON(cm->nb->cmsg_pongscnt == 0);
				cm->nb->cmsg_pongscnt--;
				cor_free_control_msg(cm);
				return 1;
			} else {
				cor_free_oldest_pong(cm->nb);
		cm->nb->cmsg_pongscnt++;
		cm->nb->cmsg_pongslength += cm->length;
		/* requeued pongs go to the front, new ones to the back */
		if (src != ADDCMSG_SRC_NEW) {
			list_add(&(cm->lh), &(cm->nb->cmsg_queue_pong));
		} else {
			list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_pong));
		return 0;
	} else if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
		/* drop duplicate resets for the same conn_id */
		if (cor_insert_pending_conn_resets(cm) != 0) {
			cm->type = 0;
			cor_free_control_msg(cm);
			return 1;
	cm->nb->cmsg_otherlength += cm->length;
	if (src == ADDCMSG_SRC_NEW) {
		list_add_tail(&(cm->lh), _cor_enqueue_control_msg_getqueue(cm));
	} else {
		BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA &&
				cm->type != MSGTYPE_CONNDATA);
		BUG_ON(src == ADDCMSG_SRC_READD &&
				cm->type == MSGTYPE_ACK_CONN);
		/* readds/splits go to the front to keep send order */
		list_add(&(cm->lh), _cor_enqueue_control_msg_getqueue(cm));
	return 0;
2234 static void cor_enqueue_control_msg(struct cor_control_msg_out *cm, int src)
2236 struct cor_neighbor *nb;
2238 BUG_ON(cm == 0);
2239 nb = cm->nb;
2240 BUG_ON(nb == 0);
2243 if (src == ADDCMSG_SRC_NEW)
2244 cm->time_added = jiffies;
2246 spin_lock_bh(&(nb->cmsg_lock));
2248 if (_cor_enqueue_control_msg(cm, src) != 0)
2249 goto out;
2251 if (src != ADDCMSG_SRC_READD && src != ADDCMSG_SRC_RETRANS)
2252 cor_schedule_controlmsg_timer(nb);
2254 out:
2255 spin_unlock_bh(&(nb->cmsg_lock));
2259 void cor_send_pong(struct cor_neighbor *nb, __u32 cookie, ktime_t ping_rcvtime)
2261 struct cor_control_msg_out *cm = _cor_alloc_control_msg(nb);
2263 if (unlikely(cm == 0))
2264 return;
2266 cm->nb = nb;
2267 cm->type = MSGTYPE_PONG;
2268 cm->msg.pong.cookie = cookie;
2269 cm->msg.pong.type = MSGTYPE_PONG_TIMEENQUEUED;
2270 cm->msg.pong.ping_rcvtime = ping_rcvtime;
2271 cm->msg.pong.time_enqueued = ktime_get();
2272 cm->length = 13;
2273 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2276 void cor_send_ack(struct cor_neighbor *nb, __u64 seqno, __u8 fast)
2278 struct cor_control_msg_out *cm = cor_alloc_control_msg(nb,
2279 ACM_PRIORITY_HIGH);
2281 if (unlikely(cm == 0))
2282 return;
2284 cm->nb = nb;
2285 cm->type = MSGTYPE_ACK;
2286 cm->msg.ack.seqno = seqno;
2287 cm->msg.ack.fast = fast;
2288 cm->length = 7;
2289 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2292 static __u8 get_queue_for_ackconn(struct cor_conn *src_in_lx)
2294 if (src_in_lx->is_highlatency != 0) {
2295 if (unlikely(cor_ackconn_urgent(src_in_lx))) {
2296 return CMSGQUEUE_ACK_CONN_LOWLAT;
2297 } else {
2298 return CMSGQUEUE_ACK_CONN_HIGHLAT;
2300 } else {
2301 if (unlikely(cor_ackconn_urgent(src_in_lx))) {
2302 return CMSGQUEUE_ACK_CONN_URGENT;
2303 } else {
2304 return CMSGQUEUE_ACK_CONN_LOWLAT;
2309 static void cor_set_ooolen_flags(struct cor_control_msg_out *cm)
2311 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
2312 (~KP_ACK_CONN_FLAGS_OOO));
2313 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
2314 cor_ooolen_to_flags(cm->msg.ack_conn.length));
/*
 * Unlink a scheduled ack_conn message from its queue and the conn's
 * pending ack list and free it.  cmsg_lock must be held.
 */
static void cor_remove_pending_ackconn(struct cor_control_msg_out *cm)
	cm->nb->cmsg_otherlength -= cm->length;
	list_del(&(cm->lh));
	list_del(&(cm->msg.ack_conn.conn_acks));
	cor_conn_kref_put(cm->msg.ack_conn.src_in,
			"cor_control_msg_out ack_conn");
	cm->msg.ack_conn.src_in = 0;
	/* NOTE(review): type is cleared before freeing - presumably so
	 * cor_free_control_msg() skips type-specific teardown; verify */
	cm->type = 0;
	cor_free_control_msg(cm);
2332 /* cmsg_lock must be held */
2333 static void cor_recalc_scheduled_ackconn_size(struct cor_control_msg_out *cm)
2335 cm->nb->cmsg_otherlength -= cm->length;
2336 cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);
2337 cm->nb->cmsg_otherlength += cm->length;
2340 /* cmsg_lock must be held */
2341 static int _cor_try_merge_ackconn(struct cor_conn *src_in_l,
2342 struct cor_control_msg_out *fromcm,
2343 struct cor_control_msg_out *tocm, int from_newack)
2345 if (cor_ooolen(fromcm->msg.ack_conn.flags) != 0 &&
2346 cor_ooolen(tocm->msg.ack_conn.flags) != 0) {
2347 __u64 tocmseqno = tocm->msg.ack_conn.seqno_ooo;
2348 __u64 tocmlength = tocm->msg.ack_conn.length;
2349 __u64 fromcmseqno = fromcm->msg.ack_conn.seqno_ooo;
2350 __u64 fromcmlength = fromcm->msg.ack_conn.length;
2352 if (cor_seqno_eq(tocmseqno, fromcmseqno)) {
2353 if (fromcmlength > tocmlength)
2354 tocm->msg.ack_conn.length = fromcmlength;
2355 } else if (cor_seqno_after(fromcmseqno, tocmseqno) &&
2356 cor_seqno_before_eq(fromcmseqno, tocmseqno +
2357 tocmlength)) {
2358 __u64 len = cor_seqno_clean(fromcmseqno + fromcmlength -
2359 tocmseqno);
2360 BUG_ON(len > U32_MAX);
2361 tocm->msg.ack_conn.length = (__u32) len;
2362 } else if (cor_seqno_before(fromcmseqno, tocmseqno) &&
2363 cor_seqno_after_eq(fromcmseqno, tocmseqno)) {
2364 __u64 len = cor_seqno_clean(tocmseqno + tocmlength -
2365 fromcmseqno);
2366 BUG_ON(len > U32_MAX);
2367 tocm->msg.ack_conn.seqno_ooo = fromcmseqno;
2368 tocm->msg.ack_conn.length = (__u32) len;
2369 } else {
2370 return 1;
2372 cor_set_ooolen_flags(tocm);
2375 if ((fromcm->msg.ack_conn.flags &
2376 KP_ACK_CONN_FLAGS_SEQNO) != 0) {
2377 if ((tocm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) == 0)
2378 goto setseqno;
2380 BUG_ON(cor_seqno_eq(fromcm->msg.ack_conn.ack_seqno,
2381 tocm->msg.ack_conn.ack_seqno));
2382 if (cor_seqno_after_eq(tocm->msg.ack_conn.ack_seqno,
2383 fromcm->msg.ack_conn.ack_seqno)) {
2384 BUG_ON(cor_seqno_after(fromcm->msg.ack_conn.seqno,
2385 tocm->msg.ack_conn.seqno));
2386 goto skipseqno;
2389 BUG_ON(cor_seqno_before(fromcm->msg.ack_conn.seqno,
2390 tocm->msg.ack_conn.seqno));
2392 setseqno:
2393 tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
2394 KP_ACK_CONN_FLAGS_SEQNO);
2395 tocm->msg.ack_conn.seqno = fromcm->msg.ack_conn.seqno;
2396 tocm->msg.ack_conn.ack_seqno = fromcm->msg.ack_conn.ack_seqno;
2398 skipseqno:
2399 if ((fromcm->msg.ack_conn.flags &
2400 KP_ACK_CONN_FLAGS_WINDOW) != 0)
2401 tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
2402 KP_ACK_CONN_FLAGS_WINDOW);
2406 if (cor_ooolen(fromcm->msg.ack_conn.flags) != 0) {
2407 tocm->msg.ack_conn.seqno_ooo = fromcm->msg.ack_conn.seqno_ooo;
2408 tocm->msg.ack_conn.length = fromcm->msg.ack_conn.length;
2409 cor_set_ooolen_flags(tocm);
2412 if ((fromcm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
2413 BUG_ON((tocm->msg.ack_conn.flags &
2414 KP_ACK_CONN_FLAGS_PRIORITY) != 0);
2415 tocm->msg.ack_conn.priority_seqno =
2416 fromcm->msg.ack_conn.priority_seqno;
2417 tocm->msg.ack_conn.priority = fromcm->msg.ack_conn.priority;
2420 cor_recalc_scheduled_ackconn_size(tocm);
2421 if (from_newack == 0)
2422 cor_remove_pending_ackconn(fromcm);
2424 return 0;
2427 /* cmsg_lock must be held */
2428 static void cor_try_merge_ackconns(struct cor_conn *src_in_l,
2429 struct cor_control_msg_out *cm)
2431 struct list_head *currlh = cm->msg.ack_conn.conn_acks.next;
2433 while (currlh != &(src_in_l->source.in.acks_pending)) {
2434 struct cor_control_msg_out *currcm = container_of(currlh,
2435 struct cor_control_msg_out,
2436 msg.ack_conn.conn_acks);
2437 currlh = currlh->next;
2438 cor_remove_connack_oooflag_ifold(src_in_l, currcm);
2439 _cor_try_merge_ackconn(src_in_l, currcm, cm, 0);
/*
 * Merge the freshly built ack_conn cm into an already pending ack_conn of
 * the same conn, or schedule it as a new control message.  On a successful
 * merge cm itself is freed.
 */
static void cor_merge_or_enqueue_ackconn(struct cor_conn *src_in_l,
		struct cor_control_msg_out *cm, int src)
{
	struct list_head *currlh;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	spin_lock_bh(&(cm->nb->cmsg_lock));

	currlh = src_in_l->source.in.acks_pending.next;
	while (currlh != &(src_in_l->source.in.acks_pending)) {
		struct cor_control_msg_out *currcm = container_of(currlh,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);

		BUG_ON(currcm->nb != cm->nb);
		BUG_ON(currcm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
		BUG_ON(currcm->msg.ack_conn.conn_id !=
				cm->msg.ack_conn.conn_id);

		if (_cor_try_merge_ackconn(src_in_l, cm, currcm, 1) == 0) {
			/* cm was absorbed into currcm; compact further */
			cor_try_merge_ackconns(src_in_l, currcm);
			cor_schedule_controlmsg_timer(currcm->nb);
			spin_unlock_bh(&(currcm->nb->cmsg_lock));
			/*
			 * flags:
			 * when calling cor_free_control_msg here conn may
			 * already be locked; clearing the flags makes sure
			 * priority_send_allowed is not reset
			 */
			cm->msg.ack_conn.flags = 0;
			cor_free_control_msg(cm);
			return;
		}

		currlh = currlh->next;
	}

	/* nothing to merge with - track it as a pending ack of this conn */
	list_add_tail(&(cm->msg.ack_conn.conn_acks),
			&(src_in_l->source.in.acks_pending));

	spin_unlock_bh(&(cm->nb->cmsg_lock));

	cor_enqueue_control_msg(cm, src);
}
/*
 * Refresh the first pending ack_conn of src_in_l (if any) with the current
 * in-order seqno and window, then try to merge the remaining pending acks
 * into it.  Returns 0 on success, 1 if no ack_conn is pending.
 */
static int cor_try_update_ackconn_seqno(struct cor_conn *src_in_l)
{
	int rc = 1;

	spin_lock_bh(&(src_in_l->source.in.nb->cmsg_lock));

	if (list_empty(&(src_in_l->source.in.acks_pending)) == 0) {
		struct cor_control_msg_out *cm = container_of(
				src_in_l->source.in.acks_pending.next,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);
		BUG_ON(cm->nb != src_in_l->source.in.nb);
		BUG_ON(cm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
		BUG_ON(cm->msg.ack_conn.conn_id != cor_get_connid_reverse(
				src_in_l->source.in.conn_id));

		cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_SEQNO |
				KP_ACK_CONN_FLAGS_WINDOW);
		cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;

		/* new ack generation number */
		src_in_l->source.in.ack_seqno++;
		cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;

		cor_remove_connack_oooflag_ifold(src_in_l, cm);
		cor_recalc_scheduled_ackconn_size(cm);

		cor_try_merge_ackconns(src_in_l, cm);

		rc = 0;
	}

	spin_unlock_bh(&(src_in_l->source.in.nb->cmsg_lock));

	return rc;
}
/*
 * Send an ack_conn for src_in_l if one is warranted: always when an
 * out-of-order range (seqno_ooo/ooo_length) is being reported, otherwise
 * only when an in-order ack is flagged as needed or the advertised window
 * has drifted far enough from what the remote side last saw.
 */
void cor_send_ack_conn_ifneeded(struct cor_conn *src_in_l, __u64 seqno_ooo,
		__u32 ooo_length)
{
	struct cor_control_msg_out *cm;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	/* an ooo range must lie beyond the in-order seqno */
	BUG_ON(ooo_length > 0 && cor_seqno_before_eq(seqno_ooo,
			src_in_l->source.in.next_seqno));

	cor_update_windowlimit(src_in_l);

	if (ooo_length != 0) {
		cm = cor_alloc_control_msg(src_in_l->source.in.nb,
				ACM_PRIORITY_LOW);
		if (cm != 0)
			goto add;
	}

	if (src_in_l->source.in.inorder_ack_needed != 0)
		goto ack_needed;

	/* window too small to encode - do not bother announcing it */
	if (cor_seqno_clean(src_in_l->source.in.window_seqnolimit -
			src_in_l->source.in.next_seqno) < WINDOW_ENCODE_MIN)
		return;

	/*
	 * skip if the window the remote already knows is still usable and
	 * the current window has not grown past 8/7 of it
	 */
	if (cor_seqno_clean(src_in_l->source.in.window_seqnolimit_remote -
			src_in_l->source.in.next_seqno) >= WINDOW_ENCODE_MIN &&
			cor_seqno_clean(src_in_l->source.in.window_seqnolimit -
			src_in_l->source.in.next_seqno) * 7 <
			cor_seqno_clean(
			src_in_l->source.in.window_seqnolimit_remote -
			src_in_l->source.in.next_seqno) * 8)
		return;

ack_needed:
	/* prefer updating an already scheduled ack_conn */
	if (cor_try_update_ackconn_seqno(src_in_l) == 0)
		goto out;

	cm = cor_alloc_control_msg(src_in_l->source.in.nb, ACM_PRIORITY_MED);
	if (cm == 0) {
		printk(KERN_ERR "error allocating inorder ack\n");
		return;
	}

add:
	cm->type = MSGTYPE_ACK_CONN;
	src_in_l->source.in.ack_seqno++;
	cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;
	cor_conn_kref_get(src_in_l, "cor_control_msg_out ack_conn");
	cm->msg.ack_conn.src_in = src_in_l;
	cm->msg.ack_conn.conn_id =
			cor_get_connid_reverse(src_in_l->source.in.conn_id);
	cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;
	cm->msg.ack_conn.seqno_ooo = seqno_ooo;
	cm->msg.ack_conn.length = ooo_length;
	cm->msg.ack_conn.bufsize_changerate =
			_cor_bufsize_update_get_changerate(src_in_l);
	cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_SEQNO |
			KP_ACK_CONN_FLAGS_WINDOW;
	cor_set_ooolen_flags(cm);
	cm->msg.ack_conn.is_highlatency = src_in_l->is_highlatency;
	cm->msg.ack_conn.queue = get_queue_for_ackconn(src_in_l);
	cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);

	cor_merge_or_enqueue_ackconn(src_in_l, cm, ADDCMSG_SRC_NEW);

out:
	src_in_l->source.in.inorder_ack_needed = 0;
	src_in_l->source.in.window_seqnolimit_remote =
			src_in_l->source.in.window_seqnolimit;
}
/*
 * Piggyback a priority update onto an already pending ack_conn of the
 * reverse-direction conn.  Returns 0 if the priority was added, 1 if no
 * pending ack_conn exists.
 */
static int cor_try_add_priority(struct cor_conn *trgt_out_ll, __u16 priority)
{
	int rc = 1;
	struct cor_conn *src_in_ll = cor_get_conn_reversedir(trgt_out_ll);

	spin_lock_bh(&(trgt_out_ll->target.out.nb->cmsg_lock));

	if (list_empty(&(src_in_ll->source.in.acks_pending)) == 0) {
		struct cor_control_msg_out *cm = container_of(
				src_in_ll->source.in.acks_pending.next,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);
		BUG_ON(cm->nb != trgt_out_ll->target.out.nb);
		BUG_ON(cm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != src_in_ll);
		BUG_ON(cm->msg.ack_conn.conn_id !=
				trgt_out_ll->target.out.conn_id);

		/* callers only send one priority update at a time */
		BUG_ON((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) !=
				0);
		cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_PRIORITY);
		cm->msg.ack_conn.priority_seqno =
				trgt_out_ll->target.out.priority_seqno;
		cm->msg.ack_conn.priority = priority;
		cor_recalc_scheduled_ackconn_size(cm);

		rc = 0;
	}

	spin_unlock_bh(&(trgt_out_ll->target.out.nb->cmsg_lock));

	return rc;
}
/*
 * Announce a new priority for trgt_out_ll to the neighbor, either merged
 * into a pending ack_conn or as a fresh ack_conn carrying only the
 * priority.  Also advances the local priority bookkeeping (priority_last,
 * priority_seqno, priority_send_allowed).
 */
void cor_send_priority(struct cor_conn *trgt_out_ll, __u16 priority)
{
	struct cor_conn *src_in_ll = cor_get_conn_reversedir(trgt_out_ll);
	struct cor_control_msg_out *cm;

	if (cor_try_add_priority(trgt_out_ll, priority) == 0)
		goto out;

	cm = cor_alloc_control_msg(trgt_out_ll->target.out.nb,
			ACM_PRIORITY_LOW);
	if (cm == 0)
		return;

	cm->type = MSGTYPE_ACK_CONN;
	cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_PRIORITY;
	cor_conn_kref_get(src_in_ll, "cor_control_msg_out ack_conn");
	BUG_ON(trgt_out_ll->targettype != TARGET_OUT);
	cm->msg.ack_conn.src_in = src_in_ll;
	cm->msg.ack_conn.conn_id = trgt_out_ll->target.out.conn_id;
	cm->msg.ack_conn.bufsize_changerate =
			_cor_bufsize_update_get_changerate(src_in_ll);
	cm->msg.ack_conn.priority_seqno =
			trgt_out_ll->target.out.priority_seqno;
	cm->msg.ack_conn.priority = priority;
	cm->msg.ack_conn.is_highlatency = trgt_out_ll->is_highlatency;
	cm->msg.ack_conn.queue = get_queue_for_ackconn(src_in_ll);

	cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);
	cor_merge_or_enqueue_ackconn(src_in_ll, cm, ADDCMSG_SRC_NEW);

out:
	trgt_out_ll->target.out.priority_last = priority;
	/* priority_seqno is a 4 bit counter */
	trgt_out_ll->target.out.priority_seqno =
			(trgt_out_ll->target.out.priority_seqno + 1) & 15;
	trgt_out_ll->target.out.priority_send_allowed = 0;
}
2673 void cor_free_ack_conns(struct cor_conn *src_in_lx)
2675 int changed = 0;
2676 spin_lock_bh(&(src_in_lx->source.in.nb->cmsg_lock));
2677 while (list_empty(&(src_in_lx->source.in.acks_pending)) == 0) {
2678 struct list_head *currlh =
2679 src_in_lx->source.in.acks_pending.next;
2680 struct cor_control_msg_out *currcm = container_of(currlh,
2681 struct cor_control_msg_out,
2682 msg.ack_conn.conn_acks);
2684 cor_remove_pending_ackconn(currcm);
2685 changed = 1;
2687 if (changed)
2688 cor_schedule_controlmsg_timer(src_in_lx->source.in.nb);
2689 spin_unlock_bh(&(src_in_lx->source.in.nb->cmsg_lock));
2692 void cor_send_connect_success(struct cor_control_msg_out *cm, __u32 conn_id,
2693 struct cor_conn *src_in)
2695 cm->type = MSGTYPE_CONNECT_SUCCESS;
2696 cm->msg.connect_success.conn_id = conn_id;
2697 cor_conn_kref_get(src_in, "cor_control_msg_out connect_success");
2698 cm->msg.connect_success.src_in = src_in;
2699 cm->length = 7;
2700 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2703 void cor_send_connect_nb(struct cor_control_msg_out *cm, __u32 conn_id,
2704 __u64 seqno1, __u64 seqno2, struct cor_conn *src_in_ll)
2706 cm->type = MSGTYPE_CONNECT;
2707 cm->msg.connect.conn_id = conn_id;
2708 cm->msg.connect.seqno1 = seqno1;
2709 cm->msg.connect.seqno2 = seqno2;
2710 cor_conn_kref_get(src_in_ll, "cor_control_msg_out connect");
2711 BUG_ON(src_in_ll->sourcetype != SOURCE_IN);
2712 cm->msg.connect.src_in = src_in_ll;
2713 cm->length = 22;
2714 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2717 void cor_send_conndata(struct cor_control_msg_out *cm, __u32 conn_id,
2718 __u64 seqno, char *data_orig, char *data, __u32 datalen,
2719 __u8 windowused, __u8 flush, __u8 highlatency,
2720 struct cor_conn_retrans *cr)
2722 cm->type = MSGTYPE_CONNDATA;
2723 cm->msg.conn_data.conn_id = conn_id;
2724 cm->msg.conn_data.seqno = seqno;
2725 cm->msg.conn_data.data_orig = data_orig;
2726 cm->msg.conn_data.data = data;
2727 cm->msg.conn_data.datalen = datalen;
2728 cm->msg.conn_data.windowused = windowused;
2729 cm->msg.conn_data.flush = flush;
2730 cm->msg.conn_data.highlatency = highlatency;
2731 cm->msg.conn_data.cr = cr;
2732 kref_get(&(cr->ref));
2733 cm->length = get_kp_conn_data_length(datalen);
2734 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2737 int cor_send_reset_conn(struct cor_neighbor *nb, __u32 conn_id, int lowprio)
2739 struct cor_control_msg_out *cm;
2741 if (unlikely(cor_get_neigh_state(nb) == NEIGHBOR_STATE_KILLED))
2742 return 0;
2744 cm = cor_alloc_control_msg(nb, lowprio ?
2745 ACM_PRIORITY_LOW : ACM_PRIORITY_MED);
2747 if (unlikely(cm == 0))
2748 return 1;
2750 cm->type = MSGTYPE_RESET_CONN;
2751 cm->msg.reset_conn.conn_id = conn_id;
2752 cm->length = 5;
2754 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2756 return 0;
2759 int __init cor_kgen_init(void)
2761 cor_controlmsg_slab = kmem_cache_create("cor_controlmsg",
2762 sizeof(struct cor_control_msg_out), 8, 0, 0);
2763 if (unlikely(cor_controlmsg_slab == 0))
2764 return -ENOMEM;
2766 cor_controlretrans_slab = kmem_cache_create("cor_controlretransmsg",
2767 sizeof(struct cor_control_retrans), 8, 0, 0);
2768 if (unlikely(cor_controlretrans_slab == 0))
2769 return -ENOMEM;
2771 return 0;
2774 void __exit cor_kgen_exit2(void)
2776 kmem_cache_destroy(cor_controlretrans_slab);
2777 cor_controlretrans_slab = 0;
2779 kmem_cache_destroy(cor_controlmsg_slab);
2780 cor_controlmsg_slab = 0;
2783 MODULE_LICENSE("GPL");