remove nbstalled_list
[cor.git] / net / cor / neigh_snd.c
blobfe8ccc6b2541c32814377fa4cf75208676e96400
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2021 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <asm/byteorder.h>
23 #include "cor.h"
/*
 * Internal control message types (struct cor_control_msg_out, member type).
 * These values are not sent over the network; they only have a meaning
 * inside this file.
 */
#define MSGTYPE_PONG			1
#define MSGTYPE_ACK			2
#define MSGTYPE_ACK_CONN		3
#define MSGTYPE_CONNECT			4
#define MSGTYPE_CONNECT_SUCCESS		5
#define MSGTYPE_RESET_CONN		6
#define MSGTYPE_CONNDATA		7
#define MSGTYPE_SET_MAX_CMSG_DELAY	8

/* NOTE(review): presumably the values of msg.pong.type - confirm with users */
#define MSGTYPE_PONG_TIMEENQUEUED	1
#define MSGTYPE_PONG_RESPDELAY		2
38 struct cor_control_msg_out{
39 __u8 type;
40 __u32 length;
41 struct kref ref;
42 struct cor_neighbor *nb;
44 /* either queue or control_retrans_packet */
45 struct list_head lh;
47 unsigned long time_added;
49 union{
50 struct{
51 __u32 cookie;
52 __u8 type;
54 ktime_t ping_rcvtime;
55 ktime_t time_enqueued;
56 }pong;
58 struct{
59 __u64 seqno;
60 __u8 fast;
61 }ack;
63 struct{
64 struct cor_conn *src_in;
65 struct list_head conn_acks;
66 __u32 conn_id;
67 __u64 seqno;
68 __u64 seqno_ooo;
69 __u32 length;
71 __u8 flags;
73 __u8 bufsize_changerate;
75 __u16 priority;
76 __u8 priority_seqno;
78 __u8 is_highlatency;
79 __u8 queue;
81 __u32 ack_seqno;
82 }ack_conn;
84 struct{
85 __u32 conn_id;
86 __u64 seqno1;
87 __u64 seqno2;
88 struct cor_conn *src_in;
89 }connect;
91 struct{
92 __u32 conn_id;
93 struct cor_conn *src_in;
94 }connect_success;
96 struct{
97 struct rb_node rbn;
98 __u8 in_pending_conn_resets;
99 __u32 conn_id;
100 }reset_conn;
102 struct{
103 __u32 conn_id;
104 __u64 seqno;
105 __u32 datalen;
106 __u8 windowused;
107 __u8 flush;
108 __u8 highlatency;
109 char *data_orig;
110 char *data;
111 struct cor_conn_retrans *cr;
112 }conn_data;
114 struct{
115 __u32 ack_fast_delay;
116 __u32 ack_slow_delay;
117 __u32 ackconn_lowlatency_delay;
118 __u32 ackconn_highlatency_delay;
119 __u32 pong_delay;
120 }set_max_cmsg_delay;
121 }msg;
124 struct cor_control_retrans {
125 struct kref ref;
127 struct cor_neighbor *nb;
128 __u64 seqno;
130 unsigned long timeout;
132 struct list_head msgs;
134 struct rb_node rbn;
135 struct list_head timeout_list;
139 static struct kmem_cache *cor_controlmsg_slab;
140 static struct kmem_cache *cor_controlretrans_slab;
142 static atomic_t cor_cmsg_othercnt = ATOMIC_INIT(0);
144 #define ADDCMSG_SRC_NEW 1
145 #define ADDCMSG_SRC_SPLITCONNDATA 2
146 #define ADDCMSG_SRC_READD 3
147 #define ADDCMSG_SRC_RETRANS 4
149 static void cor_enqueue_control_msg(struct cor_control_msg_out *msg, int src);
151 static void cor_try_merge_ackconns(struct cor_conn *src_in_l,
152 struct cor_control_msg_out *cm);
154 static void cor_merge_or_enqueue_ackconn(struct cor_conn *src_in_l,
155 struct cor_control_msg_out *cm, int src);
157 static struct cor_control_msg_out *_cor_alloc_control_msg(
158 struct cor_neighbor *nb)
160 struct cor_control_msg_out *cm;
162 BUG_ON(nb == 0);
164 cm = kmem_cache_alloc(cor_controlmsg_slab, GFP_ATOMIC);
165 if (unlikely(cm == 0))
166 return 0;
167 memset(cm, 0, sizeof(struct cor_control_msg_out));
168 kref_init(&(cm->ref));
169 cm->nb = nb;
170 return cm;
173 static int cor_calc_limit(int limit, int priority)
175 if (priority == ACM_PRIORITY_LOW)
176 return (limit+1)/2;
177 else if (priority == ACM_PRIORITY_MED)
178 return (limit * 3 + 1)/4;
179 else if (priority == ACM_PRIORITY_HIGH)
180 return limit;
181 else
182 BUG();
185 struct cor_control_msg_out *cor_alloc_control_msg(struct cor_neighbor *nb,
186 int priority)
188 struct cor_control_msg_out *cm = 0;
190 long packets1;
191 long packets2;
193 BUG_ON(nb == 0);
195 packets1 = atomic_inc_return(&(nb->cmsg_othercnt));
196 packets2 = atomic_inc_return(&(cor_cmsg_othercnt));
198 BUG_ON(packets1 <= 0);
199 BUG_ON(packets2 <= 0);
201 if (packets1 <= cor_calc_limit(GUARANTEED_CMSGS_PER_NEIGH, priority))
202 goto alloc;
204 if (unlikely(unlikely(packets1 > cor_calc_limit(MAX_CMSGS_PER_NEIGH,
205 priority)) ||
206 unlikely(packets2 > cor_calc_limit(MAX_CMSGS,
207 priority))))
208 goto full;
210 alloc:
211 cm = _cor_alloc_control_msg(nb);
212 if (unlikely(cm == 0)) {
213 full:
215 /* printk(KERN_ERR "cor_alloc_control_msg failed %ld %ld", packets1, packets2); */
216 atomic_dec(&(nb->cmsg_othercnt));
217 atomic_dec(&(cor_cmsg_othercnt));
219 return cm;
222 static void cor_cmsg_kref_free(struct kref *ref)
224 struct cor_control_msg_out *cm = container_of(ref,
225 struct cor_control_msg_out, ref);
226 kmem_cache_free(cor_controlmsg_slab, cm);
229 void cor_free_control_msg(struct cor_control_msg_out *cm)
231 if (likely(cm->type != MSGTYPE_PONG)) {
232 atomic_dec(&(cm->nb->cmsg_othercnt));
233 atomic_dec(&(cor_cmsg_othercnt));
236 if (cm->type == MSGTYPE_ACK_CONN) {
237 BUG_ON(cm->msg.ack_conn.src_in == 0);
238 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0){
239 struct cor_conn *trgt_out = cor_get_conn_reversedir(
240 cm->msg.ack_conn.src_in);
241 spin_lock_bh(&(trgt_out->rcv_lock));
242 BUG_ON(trgt_out->targettype != TARGET_OUT);
243 if (trgt_out->target.out.priority_send_allowed != 0) {
244 trgt_out->target.out.priority_send_allowed = 1;
245 spin_unlock_bh(&(trgt_out->rcv_lock));
246 cor_conn_refresh_priority(trgt_out, 0);
247 } else {
248 spin_unlock_bh(&(trgt_out->rcv_lock));
251 cor_conn_kref_put(cm->msg.ack_conn.src_in,
252 "cor_control_msg_out ack_conn");
253 cm->msg.ack_conn.src_in = 0;
254 } else if (cm->type == MSGTYPE_CONNECT) {
255 BUG_ON(cm->msg.connect.src_in == 0);
256 cor_conn_kref_put(cm->msg.connect.src_in,
257 "cor_control_msg_out connect");
258 cm->msg.connect.src_in = 0;
259 } else if (cm->type == MSGTYPE_CONNECT_SUCCESS) {
260 BUG_ON(cm->msg.connect_success.src_in == 0);
261 cor_conn_kref_put(cm->msg.connect_success.src_in,
262 "cor_control_msg_out connect_success");
263 cm->msg.connect_success.src_in = 0;
264 } else if (cm->type == MSGTYPE_RESET_CONN) {
265 spin_lock_bh(&(cm->nb->cmsg_lock));
266 if (cm->msg.reset_conn.in_pending_conn_resets != 0) {
267 rb_erase(&(cm->msg.reset_conn.rbn),
268 &(cm->nb->pending_conn_resets_rb));
269 cm->msg.reset_conn.in_pending_conn_resets = 0;
271 kref_put(&(cm->ref), cor_kreffree_bug);
273 spin_unlock_bh(&(cm->nb->cmsg_lock));
276 kref_put(&(cm->ref), cor_cmsg_kref_free);
279 static void cor_free_control_retrans(struct kref *ref)
281 struct cor_control_retrans *cr = container_of(ref,
282 struct cor_control_retrans, ref);
284 while (list_empty(&(cr->msgs)) == 0) {
285 struct cor_control_msg_out *cm = container_of(cr->msgs.next,
286 struct cor_control_msg_out, lh);
288 if (cm->type == MSGTYPE_PONG)
289 atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
291 list_del(&(cm->lh));
292 cor_free_control_msg(cm);
295 kmem_cache_free(cor_controlretrans_slab, cr);
298 struct cor_control_retrans *cor_get_control_retrans(
299 struct cor_neighbor *nb_retranslocked, __u64 seqno)
301 struct rb_node *n = 0;
302 struct cor_control_retrans *ret = 0;
304 n = nb_retranslocked->kp_retransmits_rb.rb_node;
306 while (likely(n != 0) && ret == 0) {
307 struct cor_control_retrans *cr = container_of(n,
308 struct cor_control_retrans, rbn);
310 BUG_ON(cr->nb != nb_retranslocked);
312 if (cor_seqno_before(seqno, cr->seqno))
313 n = n->rb_left;
314 else if (cor_seqno_after(seqno, cr->seqno))
315 n = n->rb_right;
316 else
317 ret = cr;
320 if (ret != 0)
321 kref_get(&(ret->ref));
323 return ret;
326 /* nb->retrans_lock must be held */
327 void cor_insert_control_retrans(struct cor_control_retrans *ins)
329 struct cor_neighbor *nb = ins->nb;
330 __u64 seqno = ins->seqno;
332 struct rb_root *root;
333 struct rb_node **p;
334 struct rb_node *parent = 0;
336 BUG_ON(nb == 0);
338 root = &(nb->kp_retransmits_rb);
339 p = &(root->rb_node);
341 while ((*p) != 0) {
342 struct cor_control_retrans *cr = container_of(*p,
343 struct cor_control_retrans, rbn);
345 BUG_ON(cr->nb != nb);
347 parent = *p;
348 if (unlikely(cor_seqno_eq(seqno, cr->seqno))) {
349 BUG();
350 } else if (cor_seqno_before(seqno, cr->seqno)) {
351 p = &(*p)->rb_left;
352 } else if (cor_seqno_after(seqno, cr->seqno)) {
353 p = &(*p)->rb_right;
354 } else {
355 BUG();
359 kref_get(&(ins->ref));
360 rb_link_node(&(ins->rbn), parent, p);
361 rb_insert_color(&(ins->rbn), root);
364 static void cor_remove_connack_oooflag_ifold(struct cor_conn *src_in_l,
365 struct cor_control_msg_out *cm)
367 if (cor_ooolen(cm->msg.ack_conn.flags) != 0 && cor_seqno_before_eq(
368 cm->msg.ack_conn.seqno_ooo +
369 cm->msg.ack_conn.length,
370 src_in_l->source.in.next_seqno)) {
371 cm->msg.ack_conn.length = 0;
372 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
373 (~KP_ACK_CONN_FLAGS_OOO));
377 static int cor_ackconn_prepare_requeue(struct cor_conn *cn_l,
378 struct cor_control_msg_out *cm)
380 if (unlikely(unlikely(cn_l->sourcetype != SOURCE_IN) ||
381 unlikely(cn_l->source.in.nb != cm->nb) ||
382 unlikely(
383 cor_get_connid_reverse(cn_l->source.in.conn_id) !=
384 cm->msg.ack_conn.conn_id) ||
385 unlikely(cn_l->isreset != 0)))
386 return 0;
388 cor_remove_connack_oooflag_ifold(cn_l, cm);
390 if (!cor_seqno_eq(cm->msg.ack_conn.ack_seqno,
391 cn_l->source.in.ack_seqno))
392 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
393 (~KP_ACK_CONN_FLAGS_SEQNO) &
394 (~KP_ACK_CONN_FLAGS_WINDOW));
396 if (cm->msg.ack_conn.flags == 0)
397 return 0;
399 cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);
401 return 1;
404 static void cor_requeue_control_retrans(struct cor_control_retrans *cr)
406 atomic_inc(&(cr->nb->cmsg_bulk_readds));
408 while (list_empty(&(cr->msgs)) == 0) {
409 struct cor_control_msg_out *cm = container_of(cr->msgs.prev,
410 struct cor_control_msg_out, lh);
411 list_del(&(cm->lh));
413 BUG_ON(cm->nb != cr->nb);
415 if (cm->type == MSGTYPE_ACK_CONN) {
416 struct cor_conn *cn_l = cm->msg.ack_conn.src_in;
417 spin_lock_bh(&(cn_l->rcv_lock));
418 if (unlikely(cor_ackconn_prepare_requeue(cn_l,
419 cm) == 0)) {
420 cor_free_control_msg(cm);
421 } else {
422 cor_merge_or_enqueue_ackconn(cn_l, cm,
423 ADDCMSG_SRC_RETRANS);
426 spin_unlock_bh(&(cn_l->rcv_lock));
427 } else {
428 if (cm->type == MSGTYPE_PONG)
429 atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
430 cor_enqueue_control_msg(cm, ADDCMSG_SRC_RETRANS);
434 atomic_dec(&(cr->nb->cmsg_bulk_readds));
436 spin_lock_bh(&(cr->nb->cmsg_lock));
437 cor_schedule_controlmsg_timer(cr->nb);
438 spin_unlock_bh(&(cr->nb->cmsg_lock));
441 static void _cor_empty_retrans_queue(struct cor_neighbor *nb_retranslocked,
442 struct list_head *retrans_list)
444 while (!list_empty(retrans_list)) {
445 struct cor_control_retrans *cr = container_of(
446 retrans_list->next, struct cor_control_retrans,
447 timeout_list);
449 BUG_ON(cr->nb != nb_retranslocked);
451 list_del(&(cr->timeout_list));
452 rb_erase(&(cr->rbn), &(nb_retranslocked->kp_retransmits_rb));
454 kref_put(&(cr->ref), cor_kreffree_bug); /* rb */
455 kref_put(&(cr->ref), cor_free_control_retrans); /* list */
459 static void cor_empty_retrans_queue(struct cor_neighbor *nb_retranslocked)
461 _cor_empty_retrans_queue(nb_retranslocked,
462 &(nb_retranslocked->retrans_fast_list));
463 _cor_empty_retrans_queue(nb_retranslocked,
464 &(nb_retranslocked->retrans_slow_list));
467 static unsigned long cor_get_retransmit_timeout(
468 struct cor_neighbor *nb_retranslocked)
470 struct cor_control_retrans *cr1 = 0;
471 struct cor_control_retrans *cr2 = 0;
472 struct cor_control_retrans *cr = 0;
474 if (list_empty(&(nb_retranslocked->retrans_fast_list)) == 0) {
475 cr1 = container_of(nb_retranslocked->retrans_fast_list.next,
476 struct cor_control_retrans, timeout_list);
477 BUG_ON(cr1->nb != nb_retranslocked);
480 if (list_empty(&(nb_retranslocked->retrans_slow_list)) == 0) {
481 cr2 = container_of(nb_retranslocked->retrans_slow_list.next,
482 struct cor_control_retrans, timeout_list);
483 BUG_ON(cr2->nb != nb_retranslocked);
486 if (cr1 == 0)
487 cr = cr2;
488 else if (cr2 == 0)
489 cr = cr1;
490 else
491 cr = (time_after(cr1->timeout, cr2->timeout) ? cr2 : cr1);
493 BUG_ON(cr == 0);
495 return cr->timeout;
498 void cor_retransmit_timerfunc(struct timer_list *retrans_timer)
500 struct cor_neighbor *nb = container_of(retrans_timer,
501 struct cor_neighbor, retrans_timer);
502 int nbstate = cor_get_neigh_state(nb);
503 unsigned long timeout;
505 spin_lock_bh(&(nb->retrans_lock));
507 if (list_empty(&(nb->retrans_fast_list)) &&
508 list_empty(&(nb->retrans_slow_list))) {
509 spin_unlock_bh(&(nb->retrans_lock));
510 kref_put(&(nb->ref), cor_neighbor_free);
511 return;
514 if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
515 cor_empty_retrans_queue(nb);
516 spin_unlock_bh(&(nb->retrans_lock));
517 kref_put(&(nb->ref), cor_neighbor_free);
518 return;
521 timeout = cor_get_retransmit_timeout(nb);
523 if (time_after(timeout, jiffies)) {
524 int rc = mod_timer(&(nb->retrans_timer), timeout);
525 spin_unlock_bh(&(nb->retrans_lock));
526 if (rc != 0)
527 kref_put(&(nb->ref), cor_neighbor_free);
528 return;
531 spin_unlock_bh(&(nb->retrans_lock));
533 spin_lock_bh(&(nb->cmsg_lock));
534 nb->add_retrans_needed = 1;
535 cor_schedule_controlmsg_timer(nb);
536 spin_unlock_bh(&(nb->cmsg_lock));
538 kref_put(&(nb->ref), cor_neighbor_free);
541 static void cor_schedule_retransmit(struct cor_control_retrans *cr,
542 struct cor_neighbor *nb, int fastack)
544 int first;
546 cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
547 atomic_read(&(nb->latency_stddev_retrans_us)),
548 fastack ?
549 atomic_read(&(nb->max_remote_ack_fast_delay_us)) :
550 atomic_read(&(nb->max_remote_ack_slow_delay_us)));
552 spin_lock_bh(&(nb->retrans_lock));
554 cor_insert_control_retrans(cr);
555 if (fastack) {
556 first = list_empty(&(nb->retrans_fast_list));
557 list_add_tail(&(cr->timeout_list), &(nb->retrans_fast_list));
558 } else {
559 first = list_empty(&(nb->retrans_slow_list));
560 list_add_tail(&(cr->timeout_list), &(nb->retrans_slow_list));
563 if (first) {
564 if (mod_timer(&(nb->retrans_timer),
565 cor_get_retransmit_timeout(nb)) == 0) {
566 kref_get(&(nb->ref));
570 spin_unlock_bh(&(nb->retrans_lock));
573 void cor_kern_ack_rcvd(struct cor_neighbor *nb, __u64 seqno)
575 struct cor_control_retrans *cr = 0;
577 spin_lock_bh(&(nb->retrans_lock));
579 cr = cor_get_control_retrans(nb, seqno);
581 if (cr == 0) {
582 /* char *seqno_p = (char *) &seqno;
583 seqno = cpu_to_be32(seqno);
584 printk(KERN_ERR "bogus/duplicate ack received %d %d %d %d",
585 seqno_p[0], seqno_p[1], seqno_p[2], seqno_p[3]);
587 goto out;
590 rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));
592 BUG_ON(cr->nb != nb);
594 list_del(&(cr->timeout_list));
596 out:
597 spin_unlock_bh(&(nb->retrans_lock));
599 if (cr != 0) {
600 /* cor_get_control_retrans */
601 kref_put(&(cr->ref), cor_kreffree_bug);
603 kref_put(&(cr->ref), cor_kreffree_bug); /* rb_erase */
604 kref_put(&(cr->ref), cor_free_control_retrans); /* list */
608 static __u16 cor_get_window(struct cor_conn *cn,
609 struct cor_neighbor *expectedsender, __u32 expected_connid)
611 __u16 window = 0;
613 BUG_ON(expectedsender == 0);
615 spin_lock_bh(&(cn->rcv_lock));
617 if (cor_is_conn_in(cn, expectedsender, expected_connid) == 0)
618 goto out;
620 window = cor_enc_window(cor_seqno_clean(
621 cn->source.in.window_seqnolimit -
622 cn->source.in.next_seqno));
624 cn->source.in.window_seqnolimit_remote = cn->source.in.next_seqno +
625 cor_dec_window(window);
627 out:
628 spin_unlock_bh(&(cn->rcv_lock));
630 return window;
633 /* static void padding(struct sk_buff *skb, __u32 length)
635 char *dst;
636 if (length <= 0)
637 return;
638 dst = skb_put(skb, length);
639 BUG_ON(dst == 0);
640 memset(dst, KP_PADDING, length);
641 } */
644 static __u32 cor_add_init_session(struct sk_buff *skb, __be32 sessionid,
645 __u32 spaceleft)
647 char *dst;
649 BUG_ON(KP_MISC_INIT_SESSION_CMDLEN != 5);
651 if (unlikely(spaceleft < 5))
652 return 0;
654 dst = skb_put(skb, 5);
655 BUG_ON(dst == 0);
657 dst[0] = get_kp_code(KP_MISC, KP_MISC_INIT_SESSION);
658 cor_put_be32(dst + 1, sessionid);
660 return 5;
663 static __u32 cor_add_ack(struct sk_buff *skb, struct cor_control_retrans *cr,
664 struct cor_control_msg_out *cm, __u32 spaceleft)
666 char *dst;
668 BUG_ON(cm->length != 7);
670 if (unlikely(spaceleft < 7))
671 return 0;
673 dst = skb_put(skb, 7);
674 BUG_ON(dst == 0);
676 dst[0] = get_kp_code(KP_MISC, KP_MISC_ACK);
677 cor_put_u48(dst + 1, cm->msg.ack.seqno);
679 list_add_tail(&(cm->lh), &(cr->msgs));
681 return 7;
684 static inline __u8 cor_add_ack_conn_get_delayremaining(
685 struct cor_control_msg_out *cm, unsigned long cmsg_send_start_j)
687 __u32 maxdelay_ms = 0;
688 unsigned long jiffies_timeout;
689 if (cm->msg.ack_conn.is_highlatency) {
690 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_HIGHLATENCY_MS;
691 } else {
692 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS;
695 jiffies_timeout = cm->time_added + msecs_to_jiffies(maxdelay_ms);
697 if (time_before_eq(cmsg_send_start_j, cm->time_added)) {
698 return 255;
699 } else if (time_after_eq(cmsg_send_start_j, jiffies_timeout)) {
700 return 0;
701 } else {
702 __u64 delay_remaining = jiffies_timeout - cmsg_send_start_j;
704 BUG_ON(delay_remaining > U32_MAX);
705 BUG_ON(delay_remaining > msecs_to_jiffies(maxdelay_ms));
707 return (__u8) div64_u64(255 * delay_remaining +
708 msecs_to_jiffies(maxdelay_ms)/2,
709 msecs_to_jiffies(maxdelay_ms));
713 static __u32 cor_add_ack_conn(struct sk_buff *skb,
714 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
715 __u32 spaceleft, unsigned long cmsg_send_start_j,
716 int *ackneeded)
718 char *dst;
719 __u32 offset = 0;
721 if (unlikely(spaceleft < cm->length))
722 return 0;
724 dst = skb_put(skb, cm->length);
725 BUG_ON(dst == 0);
727 dst[offset] = get_kp_code(KP_ACK_CONN, cm->msg.ack_conn.flags);
728 offset++;
729 cor_put_u32(dst + offset, cm->msg.ack_conn.conn_id);
730 offset += 4;
732 if (likely((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0 ||
733 cor_ooolen(cm->msg.ack_conn.flags) != 0)) {
734 dst[offset] = cor_add_ack_conn_get_delayremaining(cm,
735 cmsg_send_start_j);
736 offset++;
739 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0) {
740 cor_put_u48(dst + offset, cm->msg.ack_conn.seqno);
741 offset += 6;
743 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_WINDOW) != 0) {
744 BUG_ON(cm->msg.ack_conn.src_in == 0);
745 cor_put_u16(dst + offset, cor_get_window(
746 cm->msg.ack_conn.src_in,
747 cm->nb, cor_get_connid_reverse(
748 cm->msg.ack_conn.conn_id)));
749 offset += 2;
750 dst[offset] = cm->msg.ack_conn.bufsize_changerate;
751 offset++;
755 if (cor_ooolen(cm->msg.ack_conn.flags) != 0) {
756 cor_put_u48(dst + offset, cm->msg.ack_conn.seqno_ooo);
757 offset += 6;
758 if (cor_ooolen(cm->msg.ack_conn.flags) == 1) {
759 BUG_ON(cm->msg.ack_conn.length > 255);
760 dst[offset] = cm->msg.ack_conn.length;
761 offset += 1;
762 } else if (cor_ooolen(cm->msg.ack_conn.flags) == 2) {
763 BUG_ON(cm->msg.ack_conn.length <= 255);
764 BUG_ON(cm->msg.ack_conn.length > 65535);
765 cor_put_u16(dst + offset, cm->msg.ack_conn.length);
766 offset += 2;
767 } else if (cor_ooolen(cm->msg.ack_conn.flags) == 4) {
768 BUG_ON(cm->msg.ack_conn.length <= 65535);
769 cor_put_u32(dst + offset, cm->msg.ack_conn.length);
770 offset += 4;
771 } else {
772 BUG();
776 if (unlikely((cm->msg.ack_conn.flags &
777 KP_ACK_CONN_FLAGS_PRIORITY) != 0)) {
778 __u16 priority = (cm->msg.ack_conn.priority_seqno << 12) &
779 cm->msg.ack_conn.priority;
780 BUG_ON(cm->msg.ack_conn.priority_seqno > 15);
781 BUG_ON(cm->msg.ack_conn.priority > 4095);
783 cor_put_u16(dst + offset, priority);
784 offset+=2;
787 list_add_tail(&(cm->lh), &(cr->msgs));
788 if (likely((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0 ||
789 cor_ooolen(cm->msg.ack_conn.flags) != 0) &&
790 cm->msg.ack_conn.is_highlatency == 0) {
791 *ackneeded = ACK_NEEDED_FAST;
792 } else if (*ackneeded != ACK_NEEDED_FAST) {
793 *ackneeded = ACK_NEEDED_SLOW;
796 BUG_ON(offset != cm->length);
797 return offset;
800 static __u32 cor_add_ping(struct sk_buff *skb, __u32 cookie, __u32 spaceleft)
802 char *dst;
804 BUG_ON(KP_MISC_PING_CMDLEN != 5);
806 if (unlikely(spaceleft < 5))
807 return 0;
809 dst = skb_put(skb, 5);
810 BUG_ON(dst == 0);
812 dst[0] = get_kp_code(KP_MISC, KP_MISC_PING);
813 cor_put_u32(dst + 1, cookie);
815 return 5;
818 static __u32 cor_calc_respdelay(ktime_t time_pong_enqueued, ktime_t time_end)
820 if (unlikely(ktime_before(time_end, time_pong_enqueued))) {
821 return 0;
822 } else {
823 __s64 respdelay = div_u64(ktime_to_ns(time_end) -
824 ktime_to_ns(time_pong_enqueued) + 500,
825 1000);
827 if (unlikely(respdelay > U32_MAX))
828 return U32_MAX;
829 else if (unlikely(respdelay < 0))
830 return 0;
831 else
832 return (__u32) respdelay;
836 static __u32 cor_add_pong(struct sk_buff *skb, struct cor_control_retrans *cr,
837 struct cor_control_msg_out *cm, __u32 spaceleft, int nbstate,
838 ktime_t cmsg_send_start, int *ackneeded)
840 __u32 respdelay_full;
841 __u32 respdelay_netonly;
842 char *dst;
844 BUG_ON(cm->length != 13);
846 if (unlikely(spaceleft < 13))
847 return 0;
849 respdelay_full = cor_calc_respdelay(cm->msg.pong.time_enqueued,
850 cmsg_send_start);
851 respdelay_netonly = cor_calc_respdelay(cm->msg.pong.ping_rcvtime,
852 ktime_get());
854 dst = skb_put(skb, 13);
855 BUG_ON(dst == 0);
857 dst[0] = get_kp_code(KP_MISC, KP_MISC_PONG);
858 cor_put_u32(dst + 1, cm->msg.pong.cookie);
859 cor_put_u32(dst + 5, (__u32) respdelay_full);
860 cor_put_u32(dst + 9, (__u32) respdelay_netonly);
862 list_add_tail(&(cm->lh), &(cr->msgs));
863 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE) &&
864 *ackneeded != ACK_NEEDED_FAST)
865 *ackneeded = ACK_NEEDED_SLOW;
867 return 13;
870 static __u32 cor_add_connect(struct sk_buff *skb,
871 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
872 __u32 spaceleft, int *ackneeded)
874 char *dst;
875 struct cor_conn *src_in = cm->msg.connect.src_in;
876 struct cor_conn *trgt_out = cor_get_conn_reversedir(src_in);
877 __u16 priority;
879 BUG_ON(cm->length != 22);
881 if (unlikely(spaceleft < 22))
882 return 0;
884 dst = skb_put(skb, 22);
885 BUG_ON(dst == 0);
887 dst[0] = get_kp_code(KP_MISC, KP_MISC_CONNECT);
888 cor_put_u32(dst + 1, cm->msg.connect.conn_id);
889 cor_put_u48(dst + 5, cm->msg.connect.seqno1);
890 cor_put_u48(dst + 11, cm->msg.connect.seqno2);
891 BUG_ON(cm->msg.connect.src_in == 0);
892 cor_put_u16(dst + 17, cor_get_window(cm->msg.connect.src_in, cm->nb,
893 cor_get_connid_reverse(cm->msg.connect.conn_id)));
895 spin_lock_bh(&(trgt_out->rcv_lock));
896 BUG_ON(trgt_out->targettype != TARGET_OUT);
898 priority = (trgt_out->target.out.priority_seqno << 12) &
899 trgt_out->target.out.priority_last;
900 BUG_ON(trgt_out->target.out.priority_seqno > 15);
901 BUG_ON(trgt_out->target.out.priority_last > 4095);
902 cor_put_u16(dst + 19, priority);
904 if (src_in->is_highlatency == 0)
905 dst[21] = 0;
906 else
907 dst[21] = 1;
909 spin_unlock_bh(&(trgt_out->rcv_lock));
911 list_add_tail(&(cm->lh), &(cr->msgs));
912 if (*ackneeded != ACK_NEEDED_FAST)
913 *ackneeded = ACK_NEEDED_SLOW;
915 return 22;
918 static __u32 cor_add_connect_success(struct sk_buff *skb,
919 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
920 __u32 spaceleft, int *ackneeded)
922 char *dst;
924 BUG_ON(cm->length != 7);
926 if (unlikely(spaceleft < 7))
927 return 0;
929 dst = skb_put(skb, 7);
930 BUG_ON(dst == 0);
932 dst[0] = get_kp_code(KP_MISC, KP_MISC_CONNECT_SUCCESS);
933 cor_put_u32(dst + 1, cm->msg.connect_success.conn_id);
934 BUG_ON(cm->msg.connect_success.src_in == 0);
935 cor_put_u16(dst + 5, cor_get_window(
936 cm->msg.connect_success.src_in, cm->nb,
937 cor_get_connid_reverse(
938 cm->msg.connect_success.conn_id)));
940 list_add_tail(&(cm->lh), &(cr->msgs));
941 if (*ackneeded != ACK_NEEDED_FAST)
942 *ackneeded = ACK_NEEDED_SLOW;
944 return 7;
947 static __u32 cor_add_reset_conn(struct sk_buff *skb,
948 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
949 __u32 spaceleft, int *ackneeded)
951 char *dst;
953 BUG_ON(cm->length != 5);
955 if (unlikely(spaceleft < 5))
956 return 0;
958 dst = skb_put(skb, 5);
959 BUG_ON(dst == 0);
961 dst[0] = get_kp_code(KP_MISC, KP_MISC_RESET_CONN);
962 cor_put_u32(dst + 1, cm->msg.reset_conn.conn_id);
964 list_add_tail(&(cm->lh), &(cr->msgs));
965 if (*ackneeded != ACK_NEEDED_FAST)
966 *ackneeded = ACK_NEEDED_SLOW;
968 return 5;
971 static __u32 cor_add_conndata(struct sk_buff *skb,
972 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
973 __u32 spaceleft, struct cor_control_msg_out **split_conndata,
974 __u32 *sc_sendlen)
976 char *dst;
977 __u32 offset = 0;
979 __u32 totallen = get_kp_conn_data_length(cm->msg.conn_data.datalen);
980 __u32 putlen = totallen;
981 __u32 dataputlen = cm->msg.conn_data.datalen;
982 __u8 code_min = 0;
984 BUILD_BUG_ON(KP_CONN_DATA_MAXLEN != 128+32767);
985 BUG_ON(cm->msg.conn_data.datalen > KP_CONN_DATA_MAXLEN);
987 BUG_ON(cm->length != totallen);
989 BUG_ON(putlen > 1024*1024*1024);
991 BUG_ON(split_conndata == 0);
992 BUG_ON(*split_conndata != 0);
993 BUG_ON(sc_sendlen == 0);
994 BUG_ON(*sc_sendlen != 0);
996 if (putlen > spaceleft) {
997 if (spaceleft < get_kp_conn_data_length(1))
998 return 0;
1000 BUG_ON(spaceleft < 13);
1002 if (spaceleft <= 127 + 12) {
1003 dataputlen = spaceleft - 12;
1004 putlen = spaceleft;
1005 } else if (spaceleft == 127 - 12 + 1) {
1006 dataputlen = spaceleft - 12 - 1;
1007 putlen = spaceleft - 1;
1008 } else {
1009 dataputlen = spaceleft - 13;
1010 putlen = spaceleft;
1013 BUG_ON(putlen != get_kp_conn_data_length(dataputlen));
1016 dst = skb_put(skb, putlen);
1017 BUG_ON(dst == 0);
1019 BUG_ON((cm->msg.conn_data.windowused &
1020 (~KP_CONN_DATA_FLAGS_WINDOWUSED)) != 0);
1021 code_min = 0;
1022 if (cm->msg.conn_data.flush != 0)
1023 code_min |= KP_CONN_DATA_FLAGS_FLUSH;
1024 code_min |= cm->msg.conn_data.windowused;
1026 dst[0] = get_kp_code(KP_CONN_DATA, code_min);
1027 offset++;
1028 cor_put_u32(dst + offset, cm->msg.conn_data.conn_id);
1029 offset += 4;
1030 cor_put_u48(dst + offset, cm->msg.conn_data.seqno);
1031 offset += 6;
1033 if (dataputlen < 128) {
1034 dst[offset] = (__u8) dataputlen;
1035 offset++;
1036 } else {
1037 __u8 high = (__u8) (128 + ((dataputlen - 128) / 256));
1038 __u8 low = (__u8) ((dataputlen - 128) % 256);
1039 BUG_ON(((dataputlen - 128) / 256) > 127);
1040 dst[offset] = high;
1041 dst[offset+1] = low;
1042 offset += 2;
1045 BUG_ON(offset > putlen);
1046 BUG_ON(putlen - offset != dataputlen);
1047 memcpy(dst + offset, cm->msg.conn_data.data, dataputlen);
1048 offset += dataputlen;
1050 if (cm->msg.conn_data.datalen == dataputlen) {
1051 BUG_ON(cm->length != putlen);
1052 list_add_tail(&(cm->lh), &(cr->msgs));
1053 } else {
1054 *split_conndata = cm;
1055 *sc_sendlen = dataputlen;
1058 return putlen;
1061 static __u32 cor_add_set_max_cmsg_dly(struct sk_buff *skb,
1062 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
1063 __u32 spaceleft, int *ackneeded)
1065 char *dst;
1067 BUG_ON(KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN != 21);
1068 BUG_ON(cm->length != KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN);
1070 if (unlikely(spaceleft < 21))
1071 return 0;
1073 dst = skb_put(skb, 21);
1074 BUG_ON(dst == 0);
1076 dst[0] = get_kp_code(KP_MISC, KP_MISC_SET_MAX_CMSG_DELAY);
1077 cor_put_u32(dst + 1, cm->msg.set_max_cmsg_delay.ack_fast_delay);
1078 cor_put_u32(dst + 5, cm->msg.set_max_cmsg_delay.ack_slow_delay);
1079 cor_put_u32(dst + 9,
1080 cm->msg.set_max_cmsg_delay.ackconn_lowlatency_delay);
1081 cor_put_u32(dst + 13,
1082 cm->msg.set_max_cmsg_delay.ackconn_highlatency_delay);
1083 cor_put_u32(dst + 17, cm->msg.set_max_cmsg_delay.pong_delay);
1085 list_add_tail(&(cm->lh), &(cr->msgs));
1086 if (*ackneeded != ACK_NEEDED_FAST)
1087 *ackneeded = ACK_NEEDED_SLOW;
1089 return 21;
1092 static __u32 cor_add_message(struct sk_buff *skb,
1093 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
1094 __u32 spaceleft, int nbstate, unsigned long cmsg_send_start_j,
1095 ktime_t cmsg_send_start_kt,
1096 struct cor_control_msg_out **split_conndata, __u32 *sc_sendlen,
1097 int *ackneeded)
1099 BUG_ON(split_conndata != 0 && *split_conndata != 0);
1100 BUG_ON(sc_sendlen != 0 && *sc_sendlen != 0);
1102 switch (cm->type) {
1103 case MSGTYPE_ACK:
1104 return cor_add_ack(skb, cr, cm, spaceleft);
1105 case MSGTYPE_ACK_CONN:
1106 return cor_add_ack_conn(skb, cr, cm, spaceleft,
1107 cmsg_send_start_j, ackneeded);
1108 case MSGTYPE_PONG:
1109 return cor_add_pong(skb, cr, cm, spaceleft, nbstate,
1110 cmsg_send_start_kt, ackneeded);
1111 case MSGTYPE_CONNECT:
1112 return cor_add_connect(skb, cr, cm, spaceleft, ackneeded);
1113 case MSGTYPE_CONNECT_SUCCESS:
1114 return cor_add_connect_success(skb, cr, cm, spaceleft,
1115 ackneeded);
1116 case MSGTYPE_RESET_CONN:
1117 return cor_add_reset_conn(skb, cr, cm, spaceleft, ackneeded);
1118 case MSGTYPE_CONNDATA:
1119 return cor_add_conndata(skb, cr, cm, spaceleft, split_conndata,
1120 sc_sendlen);
1121 case MSGTYPE_SET_MAX_CMSG_DELAY:
1122 return cor_add_set_max_cmsg_dly(skb, cr, cm, spaceleft,
1123 ackneeded);
1124 default:
1125 BUG();
1127 BUG();
1128 return 0;
1131 static __u32 ___cor_send_messages(struct cor_neighbor *nb, struct sk_buff *skb,
1132 struct cor_control_retrans *cr, struct list_head *cmsgs,
1133 __u32 spaceleft, int nbstate, unsigned long cmsg_send_start_j,
1134 ktime_t cmsg_send_start_kt,
1135 struct cor_control_msg_out **split_conndata, __u32 *sc_sendlen,
1136 int *ackneeded)
1138 __u32 length = 0;
1139 while (!list_empty(cmsgs)) {
1140 __u32 rc;
1141 struct cor_control_msg_out *cm = container_of(cmsgs->next,
1142 struct cor_control_msg_out, lh);
1144 list_del(&(cm->lh));
1146 rc = cor_add_message(skb, cr, cm, spaceleft - length, nbstate,
1147 cmsg_send_start_j, cmsg_send_start_kt,
1148 split_conndata, sc_sendlen, ackneeded);
1149 if (rc == 0) {
1150 BUG();
1151 list_add(&(cm->lh), cmsgs);
1152 break;
1155 BUG_ON(rc != cm->length && cm->type != MSGTYPE_CONNDATA);
1157 length += rc;
1160 return length;
1163 static __u32 ___cor_send_messages_smcd(struct cor_neighbor *nb,
1164 struct sk_buff *skb, struct cor_control_retrans *cr,
1165 __u32 spaceleft, int nbstate, unsigned long cmsg_send_start_j,
1166 ktime_t cmsg_send_start_kt, int *ackneeded)
1168 struct cor_control_msg_out *cm;
1169 __u32 rc;
1171 cm = cor_alloc_control_msg(nb, ACM_PRIORITY_MED);
1173 if (unlikely(cm == 0))
1174 return 0;
1176 cm->type = MSGTYPE_SET_MAX_CMSG_DELAY;
1177 cm->msg.set_max_cmsg_delay.ack_fast_delay =
1178 CMSG_MAXDELAY_ACK_FAST_MS * 1000;
1179 cm->msg.set_max_cmsg_delay.ack_slow_delay =
1180 CMSG_MAXDELAY_ACK_SLOW_MS * 1000;
1181 cm->msg.set_max_cmsg_delay.ackconn_lowlatency_delay =
1182 CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS * 1000;
1183 cm->msg.set_max_cmsg_delay.ackconn_highlatency_delay =
1184 CMSG_MAXDELAY_ACKCONN_HIGHLATENCY_MS * 1000;
1185 cm->msg.set_max_cmsg_delay.pong_delay =
1186 CMSG_MAXDELAY_OTHER_MS * 1000;
1187 cm->length = KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN;
1189 rc = cor_add_message(skb, cr, cm, spaceleft, nbstate, cmsg_send_start_j,
1190 cmsg_send_start_kt, 0, 0, ackneeded);
1192 nb->max_cmsg_delay_sent = 1;
1194 return rc;
/* identifiers of the control message send queues */
#define CMSGQUEUE_PONG			1
#define CMSGQUEUE_ACK_FAST		2
#define CMSGQUEUE_ACK_SLOW		3
#define CMSGQUEUE_ACK_CONN_URGENT	4
#define CMSGQUEUE_ACK_CONN_LOWLAT	5
#define CMSGQUEUE_ACK_CONN_HIGHLAT	6
#define CMSGQUEUE_CONNDATA_LOWLAT	7
#define CMSGQUEUE_CONNDATA_HIGHLAT	8
#define CMSGQUEUE_OTHER			9
1207 static void cor_requeue_message(struct cor_control_msg_out *cm)
1209 if (cm->type == MSGTYPE_ACK_CONN) {
1210 struct cor_conn *cn_l = cm->msg.ack_conn.src_in;
1212 spin_lock_bh(&(cn_l->rcv_lock));
1213 if (unlikely(cor_ackconn_prepare_requeue(cn_l, cm) == 0)) {
1214 cor_free_control_msg(cm);
1215 } else {
1216 spin_lock_bh(&(cm->nb->cmsg_lock));
1218 if (unlikely(cm->msg.ack_conn.queue ==
1219 CMSGQUEUE_ACK_CONN_URGENT)) {
1220 list_add(&(cm->lh), &(cm->nb->
1221 cmsg_queue_ackconn_urgent));
1222 } else if (cm->msg.ack_conn.queue ==
1223 CMSGQUEUE_ACK_CONN_LOWLAT) {
1224 list_add(&(cm->lh), &(cm->nb->
1225 cmsg_queue_ackconn_lowlat));
1226 } else if (cm->msg.ack_conn.queue ==
1227 CMSGQUEUE_ACK_CONN_HIGHLAT) {
1228 list_add(&(cm->lh), &(cm->nb->
1229 cmsg_queue_ackconn_highlat));
1230 } else {
1231 BUG();
1234 cm->nb->cmsg_otherlength += cm->length;
1236 list_add(&(cm->msg.ack_conn.conn_acks),
1237 &(cn_l->source.in.acks_pending));
1238 cor_try_merge_ackconns(cn_l, cm);
1240 spin_unlock_bh(&(cm->nb->cmsg_lock));
1242 spin_unlock_bh(&(cn_l->rcv_lock));
1243 return;
1246 cor_enqueue_control_msg(cm, ADDCMSG_SRC_READD);
1249 static void cor_requeue_messages(struct list_head *lh)
1251 while (list_empty(lh) == 0) {
1252 struct cor_control_msg_out *cm = container_of(lh->prev,
1253 struct cor_control_msg_out, lh);
1254 list_del(&(cm->lh));
1255 cor_requeue_message(cm);
/*
 * Fill skb with control commands and transmit it.
 *
 * Commands are added in this order: init-session and ping (only if
 * ping != TIMETOSENDPING_NO), the set-max-cmsg-delay command (only for an
 * active neighbor that has not sent it yet), then the messages in cmsgs.
 *
 * *packet_type is set to PACKET_TYPE_CMSG_NOACK/ACKSLOW/ACKFAST according
 * to the ackneeded value collected while adding the messages.
 * *sent is set to 1 if cor_dev_queue_xmit() returns NET_XMIT_SUCCESS.
 *
 * Returns QOS_RESUME_DONE if the packet was dropped before transmission
 * (nothing, or only an unforced ping, to send) or was transmitted with
 * NET_XMIT_SUCCESS; QOS_RESUME_CONG otherwise.
 */
1259 static int __cor_send_messages_send(struct cor_neighbor *nb,
1260 struct sk_buff *skb, char *packet_type, int ping,
1261 int initsession, struct cor_control_retrans *cr,
1262 struct list_head *cmsgs, __u32 spaceleft, int nbstate,
1263 unsigned long cmsg_send_start_j, ktime_t cmsg_send_start_kt,
1264 int *sent)
1266 int rc;
1267 int ackneeded = ACK_NEEDED_NO;
1268 __u32 length = 0;
1269 __u32 pinglen = 0;
1270 __u32 pingcookie = 0;
1271 unsigned long last_ping_time;
1272 struct cor_control_msg_out *split_conndata = 0;
1273 __u32 sc_sendlen = 0;
/* pinglen counts the bytes of the init-session and ping commands */
1275 if (ping != TIMETOSENDPING_NO) {
1276 __u32 rc;
1278 if (unlikely(initsession)) {
1279 rc = cor_add_init_session(skb, nb->sessionid,
1280 spaceleft - length);
1281 BUG_ON(rc <= 0);
1282 pinglen = rc;
1283 length += rc;
1286 pingcookie = cor_add_ping_req(nb, &last_ping_time);
1287 rc = cor_add_ping(skb, pingcookie, spaceleft - length);
1288 BUG_ON(rc <= 0);
1289 pinglen += rc;
1290 length += rc;
/* add the set-max-cmsg-delay command if it has not been sent yet */
1293 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE) &&
1294 unlikely(nb->max_cmsg_delay_sent == 0))
1295 length += ___cor_send_messages_smcd(nb, skb, cr,
1296 spaceleft - length, nbstate, cmsg_send_start_j,
1297 cmsg_send_start_kt, &ackneeded);
1299 length += ___cor_send_messages(nb, skb, cr, cmsgs, spaceleft - length,
1300 nbstate, cmsg_send_start_j, cmsg_send_start_kt,
1301 &split_conndata, &sc_sendlen, &ackneeded);
1303 BUG_ON(length > spaceleft);
/* only an unforced ping was added: take the ping request back and drop */
1305 if (likely(ping != TIMETOSENDPING_FORCE) &&
1306 pinglen != 0 && unlikely(length == pinglen)) {
1307 cor_unadd_ping_req(nb, pingcookie, last_ping_time, 0);
1308 goto drop;
1311 if (unlikely(length == 0)) {
1312 drop:
1313 kfree_skb(skb);
1315 BUG_ON(list_empty(&(cr->msgs)) == 0);
1316 kref_put(&(cr->ref), cor_free_control_retrans);
/* the sequence number taken for this packet is given back */
1318 nb->kpacket_seqno--;
1319 return QOS_RESUME_DONE;
1322 //padding(skb, spaceleft - length);
1323 BUG_ON(spaceleft - length != 0 &&
1324 (split_conndata == 0 || spaceleft - length != 1));
1326 if (ackneeded == ACK_NEEDED_NO) {
1327 *packet_type = PACKET_TYPE_CMSG_NOACK;
1328 } else if (ackneeded == ACK_NEEDED_SLOW) {
1329 *packet_type = PACKET_TYPE_CMSG_ACKSLOW;
1330 } else if (ackneeded == ACK_NEEDED_FAST) {
1331 *packet_type = PACKET_TYPE_CMSG_ACKFAST;
1332 } else {
1333 BUG();
1336 rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_KPACKET);
1337 if (rc == NET_XMIT_SUCCESS)
1338 *sent = 1;
/*
 * NET_XMIT_DROP: undo the ping request and requeue everything that was
 * put into the packet. cmsg_bulk_readds is raised around the requeueing;
 * cor_schedule_controlmsg_timer() returns early while it is non-zero.
 */
1340 if (rc == NET_XMIT_DROP) {
1341 if (ping != 0)
1342 cor_unadd_ping_req(nb, pingcookie, last_ping_time, 1);
1344 atomic_inc(&(nb->cmsg_bulk_readds));
1345 if (split_conndata != 0)
1346 cor_requeue_message(split_conndata);
1348 cor_requeue_messages(&(cr->msgs));
1350 kref_put(&(cr->ref), cor_free_control_retrans);
1352 atomic_dec(&(nb->cmsg_bulk_readds));
1354 spin_lock_bh(&(nb->cmsg_lock));
1355 cor_schedule_controlmsg_timer(nb);
1356 spin_unlock_bh(&(nb->cmsg_lock));
1357 } else {
1358 struct list_head *curr = cr->msgs.next;
1360 if (pingcookie != 0)
1361 cor_ping_sent(nb, pingcookie);
/*
 * Walk cr->msgs and free the messages matched by the branches below
 * (acks, some pongs, conn data); whatever stays on the list is handed to
 * cor_schedule_retransmit() further down.
 */
1363 while (curr != &(cr->msgs)) {
1364 struct cor_control_msg_out *cm = container_of(curr,
1365 struct cor_control_msg_out, lh);
/* advance first, cm may be unlinked and freed below */
1367 curr = curr->next;
1369 if (cm->type == MSGTYPE_ACK || unlikely(
1370 cm->type == MSGTYPE_PONG &&
1371 (nbstate != NEIGHBOR_STATE_ACTIVE))) {
1372 list_del(&(cm->lh));
1373 cor_free_control_msg(cm);
1374 } else if (unlikely(cm->type == MSGTYPE_PONG &&
1375 atomic_inc_return(
1376 &(nb->cmsg_pongs_retrans_cnt)) >
1377 MAX_PONG_CMSGS_RETRANS_PER_NEIGH)) {
1378 atomic_dec(&(nb->cmsg_pongs_retrans_cnt));
1379 list_del(&(cm->lh));
1380 cor_free_control_msg(cm);
1381 } else if (cm->type == MSGTYPE_CONNDATA) {
1382 cor_schedule_retransmit_conn(
1383 cm->msg.conn_data.cr, 0, 0);
1384 kref_put(&(cm->msg.conn_data.cr->ref),
1385 cor_free_connretrans);
1386 cm->msg.conn_data.cr = 0;
1387 kfree(cm->msg.conn_data.data_orig);
1388 list_del(&(cm->lh));
1389 cor_free_control_msg(cm);
/* the unsent remainder of a split conn data message is queued again */
1393 if (split_conndata != 0) {
1394 BUG_ON(sc_sendlen == 0);
1395 BUG_ON(sc_sendlen >=
1396 split_conndata->msg.conn_data.datalen);
1398 split_conndata->msg.conn_data.seqno += sc_sendlen;
1399 split_conndata->msg.conn_data.data += sc_sendlen;
1400 split_conndata->msg.conn_data.datalen -= sc_sendlen;
1401 split_conndata->length = get_kp_conn_data_length(
1402 split_conndata->msg.conn_data.datalen);
1403 cor_enqueue_control_msg(split_conndata,
1404 ADDCMSG_SRC_SPLITCONNDATA);
1408 if (list_empty(&(cr->msgs))) {
1409 kref_put(&(cr->ref), cor_free_control_retrans);
1410 } else {
1411 int fastack = (ackneeded == ACK_NEEDED_FAST);
1412 BUG_ON(ackneeded != ACK_NEEDED_FAST &&
1413 ackneeded != ACK_NEEDED_SLOW);
1414 cor_schedule_retransmit(cr, nb, fastack);
1418 return (rc == NET_XMIT_SUCCESS) ? QOS_RESUME_DONE : QOS_RESUME_CONG;
1421 static int _cor_send_messages_send(struct cor_neighbor *nb, int ping,
1422 int initsession, struct list_head *cmsgs, int nbstate,
1423 __u32 length, __u64 seqno, unsigned long cmsg_send_start_j,
1424 ktime_t cmsg_send_start_kt, int *sent)
1426 struct sk_buff *skb;
1427 struct cor_control_retrans *cr;
1428 char *dst;
1429 int rc;
1431 BUG_ON(length > cor_mss_cmsg(nb));
1432 skb = cor_create_packet(nb, length + 7, GFP_ATOMIC);
1433 if (unlikely(skb == 0)) {
1434 printk(KERN_ERR "cor_send_messages(): cannot allocate skb (out of memory?)");
1436 cor_requeue_messages(cmsgs);
1437 return QOS_RESUME_CONG;
1440 cr = kmem_cache_alloc(cor_controlretrans_slab, GFP_ATOMIC);
1441 if (unlikely(cr == 0)) {
1442 printk(KERN_ERR "cor_send_messages(): cannot allocate cor_control_retrans (out of memory?)");
1443 kfree_skb(skb);
1445 cor_requeue_messages(cmsgs);
1446 return QOS_RESUME_CONG;
1449 memset(cr, 0, sizeof(struct cor_control_retrans));
1450 kref_init(&(cr->ref));
1451 cr->nb = nb;
1452 cr->seqno = seqno;
1453 INIT_LIST_HEAD(&(cr->msgs));
1456 dst = skb_put(skb, 7);
1457 BUG_ON(dst == 0);
1459 dst[0] = PACKET_TYPE_NONE;
1460 cor_put_u48(dst + 1, seqno);
1462 rc = __cor_send_messages_send(nb, skb, &(dst[0]), ping, initsession, cr,
1463 cmsgs, length, nbstate, cmsg_send_start_j,
1464 cmsg_send_start_kt, sent);
1466 BUG_ON(!list_empty(cmsgs));
1468 return rc;
1471 static unsigned long cor_get_cmsg_timeout(struct cor_control_msg_out *cm,
1472 int queue)
1474 if (cm->type == MSGTYPE_ACK) {
1475 if (cm->msg.ack.fast != 0) {
1476 BUG_ON(queue != CMSGQUEUE_ACK_FAST);
1477 return cm->time_added + msecs_to_jiffies(
1478 CMSG_MAXDELAY_ACK_FAST_MS);
1479 } else {
1480 BUG_ON(queue != CMSGQUEUE_ACK_SLOW);
1481 return cm->time_added + msecs_to_jiffies(
1482 CMSG_MAXDELAY_ACK_SLOW_MS);
1484 } else if (cm->type == MSGTYPE_ACK_CONN) {
1485 __u32 maxdelay_ms = 0;
1486 if (unlikely(queue == CMSGQUEUE_ACK_CONN_URGENT)) {
1487 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_URGENT_MS;
1488 } else if (queue == CMSGQUEUE_ACK_CONN_LOWLAT) {
1489 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS;
1490 } else if (queue == CMSGQUEUE_ACK_CONN_HIGHLAT) {
1491 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_HIGHLATENCY_MS;
1492 } else {
1493 BUG();
1495 return cm->time_added + msecs_to_jiffies(maxdelay_ms);
1496 } else if (cm->type == MSGTYPE_CONNDATA) {
1497 if (cm->msg.conn_data.highlatency != 0) {
1498 BUG_ON(queue != CMSGQUEUE_CONNDATA_HIGHLAT);
1499 return cm->time_added +
1500 msecs_to_jiffies(
1501 CMSG_MAXDELAY_CONNDATA_MS);
1502 } else {
1503 BUG_ON(queue != CMSGQUEUE_CONNDATA_LOWLAT);
1504 return cm->time_added;
1506 } else {
1507 BUG_ON(cm->type == MSGTYPE_PONG && queue != CMSGQUEUE_PONG);
1508 BUG_ON(cm->type != MSGTYPE_PONG && queue != CMSGQUEUE_OTHER);
1510 return cm->time_added +
1511 msecs_to_jiffies(CMSG_MAXDELAY_OTHER_MS);
1515 static void _cor_peek_message(struct cor_neighbor *nb_cmsglocked, int queue,
1516 struct cor_control_msg_out **currcm, unsigned long *currtimeout,
1517 __u32 **currlen)
1519 struct cor_control_msg_out *cm;
1520 unsigned long cmtimeout;
1522 struct list_head *queuelh;
1523 if (queue == CMSGQUEUE_PONG) {
1524 queuelh = &(nb_cmsglocked->cmsg_queue_pong);
1525 } else if (queue == CMSGQUEUE_ACK_FAST) {
1526 queuelh = &(nb_cmsglocked->cmsg_queue_ack_fast);
1527 } else if (queue == CMSGQUEUE_ACK_SLOW) {
1528 queuelh = &(nb_cmsglocked->cmsg_queue_ack_slow);
1529 } else if (queue == CMSGQUEUE_ACK_CONN_URGENT) {
1530 queuelh = &(nb_cmsglocked->cmsg_queue_ackconn_urgent);
1531 } else if (queue == CMSGQUEUE_ACK_CONN_LOWLAT) {
1532 queuelh = &(nb_cmsglocked->cmsg_queue_ackconn_lowlat);
1533 } else if (queue == CMSGQUEUE_ACK_CONN_HIGHLAT) {
1534 queuelh = &(nb_cmsglocked->cmsg_queue_ackconn_highlat);
1535 } else if (queue == CMSGQUEUE_CONNDATA_LOWLAT) {
1536 queuelh = &(nb_cmsglocked->cmsg_queue_conndata_lowlat);
1537 } else if (queue == CMSGQUEUE_CONNDATA_HIGHLAT) {
1538 queuelh = &(nb_cmsglocked->cmsg_queue_conndata_highlat);
1539 } else if (queue == CMSGQUEUE_OTHER) {
1540 queuelh = &(nb_cmsglocked->cmsg_queue_other);
1541 } else {
1542 BUG();
1545 if (list_empty(queuelh))
1546 return;
1548 cm = container_of(queuelh->next, struct cor_control_msg_out, lh);
1549 cmtimeout = cor_get_cmsg_timeout(cm, queue);
1551 BUG_ON(cm->nb != nb_cmsglocked);
1553 if (*currcm == 0 || (time_before(cmtimeout, *currtimeout) &&
1554 time_before(jiffies, *currtimeout))) {
1555 *currcm = cm;
1556 *currtimeout = cmtimeout;
1558 if (queue == CMSGQUEUE_PONG) {
1559 *currlen = &(nb_cmsglocked->cmsg_pongslength);
1560 } else {
1561 *currlen = &(nb_cmsglocked->cmsg_otherlength);
1566 static void cor_peek_message(struct cor_neighbor *nb_cmsglocked, int nbstate,
1567 struct cor_control_msg_out **cm, unsigned long *cmtimeout,
1568 __u32 **len, int for_timeout)
1570 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_PONG, cm, cmtimeout, len);
1571 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
1572 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_FAST, cm,
1573 cmtimeout, len);
1574 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_SLOW, cm,
1575 cmtimeout, len);
1576 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN_URGENT, cm,
1577 cmtimeout, len);
1578 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN_LOWLAT, cm,
1579 cmtimeout, len);
1580 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN_HIGHLAT, cm,
1581 cmtimeout, len);
1582 if (!for_timeout || atomic_read(
1583 &(nb_cmsglocked->cmsg_delay_conndata)) == 0) {
1584 _cor_peek_message(nb_cmsglocked,
1585 CMSGQUEUE_CONNDATA_LOWLAT,
1586 cm, cmtimeout, len);
1587 _cor_peek_message(nb_cmsglocked,
1588 CMSGQUEUE_CONNDATA_HIGHLAT,
1589 cm, cmtimeout, len);
1591 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_OTHER, cm, cmtimeout,
1592 len);
1596 static unsigned long cor_get_cmsg_timer_timeout(
1597 struct cor_neighbor *nb_cmsglocked, int nbstate)
1599 unsigned long pingtimeout = cor_get_next_ping_time(nb_cmsglocked);
1601 struct cor_control_msg_out *cm = 0;
1602 unsigned long cmtimeout;
1603 __u32 *len;
1605 cor_peek_message(nb_cmsglocked, nbstate, &cm, &cmtimeout, &len, 1);
1607 if (cm != 0) {
1608 unsigned long jiffies_tmp = jiffies;
1610 if (time_before(cmtimeout, jiffies_tmp))
1611 return jiffies_tmp;
1612 if (time_before(cmtimeout, pingtimeout))
1613 return cmtimeout;
1616 return pingtimeout;
/*
 * Move messages from the neighbor's queues to the tail of cmsgs, in the
 * order chosen by cor_peek_message(), until no message is left or the next
 * one does not fit into the space remaining up to targetmss.
 *
 * *length is the number of bytes already accounted for; it is increased
 * by the length of every dequeued message. A MSGTYPE_CONNDATA message
 * larger than the remaining space is still dequeued if at least
 * get_kp_conn_data_length(1) bytes are free and *length is not above 3/4
 * of targetmss; *length is then set to targetmss (presumably the message
 * is split when the packet is built - verify against cor_add_message()).
 *
 * Per-message bookkeeping done here: the queue length counter returned by
 * cor_peek_message() is decreased, ack_conn messages are unlinked from
 * their conn_acks list, cmsg_pongscnt is decreased for pongs, and
 * reset_conn messages are removed from pending_conn_resets_rb together
 * with the reference held for the tree.
 */
1619 static void _cor_dequeue_messages(struct cor_neighbor *nb_cmsglocked,
1620 int nbstate, __u32 targetmss, __u32 *length,
1621 struct list_head *cmsgs)
1623 while (1) {
1624 __u32 spaceleft = targetmss - *length;
1625 struct cor_control_msg_out *cm = 0;
1626 unsigned long cmtimeout;
1627 __u32 *len;
1629 cor_peek_message(nb_cmsglocked, nbstate, &cm, &cmtimeout, &len,
1632 if (unlikely(cm == 0))
1633 break;
1635 BUG_ON(len == 0);
/* message does not fit completely into the remaining space */
1637 if (cm->length > spaceleft) {
1638 if (cm->type == MSGTYPE_CONNDATA) {
1639 BUG_ON(*length == 0 && spaceleft <
1640 get_kp_conn_data_length(1));
1642 if (spaceleft < get_kp_conn_data_length(1) ||
1643 *length > (targetmss/4)*3)
1644 break;
1645 } else {
1646 BUG_ON(*length == 0);
1647 break;
1651 list_del(&(cm->lh));
1652 *len -= cm->length;
1654 if (cm->type == MSGTYPE_ACK_CONN)
1655 list_del(&(cm->msg.ack_conn.conn_acks));
1656 if (unlikely(cm->type == MSGTYPE_PONG)) {
1657 BUG_ON(cm->nb->cmsg_pongscnt == 0);
1658 cm->nb->cmsg_pongscnt--;
1661 if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
1662 BUG_ON(cm->msg.reset_conn.in_pending_conn_resets == 0);
1663 rb_erase(&(cm->msg.reset_conn.rbn),
1664 &(cm->nb->pending_conn_resets_rb));
1665 cm->msg.reset_conn.in_pending_conn_resets = 0;
1666 kref_put(&(cm->ref), cor_kreffree_bug);
/* guard against __u32 wrap-around of the accumulated length */
1669 BUG_ON(*length + cm->length < *length);
1670 if (cm->length > targetmss - *length) {
1671 BUG_ON(*length >= targetmss);
1672 BUG_ON(cm->type != MSGTYPE_CONNDATA);
1673 *length = targetmss;
1674 } else {
1675 *length += cm->length;
1678 list_add_tail(&(cm->lh), cmsgs);
1682 static __u32 cor_get_total_messages_length(struct cor_neighbor *nb, int ping,
1683 int initsession, int nbstate, int *extralength)
1685 __u32 length = nb->cmsg_pongslength;
1687 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
1688 length += nb->cmsg_otherlength;
1690 if (unlikely(nb->max_cmsg_delay_sent == 0)) {
1691 length += KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN;
1692 *extralength += KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN;
1695 if (ping == TIMETOSENDPING_FORCE ||
1696 (length > 0 && ping != TIMETOSENDPING_NO)) {
1697 length += KP_MISC_PING_CMDLEN;
1698 *extralength += KP_MISC_PING_CMDLEN;
1700 if (unlikely(initsession)) {
1701 length += KP_MISC_INIT_SESSION_CMDLEN;
1702 *extralength += KP_MISC_INIT_SESSION_CMDLEN;
1706 return length;
1709 static int cor_dequeue_messages(struct cor_neighbor *nb_cmsglocked, int ping,
1710 int initsession, int nbstate, __u32 targetmss,
1711 __u32 *length, struct list_head *cmsgs)
1713 __u32 extralength = 0;
1714 __u32 totallength;
1716 int cmsgqueue_nonpong_empty = (
1717 list_empty(&(nb_cmsglocked->cmsg_queue_ack_fast)) &&
1718 list_empty(&(nb_cmsglocked->cmsg_queue_ack_slow)) &&
1719 list_empty(&(nb_cmsglocked->cmsg_queue_ackconn_urgent)) &&
1720 list_empty(&(nb_cmsglocked->cmsg_queue_ackconn_lowlat)) &&
1721 list_empty(&(nb_cmsglocked->cmsg_queue_ackconn_highlat)) &&
1722 list_empty(&(nb_cmsglocked->cmsg_queue_conndata_lowlat)) &&
1723 list_empty(&(nb_cmsglocked->cmsg_queue_conndata_highlat)) &&
1724 list_empty(&(nb_cmsglocked->cmsg_queue_other)));
1726 BUG_ON(list_empty(&(nb_cmsglocked->cmsg_queue_pong)) &&
1727 nb_cmsglocked->cmsg_pongslength != 0);
1728 BUG_ON(!list_empty(&(nb_cmsglocked->cmsg_queue_pong)) &&
1729 nb_cmsglocked->cmsg_pongslength == 0);
1730 BUG_ON(cmsgqueue_nonpong_empty &&
1731 nb_cmsglocked->cmsg_otherlength != 0);
1732 BUG_ON(!cmsgqueue_nonpong_empty &&
1733 nb_cmsglocked->cmsg_otherlength == 0);
1735 totallength = cor_get_total_messages_length(nb_cmsglocked, ping,
1736 initsession, nbstate, &extralength);
1738 if (totallength == 0)
1739 return 1;
1741 if (totallength < targetmss && ping != TIMETOSENDPING_FORCE &&
1742 time_after(cor_get_cmsg_timer_timeout(nb_cmsglocked,
1743 nbstate), jiffies))
1744 return 1;
1746 *length = extralength;
1748 _cor_dequeue_messages(nb_cmsglocked, nbstate, targetmss, length, cmsgs);
1750 BUG_ON(*length == 0);
1751 BUG_ON(*length > targetmss);
1753 return 0;
1756 static struct cor_control_retrans *cor_get_next_timeouted_retrans(
1757 struct cor_neighbor *nb_retranslocked)
1759 if (list_empty(&(nb_retranslocked->retrans_fast_list)) == 0) {
1760 struct cor_control_retrans *cr = container_of(
1761 nb_retranslocked->retrans_fast_list.next,
1762 struct cor_control_retrans, timeout_list);
1763 BUG_ON(cr->nb != nb_retranslocked);
1765 if (time_before_eq(cr->timeout, jiffies)) {
1766 return cr;
1770 if (list_empty(&(nb_retranslocked->retrans_slow_list)) == 0) {
1771 struct cor_control_retrans *cr = container_of(
1772 nb_retranslocked->retrans_slow_list.next,
1773 struct cor_control_retrans, timeout_list);
1774 BUG_ON(cr->nb != nb_retranslocked);
1776 if (time_before_eq(cr->timeout, jiffies)) {
1777 return cr;
1781 return 0;
1784 static void cor_add_timeouted_retrans(struct cor_neighbor *nb)
1786 spin_lock_bh(&(nb->retrans_lock));
1788 while (1) {
1789 struct cor_control_retrans *cr =
1790 cor_get_next_timeouted_retrans(nb);
1792 if (cr == 0)
1793 break;
1795 list_del(&(cr->timeout_list));
1796 rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));
1798 cor_requeue_control_retrans(cr);
1800 kref_put(&(cr->ref), cor_kreffree_bug); /* list_del */
1801 kref_put(&(cr->ref), cor_free_control_retrans); /* rb */
1804 if (list_empty(&(nb->retrans_fast_list)) == 0 ||
1805 list_empty(&(nb->retrans_slow_list)) == 0) {
1806 if (mod_timer(&(nb->retrans_timer),
1807 cor_get_retransmit_timeout(nb)) == 0) {
1808 kref_get(&(nb->ref));
1812 spin_unlock_bh(&(nb->retrans_lock));
/*
 * Free every control message on cmsgs; the list is empty afterwards.
 * For conn data messages the conn retransmit is scheduled via
 * cor_schedule_retransmit_conn() and the data_orig buffer is released
 * before the message itself is freed.
 */
1815 static void _cor_delete_all_cmsgs(struct list_head *cmsgs)
1817 while (!list_empty(cmsgs)) {
1818 struct cor_control_msg_out *cm = container_of(cmsgs->next,
1819 struct cor_control_msg_out, lh);
1821 list_del(&(cm->lh));
1823 if (cm->type == MSGTYPE_CONNDATA) {
1824 cor_schedule_retransmit_conn(cm->msg.conn_data.cr, 0,
1826 kfree(cm->msg.conn_data.data_orig);
1829 cor_free_control_msg(cm);
1833 static void cor_delete_all_cmsgs(struct cor_neighbor *nb)
1835 while (1) {
1836 struct list_head cmsgs;
1837 __u32 length = 0;
1839 INIT_LIST_HEAD(&cmsgs);
1841 spin_lock_bh(&(nb->cmsg_lock));
1842 _cor_dequeue_messages(nb, NEIGHBOR_STATE_ACTIVE, 65536, &length,
1843 &cmsgs);
1844 spin_unlock_bh(&(nb->cmsg_lock));
1846 if (list_empty(&cmsgs))
1847 break;
1849 _cor_delete_all_cmsgs(&cmsgs);
1853 static int cor_reset_timeouted_conn_needed(struct cor_neighbor *nb,
1854 struct cor_conn *src_in_l)
1856 if (unlikely(src_in_l->sourcetype != SOURCE_IN ||
1857 src_in_l->source.in.nb != nb ||
1858 src_in_l->isreset != 0))
1859 return 0;
1860 else if (likely(time_after(src_in_l->source.in.jiffies_last_act +
1861 CONN_ACTIVITY_UPDATEINTERVAL_SEC * HZ +
1862 CONN_INACTIVITY_TIMEOUT_SEC * HZ, jiffies)))
1863 return 0;
1865 return 1;
1868 static int cor_reset_timeouted_conn(struct cor_neighbor *nb,
1869 struct cor_conn *src_in)
1871 struct cor_conn_bidir *cnb = cor_get_conn_bidir(src_in);
1872 struct cor_conn *trgt_out = cor_get_conn_reversedir(src_in);
1874 int resetted = 0;
1876 spin_lock_bh(&(cnb->cli.rcv_lock));
1877 spin_lock_bh(&(cnb->srv.rcv_lock));
1879 resetted = cor_reset_timeouted_conn_needed(nb, src_in);
1880 if (unlikely(resetted == 0))
1881 goto unlock;
1883 resetted = (cor_send_reset_conn(nb, cor_get_connid_reverse(
1884 src_in->source.in.conn_id), 1) == 0);
1885 if (unlikely(resetted == 0))
1886 goto unlock;
1889 BUG_ON(trgt_out->isreset != 0);
1890 trgt_out->isreset = 1;
1892 unlock:
1893 spin_unlock_bh(&(cnb->srv.rcv_lock));
1894 spin_unlock_bh(&(cnb->cli.rcv_lock));
1896 if (resetted)
1897 cor_reset_conn(src_in);
1899 return resetted;
1902 static void cor_reset_timeouted_conns(struct cor_neighbor *nb)
1904 int i;
1905 for (i=0;i<10000;i++) {
1906 unsigned long iflags;
1907 struct list_head *lh;
1908 struct cor_conn *src_in;
1910 int resetted = 1;
1912 spin_lock_irqsave(&(nb->conn_list_lock), iflags);
1914 if (list_empty(&(nb->rcv_conn_list))) {
1915 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
1916 break;
1919 lh = nb->rcv_conn_list.next;
1920 list_del(lh);
1921 list_add_tail(lh, &(nb->rcv_conn_list));
1923 src_in = container_of(lh, struct cor_conn, source.in.nb_list);
1924 cor_conn_kref_get(src_in, "stack");
1926 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
1929 spin_lock_bh(&(src_in->rcv_lock));
1930 BUG_ON(src_in->sourcetype != SOURCE_IN);
1931 BUG_ON(src_in->source.in.nb != nb);
1932 resetted = cor_reset_timeouted_conn_needed(nb, src_in);
1933 spin_unlock_bh(&(src_in->rcv_lock));
1934 if (likely(resetted == 0))
1935 goto put;
1937 resetted = cor_reset_timeouted_conn(nb, src_in);
1939 put:
1940 cor_conn_kref_put(src_in, "stack");
1942 if (likely(resetted == 0))
1943 break;
/*
 * May not be called by more than one thread at the same time, because
 * 1) re-adding cor_control_msg_out entries may reorder them
 * 2) multiple pings may be sent
 */
/*
 * Send the pending control messages of nb, one packet per loop iteration.
 *
 * For an active neighbor timed out connections are reset first. For a
 * killed neighbor the retransmit queue and all queued control messages
 * are discarded and QOS_RESUME_DONE is returned.
 *
 * Returns QOS_RESUME_DONE once cor_dequeue_messages() reports that
 * nothing has to be sent (the cmsg timer is rescheduled in that case);
 * otherwise the first return value of _cor_send_messages_send() that is
 * not QOS_RESUME_DONE. *sent is passed through to the send path.
 */
1952 int cor_send_messages(struct cor_neighbor *nb, unsigned long cmsg_send_start_j,
1953 ktime_t cmsg_send_start_kt, int *sent)
1955 int rc = QOS_RESUME_DONE;
1956 int ping;
1957 int initsession;
1958 __u32 targetmss = cor_mss_cmsg(nb);
1960 int nbstate = cor_get_neigh_state(nb);
1962 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE))
1963 cor_reset_timeouted_conns(nb);
1965 if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
1966 spin_lock_bh(&(nb->retrans_lock));
1967 cor_empty_retrans_queue(nb);
1968 spin_unlock_bh(&(nb->retrans_lock));
1970 cor_delete_all_cmsgs(nb);
1971 return QOS_RESUME_DONE;
1974 ping = cor_time_to_send_ping(nb);
1976 spin_lock_bh(&(nb->cmsg_lock));
/* cmsg_lock is dropped around cor_add_timeouted_retrans() */
1978 if (nb->add_retrans_needed != 0) {
1979 nb->add_retrans_needed = 0;
1980 spin_unlock_bh(&(nb->cmsg_lock));
1981 cor_add_timeouted_retrans(nb);
1982 spin_lock_bh(&(nb->cmsg_lock));
1985 initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) != 0);
/* cmsg_lock is held at the start of every iteration */
1987 while (1) {
1988 struct list_head cmsgs;
1989 __u32 length = 0;
1990 __u64 seqno;
1992 INIT_LIST_HEAD(&cmsgs);
1994 if (cor_dequeue_messages(nb, ping, initsession, nbstate,
1995 targetmss, &length, &cmsgs) != 0) {
1996 cor_schedule_controlmsg_timer(nb);
1997 spin_unlock_bh(&(nb->cmsg_lock));
1998 return QOS_RESUME_DONE;
/* the packet sequence number is allocated under cmsg_lock */
2001 nb->kpacket_seqno++;
2002 seqno = nb->kpacket_seqno;
2004 spin_unlock_bh(&(nb->cmsg_lock));
2006 rc = _cor_send_messages_send(nb, ping, initsession, &cmsgs,
2007 nbstate, length, seqno, cmsg_send_start_j,
2008 cmsg_send_start_kt, sent);
2010 if (rc != QOS_RESUME_DONE)
2011 return rc;
/* ping and init-session are only requested for the first packet */
2013 ping = 0;
2014 initsession = 0;
2016 spin_lock_bh(&(nb->cmsg_lock));
2020 static unsigned long cor_calc_cmsg_send_start_j(unsigned long cmsg_timer_timeout)
2022 unsigned long jiffies_tmp = jiffies;
2023 if (unlikely(time_after(cmsg_timer_timeout, jiffies_tmp)))
2024 return jiffies_tmp;
2025 else
2026 return cmsg_timer_timeout;
2029 static ktime_t cor_calc_cmsg_send_start_kt(unsigned long cmsg_timer_timeout)
2031 ktime_t now = ktime_get();
2032 unsigned long jiffies_tmp = jiffies;
2034 unsigned long jiffies_delayed;
2035 if (unlikely(time_after(cmsg_timer_timeout, jiffies_tmp))) {
2036 jiffies_delayed = 0;
2037 } else {
2038 jiffies_delayed = jiffies_tmp - cmsg_timer_timeout;
2039 if (unlikely(jiffies_delayed > HZ/10)) {
2040 jiffies_delayed = HZ/10;
2044 return ns_to_ktime(ktime_to_ns(now) -
2045 1000LL * jiffies_to_usecs(jiffies_delayed));
2049 void cor_controlmsg_timerfunc(struct timer_list *cmsg_timer)
2051 struct cor_neighbor *nb = container_of(cmsg_timer,
2052 struct cor_neighbor, cmsg_timer);
2053 unsigned long cmsg_timer_timeout = (unsigned long)
2054 atomic64_read(&(nb->cmsg_timer_timeout));
2055 unsigned long cmsg_send_start_j = cor_calc_cmsg_send_start_j(
2056 cmsg_timer_timeout);
2057 ktime_t cmsg_send_start_kt = cor_calc_cmsg_send_start_kt(
2058 cmsg_timer_timeout);
2059 cor_qos_enqueue(nb->queue, &(nb->rb_kp), cmsg_send_start_j,
2060 cmsg_send_start_kt, QOS_CALLER_KPACKET, 0);
2061 kref_put(&(nb->ref), cor_neighbor_free);
2064 static int cor_cmsg_full_packet(struct cor_neighbor *nb, int nbstate)
2066 __u32 extralength = 0;
2067 int ping = cor_time_to_send_ping(nb);
2068 int initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) !=
2070 __u32 len = cor_get_total_messages_length(nb, ping, initsession,
2071 nbstate, &extralength);
2073 if (len == 0)
2074 return 0;
2075 if (len < cor_mss_cmsg(nb))
2076 return 0;
2078 return 1;
2081 void cor_schedule_controlmsg_timer(struct cor_neighbor *nb_cmsglocked)
2083 unsigned long timeout;
2084 int nbstate = cor_get_neigh_state(nb_cmsglocked);
2086 if (unlikely(nbstate == NEIGHBOR_STATE_KILLED))
2087 goto now;
2089 if (atomic_read(&(nb_cmsglocked->cmsg_bulk_readds)) != 0)
2090 return;
2092 if (cor_cmsg_full_packet(nb_cmsglocked, nbstate))
2093 goto now;
2095 if (nb_cmsglocked->add_retrans_needed != 0)
2096 goto now;
2098 timeout = cor_get_cmsg_timer_timeout(nb_cmsglocked, nbstate);
2100 if (0) {
2101 now:
2102 cor_qos_enqueue(nb_cmsglocked->queue, &(nb_cmsglocked->rb_kp),
2103 jiffies, ktime_get(), QOS_CALLER_KPACKET, 0);
2104 } else if (time_before_eq(timeout, jiffies)) {
2105 unsigned long cmsg_send_start_j = cor_calc_cmsg_send_start_j(
2106 timeout);
2107 ktime_t cmsg_send_start_kt = cor_calc_cmsg_send_start_kt(
2108 timeout);
2109 cor_qos_enqueue(nb_cmsglocked->queue, &(nb_cmsglocked->rb_kp),
2110 cmsg_send_start_j, cmsg_send_start_kt,
2111 QOS_CALLER_KPACKET, 0);
2112 } else {
2113 atomic64_set(&(nb_cmsglocked->cmsg_timer_timeout), timeout);
2114 barrier();
2115 if (mod_timer(&(nb_cmsglocked->cmsg_timer), timeout) == 0) {
2116 kref_get(&(nb_cmsglocked->ref));
2121 static int cor_insert_pending_conn_resets(struct cor_control_msg_out *ins)
2123 struct cor_neighbor *nb = ins->nb;
2124 __u32 conn_id = ins->msg.reset_conn.conn_id;
2126 struct rb_root *root;
2127 struct rb_node **p;
2128 struct rb_node *parent = 0;
2130 BUG_ON(nb == 0);
2131 BUG_ON(ins->msg.reset_conn.in_pending_conn_resets != 0);
2133 root = &(nb->pending_conn_resets_rb);
2134 p = &(root->rb_node);
2136 while ((*p) != 0) {
2137 struct cor_control_msg_out *cm = container_of(*p,
2138 struct cor_control_msg_out,
2139 msg.reset_conn.rbn);
2140 __u32 cm_connid = cm->msg.reset_conn.conn_id;
2142 BUG_ON(cm->nb != ins->nb);
2143 BUG_ON(cm->type != MSGTYPE_RESET_CONN);
2145 parent = *p;
2146 if (conn_id == cm_connid) {
2147 return 1;
2148 } else if (conn_id < cm_connid) {
2149 p = &(*p)->rb_left;
2150 } else if (conn_id > cm_connid) {
2151 p = &(*p)->rb_right;
2152 } else {
2153 BUG();
2157 kref_get(&(ins->ref));
2158 rb_link_node(&(ins->msg.reset_conn.rbn), parent, p);
2159 rb_insert_color(&(ins->msg.reset_conn.rbn), root);
2160 ins->msg.reset_conn.in_pending_conn_resets = 1;
2162 return 0;
2165 static void cor_free_oldest_pong(struct cor_neighbor *nb)
2167 struct cor_control_msg_out *cm = container_of(nb->cmsg_queue_pong.next,
2168 struct cor_control_msg_out, lh);
2170 BUG_ON(list_empty(&(nb->cmsg_queue_pong)));
2171 BUG_ON(unlikely(cm->type != MSGTYPE_PONG));
2173 list_del(&(cm->lh));
2174 nb->cmsg_pongslength -= cm->length;
2175 BUG_ON(nb->cmsg_pongscnt == 0);
2176 cm->nb->cmsg_pongscnt--;
2177 cor_free_control_msg(cm);
2180 static struct list_head * _cor_enqueue_control_msg_getqueue(
2181 struct cor_control_msg_out *cm)
2183 if (cm->type == MSGTYPE_ACK) {
2184 if (cm->msg.ack.fast != 0) {
2185 return &(cm->nb->cmsg_queue_ack_fast);
2186 } else {
2187 return &(cm->nb->cmsg_queue_ack_slow);
2189 } else if (cm->type == MSGTYPE_ACK_CONN) {
2190 if (unlikely(cm->msg.ack_conn.queue == CMSGQUEUE_ACK_CONN_URGENT)) {
2191 return &(cm->nb->cmsg_queue_ackconn_urgent);
2192 } else if (cm->msg.ack_conn.queue == CMSGQUEUE_ACK_CONN_LOWLAT) {
2193 return &(cm->nb->cmsg_queue_ackconn_lowlat);
2194 } else if (cm->msg.ack_conn.queue == CMSGQUEUE_ACK_CONN_HIGHLAT) {
2195 return &(cm->nb->cmsg_queue_ackconn_highlat);
2196 } else {
2197 BUG();
2199 } else if (cm->type == MSGTYPE_CONNDATA) {
2200 if (cm->msg.conn_data.highlatency != 0) {
2201 return &(cm->nb->cmsg_queue_conndata_highlat);
2202 } else {
2203 return &(cm->nb->cmsg_queue_conndata_lowlat);
2205 } else {
2206 return &(cm->nb->cmsg_queue_other);
2210 static int _cor_enqueue_control_msg(struct cor_control_msg_out *cm, int src)
2212 if (unlikely(cm->type == MSGTYPE_PONG)) {
2213 BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA);
2215 if (cm->nb->cmsg_pongscnt >= MAX_PONG_CMSGS_PER_NEIGH) {
2216 if (src != ADDCMSG_SRC_NEW) {
2217 BUG_ON(cm->nb->cmsg_pongscnt == 0);
2218 cm->nb->cmsg_pongscnt--;
2219 cor_free_control_msg(cm);
2220 return 1;
2221 } else {
2222 cor_free_oldest_pong(cm->nb);
2226 cm->nb->cmsg_pongscnt++;
2227 cm->nb->cmsg_pongslength += cm->length;
2229 if (src != ADDCMSG_SRC_NEW) {
2230 list_add(&(cm->lh), &(cm->nb->cmsg_queue_pong));
2231 } else {
2232 list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_pong));
2235 return 0;
2236 } else if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
2237 if (cor_insert_pending_conn_resets(cm) != 0) {
2238 cm->type = 0;
2239 cor_free_control_msg(cm);
2240 return 1;
2244 cm->nb->cmsg_otherlength += cm->length;
2245 if (src == ADDCMSG_SRC_NEW) {
2246 list_add_tail(&(cm->lh), _cor_enqueue_control_msg_getqueue(cm));
2247 } else {
2248 BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA &&
2249 cm->type != MSGTYPE_CONNDATA);
2250 BUG_ON(src == ADDCMSG_SRC_READD &&
2251 cm->type == MSGTYPE_ACK_CONN);
2253 list_add(&(cm->lh), _cor_enqueue_control_msg_getqueue(cm));
2256 return 0;
2259 static void cor_enqueue_control_msg(struct cor_control_msg_out *cm, int src)
2261 BUG_ON(cm == 0);
2262 BUG_ON(cm->nb == 0);
2264 if (src == ADDCMSG_SRC_NEW)
2265 cm->time_added = jiffies;
2267 spin_lock_bh(&(cm->nb->cmsg_lock));
2269 if (_cor_enqueue_control_msg(cm, src) != 0)
2270 goto out;
2272 if (src != ADDCMSG_SRC_READD && src != ADDCMSG_SRC_RETRANS)
2273 cor_schedule_controlmsg_timer(cm->nb);
2275 out:
2276 spin_unlock_bh(&(cm->nb->cmsg_lock));
2280 void cor_send_pong(struct cor_neighbor *nb, __u32 cookie, ktime_t ping_rcvtime)
2282 struct cor_control_msg_out *cm = _cor_alloc_control_msg(nb);
2284 if (unlikely(cm == 0))
2285 return;
2287 cm->nb = nb;
2288 cm->type = MSGTYPE_PONG;
2289 cm->msg.pong.cookie = cookie;
2290 cm->msg.pong.type = MSGTYPE_PONG_TIMEENQUEUED;
2291 cm->msg.pong.ping_rcvtime = ping_rcvtime;
2292 cm->msg.pong.time_enqueued = ktime_get();
2293 cm->length = 13;
2294 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2297 void cor_send_ack(struct cor_neighbor *nb, __u64 seqno, __u8 fast)
2299 struct cor_control_msg_out *cm = cor_alloc_control_msg(nb,
2300 ACM_PRIORITY_HIGH);
2302 if (unlikely(cm == 0))
2303 return;
2305 cm->nb = nb;
2306 cm->type = MSGTYPE_ACK;
2307 cm->msg.ack.seqno = seqno;
2308 cm->msg.ack.fast = fast;
2309 cm->length = 7;
2310 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2313 static __u8 get_queue_for_ackconn(struct cor_conn *src_in_lx)
2315 if (src_in_lx->is_highlatency != 0) {
2316 if (unlikely(cor_ackconn_urgent(src_in_lx))) {
2317 return CMSGQUEUE_ACK_CONN_LOWLAT;
2318 } else {
2319 return CMSGQUEUE_ACK_CONN_HIGHLAT;
2321 } else {
2322 if (unlikely(cor_ackconn_urgent(src_in_lx))) {
2323 return CMSGQUEUE_ACK_CONN_URGENT;
2324 } else {
2325 return CMSGQUEUE_ACK_CONN_LOWLAT;
2330 static void cor_set_ooolen_flags(struct cor_control_msg_out *cm)
2332 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
2333 (~KP_ACK_CONN_FLAGS_OOO));
2334 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
2335 cor_ooolen_to_flags(cm->msg.ack_conn.length));
2338 /* cmsg_lock must be held */
2339 static void cor_remove_pending_ackconn(struct cor_control_msg_out *cm)
2341 cm->nb->cmsg_otherlength -= cm->length;
2342 list_del(&(cm->lh));
2344 list_del(&(cm->msg.ack_conn.conn_acks));
2345 cor_conn_kref_put(cm->msg.ack_conn.src_in,
2346 "cor_control_msg_out ack_conn");
2347 cm->msg.ack_conn.src_in = 0;
2349 cm->type = 0;
2350 cor_free_control_msg(cm);
2353 /* cmsg_lock must be held */
2354 static void cor_recalc_scheduled_ackconn_size(struct cor_control_msg_out *cm)
2356 cm->nb->cmsg_otherlength -= cm->length;
2357 cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);
2358 cm->nb->cmsg_otherlength += cm->length;
2361 /* cmsg_lock must be held */
2362 static int _cor_try_merge_ackconn(struct cor_conn *src_in_l,
2363 struct cor_control_msg_out *fromcm,
2364 struct cor_control_msg_out *tocm, int from_newack)
2366 if (cor_ooolen(fromcm->msg.ack_conn.flags) != 0 &&
2367 cor_ooolen(tocm->msg.ack_conn.flags) != 0) {
2368 __u64 tocmseqno = tocm->msg.ack_conn.seqno_ooo;
2369 __u64 tocmlength = tocm->msg.ack_conn.length;
2370 __u64 fromcmseqno = fromcm->msg.ack_conn.seqno_ooo;
2371 __u64 fromcmlength = fromcm->msg.ack_conn.length;
2373 if (cor_seqno_eq(tocmseqno, fromcmseqno)) {
2374 if (fromcmlength > tocmlength)
2375 tocm->msg.ack_conn.length = fromcmlength;
2376 } else if (cor_seqno_after(fromcmseqno, tocmseqno) &&
2377 cor_seqno_before_eq(fromcmseqno, tocmseqno +
2378 tocmlength)) {
2379 __u64 len = cor_seqno_clean(fromcmseqno + fromcmlength -
2380 tocmseqno);
2381 BUG_ON(len > U32_MAX);
2382 tocm->msg.ack_conn.length = (__u32) len;
2383 } else if (cor_seqno_before(fromcmseqno, tocmseqno) &&
2384 cor_seqno_after_eq(fromcmseqno, tocmseqno)) {
2385 __u64 len = cor_seqno_clean(tocmseqno + tocmlength -
2386 fromcmseqno);
2387 BUG_ON(len > U32_MAX);
2388 tocm->msg.ack_conn.seqno_ooo = fromcmseqno;
2389 tocm->msg.ack_conn.length = (__u32) len;
2390 } else {
2391 return 1;
2393 cor_set_ooolen_flags(tocm);
2396 if ((fromcm->msg.ack_conn.flags &
2397 KP_ACK_CONN_FLAGS_SEQNO) != 0) {
2398 if ((tocm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) == 0)
2399 goto setseqno;
2401 BUG_ON(cor_seqno_eq(fromcm->msg.ack_conn.ack_seqno,
2402 tocm->msg.ack_conn.ack_seqno));
2403 if (cor_seqno_after_eq(tocm->msg.ack_conn.ack_seqno,
2404 fromcm->msg.ack_conn.ack_seqno)) {
2405 BUG_ON(cor_seqno_after(fromcm->msg.ack_conn.seqno,
2406 tocm->msg.ack_conn.seqno));
2407 goto skipseqno;
2410 BUG_ON(cor_seqno_before(fromcm->msg.ack_conn.seqno,
2411 tocm->msg.ack_conn.seqno));
2413 setseqno:
2414 tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
2415 KP_ACK_CONN_FLAGS_SEQNO);
2416 tocm->msg.ack_conn.seqno = fromcm->msg.ack_conn.seqno;
2417 tocm->msg.ack_conn.ack_seqno = fromcm->msg.ack_conn.ack_seqno;
2419 skipseqno:
2420 if ((fromcm->msg.ack_conn.flags &
2421 KP_ACK_CONN_FLAGS_WINDOW) != 0)
2422 tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
2423 KP_ACK_CONN_FLAGS_WINDOW);
2427 if (cor_ooolen(fromcm->msg.ack_conn.flags) != 0) {
2428 tocm->msg.ack_conn.seqno_ooo = fromcm->msg.ack_conn.seqno_ooo;
2429 tocm->msg.ack_conn.length = fromcm->msg.ack_conn.length;
2430 cor_set_ooolen_flags(tocm);
2433 if ((fromcm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
2434 BUG_ON((tocm->msg.ack_conn.flags &
2435 KP_ACK_CONN_FLAGS_PRIORITY) != 0);
2436 tocm->msg.ack_conn.priority_seqno =
2437 fromcm->msg.ack_conn.priority_seqno;
2438 tocm->msg.ack_conn.priority = fromcm->msg.ack_conn.priority;
2441 cor_recalc_scheduled_ackconn_size(tocm);
2442 if (from_newack == 0)
2443 cor_remove_pending_ackconn(fromcm);
2445 return 0;
2448 /* cmsg_lock must be held */
2449 static void cor_try_merge_ackconns(struct cor_conn *src_in_l,
2450 struct cor_control_msg_out *cm)
2452 struct list_head *currlh = cm->msg.ack_conn.conn_acks.next;
2454 while (currlh != &(src_in_l->source.in.acks_pending)) {
2455 struct cor_control_msg_out *currcm = container_of(currlh,
2456 struct cor_control_msg_out,
2457 msg.ack_conn.conn_acks);
2458 currlh = currlh->next;
2459 cor_remove_connack_oooflag_ifold(src_in_l, currcm);
2460 _cor_try_merge_ackconn(src_in_l, currcm, cm, 0);
2464 static void cor_merge_or_enqueue_ackconn(struct cor_conn *src_in_l,
2465 struct cor_control_msg_out *cm, int src)
2467 struct list_head *currlh;
2469 BUG_ON(src_in_l->sourcetype != SOURCE_IN);
2471 spin_lock_bh(&(cm->nb->cmsg_lock));
2473 currlh = src_in_l->source.in.acks_pending.next;
2474 while (currlh != &(src_in_l->source.in.acks_pending)) {
2475 struct cor_control_msg_out *currcm = container_of(currlh,
2476 struct cor_control_msg_out,
2477 msg.ack_conn.conn_acks);
2479 BUG_ON(currcm->nb != cm->nb);
2480 BUG_ON(currcm->type != MSGTYPE_ACK_CONN);
2481 BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
2482 BUG_ON(currcm->msg.ack_conn.conn_id !=
2483 cm->msg.ack_conn.conn_id);
2485 if (_cor_try_merge_ackconn(src_in_l, cm, currcm, 1) == 0) {
2486 cor_try_merge_ackconns(src_in_l, currcm);
2487 cor_schedule_controlmsg_timer(currcm->nb);
2488 spin_unlock_bh(&(currcm->nb->cmsg_lock));
2490 * flags:
2491 * when calling cor_free_control_msg here conn may
2492 * already be locked and priority_send_allowed and
2493 * priority_send_allowed should not be reset
2495 cm->msg.ack_conn.flags = 0;
2496 cor_free_control_msg(cm);
2497 return;
2500 currlh = currlh->next;
2503 list_add_tail(&(cm->msg.ack_conn.conn_acks),
2504 &(src_in_l->source.in.acks_pending));
2506 spin_unlock_bh(&(cm->nb->cmsg_lock));
2508 cor_enqueue_control_msg(cm, src);
2511 static int cor_try_update_ackconn_seqno(struct cor_conn *src_in_l)
2513 int rc = 1;
2515 spin_lock_bh(&(src_in_l->source.in.nb->cmsg_lock));
2517 if (list_empty(&(src_in_l->source.in.acks_pending)) == 0) {
2518 struct cor_control_msg_out *cm = container_of(
2519 src_in_l->source.in.acks_pending.next,
2520 struct cor_control_msg_out,
2521 msg.ack_conn.conn_acks);
2522 BUG_ON(cm->nb != src_in_l->source.in.nb);
2523 BUG_ON(cm->type != MSGTYPE_ACK_CONN);
2524 BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
2525 BUG_ON(cm->msg.ack_conn.conn_id != cor_get_connid_reverse(
2526 src_in_l->source.in.conn_id));
2528 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
2529 KP_ACK_CONN_FLAGS_SEQNO |
2530 KP_ACK_CONN_FLAGS_WINDOW);
2531 cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;
2533 src_in_l->source.in.ack_seqno++;
2534 cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;
2536 cor_remove_connack_oooflag_ifold(src_in_l, cm);
2537 cor_recalc_scheduled_ackconn_size(cm);
2539 cor_try_merge_ackconns(src_in_l, cm);
2541 rc = 0;
2544 spin_unlock_bh(&(src_in_l->source.in.nb->cmsg_lock));
2546 return rc;
2549 void cor_send_ack_conn_ifneeded(struct cor_conn *src_in_l, __u64 seqno_ooo,
2550 __u32 ooo_length)
2552 struct cor_control_msg_out *cm;
2554 BUG_ON(src_in_l->sourcetype != SOURCE_IN);
2556 BUG_ON(ooo_length > 0 && cor_seqno_before_eq(seqno_ooo,
2557 src_in_l->source.in.next_seqno));
2559 cor_update_windowlimit(src_in_l);
2561 if (ooo_length != 0) {
2562 cm = cor_alloc_control_msg(src_in_l->source.in.nb,
2563 ACM_PRIORITY_LOW);
2564 if (cm != 0)
2565 goto add;
2568 if (src_in_l->source.in.inorder_ack_needed != 0)
2569 goto ack_needed;
2571 if (cor_seqno_clean(src_in_l->source.in.window_seqnolimit -
2572 src_in_l->source.in.next_seqno) < WINDOW_ENCODE_MIN)
2573 return;
2575 if (cor_seqno_clean(src_in_l->source.in.window_seqnolimit_remote -
2576 src_in_l->source.in.next_seqno) >= WINDOW_ENCODE_MIN &&
2577 cor_seqno_clean(src_in_l->source.in.window_seqnolimit -
2578 src_in_l->source.in.next_seqno) * 7 <
2579 cor_seqno_clean(
2580 src_in_l->source.in.window_seqnolimit_remote -
2581 src_in_l->source.in.next_seqno) * 8)
2582 return;
2584 ack_needed:
2585 if (cor_try_update_ackconn_seqno(src_in_l) == 0)
2586 goto out;
2588 cm = cor_alloc_control_msg(src_in_l->source.in.nb, ACM_PRIORITY_MED);
2589 if (cm == 0) {
2590 printk(KERN_ERR "error allocating inorder ack");
2591 return;
2594 add:
2595 cm->type = MSGTYPE_ACK_CONN;
2596 src_in_l->source.in.ack_seqno++;
2597 cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;
2598 cor_conn_kref_get(src_in_l, "cor_control_msg_out ack_conn");
2599 cm->msg.ack_conn.src_in = src_in_l;
2600 cm->msg.ack_conn.conn_id =
2601 cor_get_connid_reverse(src_in_l->source.in.conn_id);
2602 cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;
2603 cm->msg.ack_conn.seqno_ooo = seqno_ooo;
2604 cm->msg.ack_conn.length = ooo_length;
2605 cm->msg.ack_conn.bufsize_changerate =
2606 _cor_bufsize_update_get_changerate(src_in_l);
2607 cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_SEQNO |
2608 KP_ACK_CONN_FLAGS_WINDOW;
2609 cor_set_ooolen_flags(cm);
2610 cm->msg.ack_conn.is_highlatency = src_in_l->is_highlatency;
2611 cm->msg.ack_conn.queue = get_queue_for_ackconn(src_in_l);
2612 cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);
2614 cor_merge_or_enqueue_ackconn(src_in_l, cm, ADDCMSG_SRC_NEW);
2616 out:
2617 src_in_l->source.in.inorder_ack_needed = 0;
2618 src_in_l->source.in.window_seqnolimit_remote =
2619 src_in_l->source.in.window_seqnolimit;
2622 static int cor_try_add_priority(struct cor_conn *trgt_out_ll, __u16 priority)
2624 int rc = 1;
2625 struct cor_conn *src_in_ll = cor_get_conn_reversedir(trgt_out_ll);
2627 spin_lock_bh(&(trgt_out_ll->target.out.nb->cmsg_lock));
2629 if (list_empty(&(src_in_ll->source.in.acks_pending)) == 0) {
2630 struct cor_control_msg_out *cm = container_of(
2631 src_in_ll->source.in.acks_pending.next,
2632 struct cor_control_msg_out,
2633 msg.ack_conn.conn_acks);
2634 BUG_ON(cm->nb != trgt_out_ll->target.out.nb);
2635 BUG_ON(cm->type != MSGTYPE_ACK_CONN);
2636 BUG_ON(cm->msg.ack_conn.src_in != src_in_ll);
2637 BUG_ON(cm->msg.ack_conn.conn_id !=
2638 trgt_out_ll->target.out.conn_id);
2640 BUG_ON((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) !=
2642 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
2643 KP_ACK_CONN_FLAGS_PRIORITY);
2644 cm->msg.ack_conn.priority_seqno =
2645 trgt_out_ll->target.out.priority_seqno;
2646 cm->msg.ack_conn.priority = priority;
2647 cor_recalc_scheduled_ackconn_size(cm);
2649 rc = 0;
2652 spin_unlock_bh(&(trgt_out_ll->target.out.nb->cmsg_lock));
2654 return rc;
2657 void cor_send_priority(struct cor_conn *trgt_out_ll, __u16 priority)
2659 struct cor_conn *src_in_ll = cor_get_conn_reversedir(trgt_out_ll);
2660 struct cor_control_msg_out *cm;
2662 if (cor_try_add_priority(trgt_out_ll, priority) == 0)
2663 goto out;
2665 cm = cor_alloc_control_msg(trgt_out_ll->target.out.nb,
2666 ACM_PRIORITY_LOW);
2667 if (cm == 0)
2668 return;
2670 cm->type = MSGTYPE_ACK_CONN;
2671 cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_PRIORITY;
2672 cor_conn_kref_get(src_in_ll, "cor_control_msg_out ack_conn");
2673 BUG_ON(trgt_out_ll->targettype != TARGET_OUT);
2674 cm->msg.ack_conn.src_in = src_in_ll;
2675 cm->msg.ack_conn.conn_id = trgt_out_ll->target.out.conn_id;
2676 cm->msg.ack_conn.bufsize_changerate =
2677 _cor_bufsize_update_get_changerate(src_in_ll);
2678 cm->msg.ack_conn.priority_seqno =
2679 trgt_out_ll->target.out.priority_seqno;
2680 cm->msg.ack_conn.priority = priority;
2681 cm->msg.ack_conn.is_highlatency = trgt_out_ll->is_highlatency;
2682 cm->msg.ack_conn.queue = get_queue_for_ackconn(src_in_ll);
2684 cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);
2685 cor_merge_or_enqueue_ackconn(src_in_ll, cm, ADDCMSG_SRC_NEW);
2687 out:
2688 trgt_out_ll->target.out.priority_last = priority;
2689 trgt_out_ll->target.out.priority_seqno =
2690 (trgt_out_ll->target.out.priority_seqno + 1) & 15;
2691 trgt_out_ll->target.out.priority_send_allowed = 0;
2694 void cor_free_ack_conns(struct cor_conn *src_in_lx)
2696 int changed = 0;
2697 spin_lock_bh(&(src_in_lx->source.in.nb->cmsg_lock));
2698 while (list_empty(&(src_in_lx->source.in.acks_pending)) == 0) {
2699 struct list_head *currlh =
2700 src_in_lx->source.in.acks_pending.next;
2701 struct cor_control_msg_out *currcm = container_of(currlh,
2702 struct cor_control_msg_out,
2703 msg.ack_conn.conn_acks);
2705 cor_remove_pending_ackconn(currcm);
2706 changed = 1;
2708 if (changed)
2709 cor_schedule_controlmsg_timer(src_in_lx->source.in.nb);
2710 spin_unlock_bh(&(src_in_lx->source.in.nb->cmsg_lock));
2713 void cor_send_connect_success(struct cor_control_msg_out *cm, __u32 conn_id,
2714 struct cor_conn *src_in)
2716 cm->type = MSGTYPE_CONNECT_SUCCESS;
2717 cm->msg.connect_success.conn_id = conn_id;
2718 cor_conn_kref_get(src_in, "cor_control_msg_out connect_success");
2719 cm->msg.connect_success.src_in = src_in;
2720 cm->length = 7;
2721 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2724 void cor_send_connect_nb(struct cor_control_msg_out *cm, __u32 conn_id,
2725 __u64 seqno1, __u64 seqno2, struct cor_conn *src_in_ll)
2727 cm->type = MSGTYPE_CONNECT;
2728 cm->msg.connect.conn_id = conn_id;
2729 cm->msg.connect.seqno1 = seqno1;
2730 cm->msg.connect.seqno2 = seqno2;
2731 cor_conn_kref_get(src_in_ll, "cor_control_msg_out connect");
2732 BUG_ON(src_in_ll->sourcetype != SOURCE_IN);
2733 cm->msg.connect.src_in = src_in_ll;
2734 cm->length = 22;
2735 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2738 void cor_send_conndata(struct cor_control_msg_out *cm, __u32 conn_id,
2739 __u64 seqno, char *data_orig, char *data, __u32 datalen,
2740 __u8 windowused, __u8 flush, __u8 highlatency,
2741 struct cor_conn_retrans *cr)
2743 cm->type = MSGTYPE_CONNDATA;
2744 cm->msg.conn_data.conn_id = conn_id;
2745 cm->msg.conn_data.seqno = seqno;
2746 cm->msg.conn_data.data_orig = data_orig;
2747 cm->msg.conn_data.data = data;
2748 cm->msg.conn_data.datalen = datalen;
2749 cm->msg.conn_data.windowused = windowused;
2750 cm->msg.conn_data.flush = flush;
2751 cm->msg.conn_data.highlatency = highlatency;
2752 cm->msg.conn_data.cr = cr;
2753 kref_get(&(cr->ref));
2754 cm->length = get_kp_conn_data_length(datalen);
2755 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2758 int cor_send_reset_conn(struct cor_neighbor *nb, __u32 conn_id, int lowprio)
2760 struct cor_control_msg_out *cm;
2762 if (unlikely(cor_get_neigh_state(nb) == NEIGHBOR_STATE_KILLED))
2763 return 0;
2765 cm = cor_alloc_control_msg(nb, lowprio ?
2766 ACM_PRIORITY_LOW : ACM_PRIORITY_MED);
2768 if (unlikely(cm == 0))
2769 return 1;
2771 cm->type = MSGTYPE_RESET_CONN;
2772 cm->msg.reset_conn.conn_id = conn_id;
2773 cm->length = 5;
2775 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2777 return 0;
2780 int __init cor_kgen_init(void)
2782 cor_controlmsg_slab = kmem_cache_create("cor_controlmsg",
2783 sizeof(struct cor_control_msg_out), 8, 0, 0);
2784 if (unlikely(cor_controlmsg_slab == 0))
2785 return -ENOMEM;
2787 cor_controlretrans_slab = kmem_cache_create("cor_controlretransmsg",
2788 sizeof(struct cor_control_retrans), 8, 0, 0);
2789 if (unlikely(cor_controlretrans_slab == 0))
2790 return -ENOMEM;
2792 return 0;
2795 void __exit cor_kgen_exit2(void)
2797 kmem_cache_destroy(cor_controlretrans_slab);
2798 cor_controlretrans_slab = 0;
2800 kmem_cache_destroy(cor_controlmsg_slab);
2801 cor_controlmsg_slab = 0;
2804 MODULE_LICENSE("GPL");