send windowused instead of senddelayed flag
[cor.git] / net / cor / neigh_snd.c
blobe1d8d82895796cc1a683fd6856f4a0c7670b6308
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2021 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <asm/byteorder.h>
23 #include "cor.h"
/* not sent over the network - internal meaning only */
#define MSGTYPE_PONG 1
#define MSGTYPE_ACK 2
#define MSGTYPE_ACK_CONN 3
#define MSGTYPE_CONNECT 4
#define MSGTYPE_CONNECT_SUCCESS 5
#define MSGTYPE_RESET_CONN 6
#define MSGTYPE_CONNDATA 7
#define MSGTYPE_SET_MAX_CMSG_DELAY 8

/* subtypes of MSGTYPE_PONG (cor_control_msg_out.msg.pong.type) */
#define MSGTYPE_PONG_TIMEENQUEUED 1
#define MSGTYPE_PONG_RESPDELAY 2
/* One outgoing control message; queued per neighbor, then moved onto a
 * cor_control_retrans once sent (for possible retransmission). */
struct cor_control_msg_out{
	__u8 type; /* MSGTYPE_* */
	__u32 length; /* encoded length in bytes on the wire */
	struct kref ref;
	struct cor_neighbor *nb;

	/* either queue or control_retrans_packet */
	struct list_head lh;

	unsigned long time_added; /* jiffies when the message was enqueued */

	union{
		struct{
			__u32 cookie;
			__u8 type; /* MSGTYPE_PONG_TIMEENQUEUED/_RESPDELAY */

			ktime_t ping_rcvtime;
			ktime_t time_enqueued;
		}pong;

		struct{
			__u64 seqno;
			__u8 fast;
		}ack;

		struct{
			struct cor_conn *src_in;
			struct list_head conn_acks;
			__u32 conn_id;
			__u64 seqno;
			__u64 seqno_ooo;
			__u32 length;

			__u8 flags; /* KP_ACK_CONN_FLAGS_* */

			__u8 bufsize_changerate;

			__u16 priority;
			__u8 priority_seqno;

			__u8 is_highlatency;
			__u8 queue; /* CMSGQUEUE_ACK_CONN_* it came from */

			__u32 ack_seqno;
		}ack_conn;

		struct{
			__u32 conn_id;
			__u64 seqno1;
			__u64 seqno2;
			struct cor_conn *src_in;
		}connect;

		struct{
			__u32 conn_id;
			struct cor_conn *src_in;
		}connect_success;

		struct{
			/* linked into nb->pending_conn_resets_rb while
			 * in_pending_conn_resets != 0 */
			struct rb_node rbn;
			__u8 in_pending_conn_resets;
			__u32 conn_id;
		}reset_conn;

		struct{
			__u32 conn_id;
			__u64 seqno;
			__u32 datalen;
			__u8 windowused;
			__u8 flush;
			__u8 highlatency;
			char *data_orig; /* allocation start (data may point
					  * into the middle after a split) */
			char *data;
			struct cor_conn_retrans *cr;
		}conn_data;

		struct{
			/* all delays in microseconds */
			__u32 ack_fast_delay;
			__u32 ack_slow_delay;
			__u32 ackconn_lowlatency_delay;
			__u32 ackconn_highlatency_delay;
			__u32 pong_delay;
		}set_max_cmsg_delay;
	}msg;
};
/* A sent kernel packet awaiting ack; holds the messages it carried so they
 * can be requeued if the packet times out. Indexed by seqno in
 * nb->kp_retransmits_rb and ordered by timeout in the retrans lists. */
struct cor_control_retrans {
	struct kref ref;

	struct cor_neighbor *nb;
	__u64 seqno;

	unsigned long timeout; /* jiffies deadline for retransmission */

	struct list_head msgs; /* cor_control_msg_out.lh */

	struct rb_node rbn; /* nb->kp_retransmits_rb, keyed by seqno */
	struct list_head timeout_list; /* nb->retrans_fast/slow_list */
};
static struct kmem_cache *cor_controlmsg_slab;
static struct kmem_cache *cor_controlretrans_slab;

/* global (all-neighbor) count of non-pong control messages */
static atomic_t cor_cmsg_othercnt = ATOMIC_INIT(0);

/* origin of an enqueue, passed to cor_enqueue_control_msg() */
#define ADDCMSG_SRC_NEW 1
#define ADDCMSG_SRC_SPLITCONNDATA 2
#define ADDCMSG_SRC_READD 3
#define ADDCMSG_SRC_RETRANS 4

static void cor_enqueue_control_msg(struct cor_control_msg_out *msg, int src);

static void cor_try_merge_ackconns(struct cor_conn *src_in_l,
		struct cor_control_msg_out *cm);

static void cor_merge_or_enqueue_ackconn(struct cor_conn *src_in_l,
		struct cor_control_msg_out *cm, int src);
157 static struct cor_control_msg_out *_cor_alloc_control_msg(
158 struct cor_neighbor *nb)
160 struct cor_control_msg_out *cm;
162 BUG_ON(nb == 0);
164 cm = kmem_cache_alloc(cor_controlmsg_slab, GFP_ATOMIC);
165 if (unlikely(cm == 0))
166 return 0;
167 memset(cm, 0, sizeof(struct cor_control_msg_out));
168 kref_init(&(cm->ref));
169 cm->nb = nb;
170 return cm;
173 static int cor_calc_limit(int limit, int priority)
175 if (priority == ACM_PRIORITY_LOW)
176 return (limit+1)/2;
177 else if (priority == ACM_PRIORITY_MED)
178 return (limit * 3 + 1)/4;
179 else if (priority == ACM_PRIORITY_HIGH)
180 return limit;
181 else
182 BUG();
/* Allocate a control message, enforcing per-neighbor and global counters.
 * Below GUARANTEED_CMSGS_PER_NEIGH allocation is always attempted; above the
 * priority-scaled MAX limits it is refused. On refusal or slab failure the
 * counters incremented above are rolled back and 0 is returned. */
struct cor_control_msg_out *cor_alloc_control_msg(struct cor_neighbor *nb,
		int priority)
{
	struct cor_control_msg_out *cm = 0;

	long packets1;
	long packets2;

	BUG_ON(nb == 0);

	/* optimistically account the new message; undone on failure */
	packets1 = atomic_inc_return(&(nb->cmsg_othercnt));
	packets2 = atomic_inc_return(&(cor_cmsg_othercnt));

	BUG_ON(packets1 <= 0);
	BUG_ON(packets2 <= 0);

	if (packets1 <= cor_calc_limit(GUARANTEED_CMSGS_PER_NEIGH, priority))
		goto alloc;

	if (unlikely(unlikely(packets1 > cor_calc_limit(MAX_CMSGS_PER_NEIGH,
			priority)) ||
			unlikely(packets2 > cor_calc_limit(MAX_CMSGS,
			priority))))
		goto full;

alloc:
	cm = _cor_alloc_control_msg(nb);
	if (unlikely(cm == 0)) {
		/* fall through into the rollback path; the label sits inside
		 * the if so "full" and "slab failed" share the cleanup */
full:
		/* printk(KERN_ERR "cor_alloc_control_msg failed %ld %ld", packets1, packets2); */
		atomic_dec(&(nb->cmsg_othercnt));
		atomic_dec(&(cor_cmsg_othercnt));
	}

	return cm;
}
222 static void cor_cmsg_kref_free(struct kref *ref)
224 struct cor_control_msg_out *cm = container_of(ref,
225 struct cor_control_msg_out, ref);
226 kmem_cache_free(cor_controlmsg_slab, cm);
/* Release a control message: undo counter accounting (pongs are counted
 * separately), drop per-type references/links, then drop the final kref. */
void cor_free_control_msg(struct cor_control_msg_out *cm)
{
	if (likely(cm->type != MSGTYPE_PONG)) {
		atomic_dec(&(cm->nb->cmsg_othercnt));
		atomic_dec(&(cor_cmsg_othercnt));
	}

	if (cm->type == MSGTYPE_ACK_CONN) {
		BUG_ON(cm->msg.ack_conn.src_in == 0);
		if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0){
			struct cor_conn *trgt_out = cor_get_conn_reversedir(
					cm->msg.ack_conn.src_in);
			spin_lock_bh(&(trgt_out->rcv_lock));
			BUG_ON(trgt_out->targettype != TARGET_OUT);
			/* NOTE(review): the pending priority update is being
			 * dropped, so sending must be re-allowed — but the
			 * condition tests != 0 before setting 1, which looks
			 * inverted (== 0 expected); confirm against upstream */
			if (trgt_out->target.out.priority_send_allowed != 0) {
				trgt_out->target.out.priority_send_allowed = 1;
				spin_unlock_bh(&(trgt_out->rcv_lock));
				cor_conn_refresh_priority(trgt_out, 0);
			} else {
				spin_unlock_bh(&(trgt_out->rcv_lock));
			}
		}

		cor_conn_kref_put(cm->msg.ack_conn.src_in,
				"cor_control_msg_out ack_conn");
		cm->msg.ack_conn.src_in = 0;
	} else if (cm->type == MSGTYPE_CONNECT) {
		BUG_ON(cm->msg.connect.src_in == 0);
		cor_conn_kref_put(cm->msg.connect.src_in,
				"cor_control_msg_out connect");
		cm->msg.connect.src_in = 0;
	} else if (cm->type == MSGTYPE_CONNECT_SUCCESS) {
		BUG_ON(cm->msg.connect_success.src_in == 0);
		cor_conn_kref_put(cm->msg.connect_success.src_in,
				"cor_control_msg_out connect_success");
		cm->msg.connect_success.src_in = 0;
	} else if (cm->type == MSGTYPE_RESET_CONN) {
		/* unlink from the pending-resets tree; the tree held a ref */
		spin_lock_bh(&(cm->nb->cmsg_lock));
		if (cm->msg.reset_conn.in_pending_conn_resets != 0) {
			rb_erase(&(cm->msg.reset_conn.rbn),
					&(cm->nb->pending_conn_resets_rb));
			cm->msg.reset_conn.in_pending_conn_resets = 0;

			kref_put(&(cm->ref), cor_kreffree_bug);
		}
		spin_unlock_bh(&(cm->nb->cmsg_lock));
	}

	kref_put(&(cm->ref), cor_cmsg_kref_free);
}
279 static void cor_free_control_retrans(struct kref *ref)
281 struct cor_control_retrans *cr = container_of(ref,
282 struct cor_control_retrans, ref);
284 while (list_empty(&(cr->msgs)) == 0) {
285 struct cor_control_msg_out *cm = container_of(cr->msgs.next,
286 struct cor_control_msg_out, lh);
288 if (cm->type == MSGTYPE_PONG)
289 atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
291 list_del(&(cm->lh));
292 cor_free_control_msg(cm);
295 kmem_cache_free(cor_controlretrans_slab, cr);
298 struct cor_control_retrans *cor_get_control_retrans(
299 struct cor_neighbor *nb_retranslocked, __u64 seqno)
301 struct rb_node *n = 0;
302 struct cor_control_retrans *ret = 0;
304 n = nb_retranslocked->kp_retransmits_rb.rb_node;
306 while (likely(n != 0) && ret == 0) {
307 struct cor_control_retrans *cr = container_of(n,
308 struct cor_control_retrans, rbn);
310 BUG_ON(cr->nb != nb_retranslocked);
312 if (cor_seqno_before(seqno, cr->seqno))
313 n = n->rb_left;
314 else if (cor_seqno_after(seqno, cr->seqno))
315 n = n->rb_right;
316 else
317 ret = cr;
320 if (ret != 0)
321 kref_get(&(ret->ref));
323 return ret;
326 /* nb->retrans_lock must be held */
327 void cor_insert_control_retrans(struct cor_control_retrans *ins)
329 struct cor_neighbor *nb = ins->nb;
330 __u64 seqno = ins->seqno;
332 struct rb_root *root;
333 struct rb_node **p;
334 struct rb_node *parent = 0;
336 BUG_ON(nb == 0);
338 root = &(nb->kp_retransmits_rb);
339 p = &(root->rb_node);
341 while ((*p) != 0) {
342 struct cor_control_retrans *cr = container_of(*p,
343 struct cor_control_retrans, rbn);
345 BUG_ON(cr->nb != nb);
347 parent = *p;
348 if (unlikely(cor_seqno_eq(seqno, cr->seqno))) {
349 BUG();
350 } else if (cor_seqno_before(seqno, cr->seqno)) {
351 p = &(*p)->rb_left;
352 } else if (cor_seqno_after(seqno, cr->seqno)) {
353 p = &(*p)->rb_right;
354 } else {
355 BUG();
359 kref_get(&(ins->ref));
360 rb_link_node(&(ins->rbn), parent, p);
361 rb_insert_color(&(ins->rbn), root);
364 static void cor_remove_connack_oooflag_ifold(struct cor_conn *src_in_l,
365 struct cor_control_msg_out *cm)
367 if (cor_ooolen(cm->msg.ack_conn.flags) != 0 && cor_seqno_before_eq(
368 cm->msg.ack_conn.seqno_ooo +
369 cm->msg.ack_conn.length,
370 src_in_l->source.in.next_seqno)) {
371 cm->msg.ack_conn.length = 0;
372 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
373 (~KP_ACK_CONN_FLAGS_OOO));
377 static int cor_ackconn_prepare_requeue(struct cor_conn *cn_l,
378 struct cor_control_msg_out *cm)
380 if (unlikely(unlikely(cn_l->sourcetype != SOURCE_IN) ||
381 unlikely(cn_l->source.in.nb != cm->nb) ||
382 unlikely(
383 cor_get_connid_reverse(cn_l->source.in.conn_id) !=
384 cm->msg.ack_conn.conn_id) ||
385 unlikely(cn_l->isreset != 0)))
386 return 0;
388 cor_remove_connack_oooflag_ifold(cn_l, cm);
390 if (!cor_seqno_eq(cm->msg.ack_conn.ack_seqno,
391 cn_l->source.in.ack_seqno))
392 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
393 (~KP_ACK_CONN_FLAGS_SEQNO) &
394 (~KP_ACK_CONN_FLAGS_WINDOW));
396 if (cm->msg.ack_conn.flags == 0)
397 return 0;
399 cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);
401 return 1;
/* A retransmit fired: move every message of the timed-out packet back onto
 * the neighbor's send queues (walking from the tail keeps ordering), then
 * kick the control message timer. cmsg_bulk_readds suppresses per-message
 * timer rescheduling while the batch is in flight. */
static void cor_requeue_control_retrans(struct cor_control_retrans *cr)
{
	atomic_inc(&(cr->nb->cmsg_bulk_readds));

	while (list_empty(&(cr->msgs)) == 0) {
		struct cor_control_msg_out *cm = container_of(cr->msgs.prev,
				struct cor_control_msg_out, lh);

		list_del(&(cm->lh));

		BUG_ON(cm->nb != cr->nb);

		if (cm->type == MSGTYPE_ACK_CONN) {
			struct cor_conn *cn_l = cm->msg.ack_conn.src_in;

			spin_lock_bh(&(cn_l->rcv_lock));
			if (unlikely(cor_ackconn_prepare_requeue(cn_l,
					cm) == 0)) {
				cor_free_control_msg(cm);
			} else {
				cor_merge_or_enqueue_ackconn(cn_l, cm,
						ADDCMSG_SRC_RETRANS);
			}
			spin_unlock_bh(&(cn_l->rcv_lock));
		} else {
			if (cm->type == MSGTYPE_PONG)
				atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
			cor_enqueue_control_msg(cm, ADDCMSG_SRC_RETRANS);
		}
	}

	atomic_dec(&(cr->nb->cmsg_bulk_readds));

	spin_lock_bh(&(cr->nb->cmsg_lock));
	cor_schedule_controlmsg_timer(cr->nb);
	spin_unlock_bh(&(cr->nb->cmsg_lock));
}
441 static void _cor_empty_retrans_queue(struct cor_neighbor *nb_retranslocked,
442 struct list_head *retrans_list)
444 while (!list_empty(retrans_list)) {
445 struct cor_control_retrans *cr = container_of(
446 retrans_list->next, struct cor_control_retrans,
447 timeout_list);
449 BUG_ON(cr->nb != nb_retranslocked);
451 list_del(&(cr->timeout_list));
452 rb_erase(&(cr->rbn), &(nb_retranslocked->kp_retransmits_rb));
454 kref_put(&(cr->ref), cor_kreffree_bug); /* rb */
455 kref_put(&(cr->ref), cor_free_control_retrans); /* list */
459 static void cor_empty_retrans_queue(struct cor_neighbor *nb_retranslocked)
461 _cor_empty_retrans_queue(nb_retranslocked,
462 &(nb_retranslocked->retrans_fast_list));
463 _cor_empty_retrans_queue(nb_retranslocked,
464 &(nb_retranslocked->retrans_slow_list));
467 static unsigned long cor_get_retransmit_timeout(
468 struct cor_neighbor *nb_retranslocked)
470 struct cor_control_retrans *cr1 = 0;
471 struct cor_control_retrans *cr2 = 0;
472 struct cor_control_retrans *cr = 0;
474 if (list_empty(&(nb_retranslocked->retrans_fast_list)) == 0) {
475 cr1 = container_of(nb_retranslocked->retrans_fast_list.next,
476 struct cor_control_retrans, timeout_list);
477 BUG_ON(cr1->nb != nb_retranslocked);
480 if (list_empty(&(nb_retranslocked->retrans_slow_list)) == 0) {
481 cr2 = container_of(nb_retranslocked->retrans_slow_list.next,
482 struct cor_control_retrans, timeout_list);
483 BUG_ON(cr2->nb != nb_retranslocked);
486 if (cr1 == 0)
487 cr = cr2;
488 else if (cr2 == 0)
489 cr = cr1;
490 else
491 cr = (time_after(cr1->timeout, cr2->timeout) ? cr2 : cr1);
493 BUG_ON(cr == 0);
495 return cr->timeout;
/* Retransmit timer: if the neighbor is dead, flush the queues; if the next
 * deadline is still in the future, rearm; otherwise hand off to the control
 * message machinery (add_retrans_needed) which performs the actual resend.
 * The armed timer holds one neighbor reference, dropped on every exit path
 * that does not keep the timer pending. */
void cor_retransmit_timerfunc(struct timer_list *retrans_timer)
{
	struct cor_neighbor *nb = container_of(retrans_timer,
			struct cor_neighbor, retrans_timer);
	int nbstate = cor_get_neigh_state(nb);
	unsigned long timeout;

	spin_lock_bh(&(nb->retrans_lock));

	if (list_empty(&(nb->retrans_fast_list)) &&
			list_empty(&(nb->retrans_slow_list))) {
		spin_unlock_bh(&(nb->retrans_lock));
		kref_put(&(nb->ref), cor_neighbor_free);
		return;
	}

	if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
		cor_empty_retrans_queue(nb);
		spin_unlock_bh(&(nb->retrans_lock));
		kref_put(&(nb->ref), cor_neighbor_free);
		return;
	}

	timeout = cor_get_retransmit_timeout(nb);

	if (time_after(timeout, jiffies)) {
		/* not due yet; mod_timer != 0 means it was still pending and
		 * already owned a reference, so drop ours */
		int rc = mod_timer(&(nb->retrans_timer), timeout);
		spin_unlock_bh(&(nb->retrans_lock));
		if (rc != 0)
			kref_put(&(nb->ref), cor_neighbor_free);
		return;
	}

	spin_unlock_bh(&(nb->retrans_lock));

	spin_lock_bh(&(nb->cmsg_lock));
	nb->add_retrans_needed = 1;
	cor_schedule_controlmsg_timer(nb);
	spin_unlock_bh(&(nb->cmsg_lock));

	kref_put(&(nb->ref), cor_neighbor_free);
}
/* Register a freshly sent packet for retransmission: compute its timeout
 * from measured latency plus the remote's max ack delay, insert it into the
 * rbtree and the matching (fast/slow) timeout list. If it becomes the head
 * of its list, (re)arm the timer; a newly armed timer takes a neighbor ref. */
static void cor_schedule_retransmit(struct cor_control_retrans *cr,
		struct cor_neighbor *nb, int fastack)
{
	int first;

	cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
			atomic_read(&(nb->latency_stddev_retrans_us)),
			fastack ?
			atomic_read(&(nb->max_remote_ack_fast_delay_us)) :
			atomic_read(&(nb->max_remote_ack_slow_delay_us)));

	spin_lock_bh(&(nb->retrans_lock));

	cor_insert_control_retrans(cr);
	if (fastack) {
		first = list_empty(&(nb->retrans_fast_list));
		list_add_tail(&(cr->timeout_list), &(nb->retrans_fast_list));
	} else {
		first = list_empty(&(nb->retrans_slow_list));
		list_add_tail(&(cr->timeout_list), &(nb->retrans_slow_list));
	}

	if (first) {
		/* mod_timer == 0: timer was idle, so this arming needs its
		 * own neighbor reference */
		if (mod_timer(&(nb->retrans_timer),
				cor_get_retransmit_timeout(nb)) == 0) {
			kref_get(&(nb->ref));
		}
	}

	spin_unlock_bh(&(nb->retrans_lock));
}
/* An ack for kernel packet seqno arrived: remove the matching retrans record
 * from rbtree and timeout list. Three references are dropped: the lookup
 * reference, the rbtree's, and the list's (the last one frees the record). */
void cor_kern_ack_rcvd(struct cor_neighbor *nb, __u64 seqno)
{
	struct cor_control_retrans *cr = 0;

	spin_lock_bh(&(nb->retrans_lock));

	cr = cor_get_control_retrans(nb, seqno);

	if (cr == 0) {
		/* char *seqno_p = (char *) &seqno;
		seqno = cpu_to_be32(seqno);
		printk(KERN_ERR "bogus/duplicate ack received %d %d %d %d",
				seqno_p[0], seqno_p[1], seqno_p[2], seqno_p[3]);
		*/
		goto out;
	}

	rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));

	BUG_ON(cr->nb != nb);

	list_del(&(cr->timeout_list));

out:
	spin_unlock_bh(&(nb->retrans_lock));

	if (cr != 0) {
		/* cor_get_control_retrans */
		kref_put(&(cr->ref), cor_kreffree_bug);

		kref_put(&(cr->ref), cor_kreffree_bug); /* rb_erase */
		kref_put(&(cr->ref), cor_free_control_retrans); /* list */
	}
}
/* Encode the current receive window (log-encoded into 7 bits) for sending to
 * the peer, and remember the value the peer will decode in
 * window_seqnolimit_remote. Returns 0 if the conn no longer matches the
 * expected sender/connid (e.g. reset in the meantime). */
static __u8 cor_get_window(struct cor_conn *cn,
		struct cor_neighbor *expectedsender, __u32 expected_connid)
{
	__u8 window = 0;

	BUG_ON(expectedsender == 0);

	spin_lock_bh(&(cn->rcv_lock));

	if (cor_is_conn_in(cn, expectedsender, expected_connid) == 0)
		goto out;

	window = cor_enc_log_64_7(cor_seqno_clean(
			cn->source.in.window_seqnolimit -
			cn->source.in.next_seqno));

	/* what the peer will believe after decoding (lossy encoding) */
	cn->source.in.window_seqnolimit_remote = cn->source.in.next_seqno +
			cor_dec_log_64_7(window);

out:
	spin_unlock_bh(&(cn->rcv_lock));

	return window;
}
633 /* static void padding(struct sk_buff *skb, __u32 length)
635 char *dst;
636 if (length <= 0)
637 return;
638 dst = skb_put(skb, length);
639 BUG_ON(dst == 0);
640 memset(dst, KP_PADDING, length);
641 } */
644 static __u32 cor_add_init_session(struct sk_buff *skb, __be32 sessionid,
645 __u32 spaceleft)
647 char *dst;
649 BUG_ON(KP_MISC_INIT_SESSION_CMDLEN != 5);
651 if (unlikely(spaceleft < 5))
652 return 0;
654 dst = skb_put(skb, 5);
655 BUG_ON(dst == 0);
657 dst[0] = get_kp_code(KP_MISC, KP_MISC_INIT_SESSION);
658 cor_put_be32(dst + 1, sessionid);
660 return 5;
663 static __u32 cor_add_ack(struct sk_buff *skb, struct cor_control_retrans *cr,
664 struct cor_control_msg_out *cm, __u32 spaceleft)
666 char *dst;
668 BUG_ON(cm->length != 7);
670 if (unlikely(spaceleft < 7))
671 return 0;
673 dst = skb_put(skb, 7);
674 BUG_ON(dst == 0);
676 dst[0] = get_kp_code(KP_MISC, KP_MISC_ACK);
677 cor_put_u48(dst + 1, cm->msg.ack.seqno);
679 list_add_tail(&(cm->lh), &(cr->msgs));
681 return 7;
684 static inline __u8 cor_add_ack_conn_get_delayremaining(
685 struct cor_control_msg_out *cm, unsigned long cmsg_send_start_j)
687 __u32 maxdelay_ms = 0;
688 unsigned long jiffies_timeout;
689 if (cm->msg.ack_conn.is_highlatency) {
690 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_HIGHLATENCY_MS;
691 } else {
692 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS;
695 jiffies_timeout = cm->time_added + msecs_to_jiffies(maxdelay_ms);
697 if (time_before_eq(cmsg_send_start_j, cm->time_added)) {
698 return 255;
699 } else if (time_after_eq(cmsg_send_start_j, jiffies_timeout)) {
700 return 0;
701 } else {
702 __u64 delay_remaining = jiffies_timeout - cmsg_send_start_j;
704 BUG_ON(delay_remaining > U32_MAX);
705 BUG_ON(delay_remaining > msecs_to_jiffies(maxdelay_ms));
707 return (__u8) div64_u64(255 * delay_remaining +
708 msecs_to_jiffies(maxdelay_ms)/2,
709 msecs_to_jiffies(maxdelay_ms));
713 static __u32 cor_add_ack_conn(struct sk_buff *skb,
714 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
715 __u32 spaceleft, unsigned long cmsg_send_start_j,
716 int *ackneeded)
718 char *dst;
719 __u32 offset = 0;
721 if (unlikely(spaceleft < cm->length))
722 return 0;
724 dst = skb_put(skb, cm->length);
725 BUG_ON(dst == 0);
727 dst[offset] = get_kp_code(KP_ACK_CONN, cm->msg.ack_conn.flags);
728 offset++;
729 cor_put_u32(dst + offset, cm->msg.ack_conn.conn_id);
730 offset += 4;
732 if (likely((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0 ||
733 cor_ooolen(cm->msg.ack_conn.flags) != 0)) {
734 dst[offset] = cor_add_ack_conn_get_delayremaining(cm,
735 cmsg_send_start_j);
736 offset++;
739 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0) {
740 cor_put_u48(dst + offset, cm->msg.ack_conn.seqno);
741 offset += 6;
743 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_WINDOW) != 0) {
744 BUG_ON(cm->msg.ack_conn.src_in == 0);
745 dst[offset] = cor_get_window(cm->msg.ack_conn.src_in,
746 cm->nb, cor_get_connid_reverse(
747 cm->msg.ack_conn.conn_id));
748 offset++;
749 dst[offset] = cm->msg.ack_conn.bufsize_changerate;
750 offset++;
754 if (cor_ooolen(cm->msg.ack_conn.flags) != 0) {
755 cor_put_u48(dst + offset, cm->msg.ack_conn.seqno_ooo);
756 offset += 6;
757 if (cor_ooolen(cm->msg.ack_conn.flags) == 1) {
758 BUG_ON(cm->msg.ack_conn.length > 255);
759 dst[offset] = cm->msg.ack_conn.length;
760 offset += 1;
761 } else if (cor_ooolen(cm->msg.ack_conn.flags) == 2) {
762 BUG_ON(cm->msg.ack_conn.length <= 255);
763 BUG_ON(cm->msg.ack_conn.length > 65535);
764 cor_put_u16(dst + offset, cm->msg.ack_conn.length);
765 offset += 2;
766 } else if (cor_ooolen(cm->msg.ack_conn.flags) == 4) {
767 BUG_ON(cm->msg.ack_conn.length <= 65535);
768 cor_put_u32(dst + offset, cm->msg.ack_conn.length);
769 offset += 4;
770 } else {
771 BUG();
775 if (unlikely((cm->msg.ack_conn.flags &
776 KP_ACK_CONN_FLAGS_PRIORITY) != 0)) {
777 __u16 priority = (cm->msg.ack_conn.priority_seqno << 12) &
778 cm->msg.ack_conn.priority;
779 BUG_ON(cm->msg.ack_conn.priority_seqno > 15);
780 BUG_ON(cm->msg.ack_conn.priority > 4095);
782 cor_put_u16(dst + offset, priority);
783 offset+=2;
786 list_add_tail(&(cm->lh), &(cr->msgs));
787 if (likely((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0 ||
788 cor_ooolen(cm->msg.ack_conn.flags) != 0) &&
789 cm->msg.ack_conn.is_highlatency == 0) {
790 *ackneeded = ACK_NEEDED_FAST;
791 } else if (*ackneeded != ACK_NEEDED_FAST) {
792 *ackneeded = ACK_NEEDED_SLOW;
795 BUG_ON(offset != cm->length);
796 return offset;
799 static __u32 cor_add_ping(struct sk_buff *skb, __u32 cookie, __u32 spaceleft)
801 char *dst;
803 BUG_ON(KP_MISC_PING_CMDLEN != 5);
805 if (unlikely(spaceleft < 5))
806 return 0;
808 dst = skb_put(skb, 5);
809 BUG_ON(dst == 0);
811 dst[0] = get_kp_code(KP_MISC, KP_MISC_PING);
812 cor_put_u32(dst + 1, cookie);
814 return 5;
817 static __u32 cor_calc_respdelay(ktime_t time_pong_enqueued, ktime_t time_end)
819 if (unlikely(ktime_before(time_end, time_pong_enqueued))) {
820 return 0;
821 } else {
822 __s64 respdelay = div_u64(ktime_to_ns(time_end) -
823 ktime_to_ns(time_pong_enqueued) + 500,
824 1000);
826 if (unlikely(respdelay > U32_MAX))
827 return U32_MAX;
828 else if (unlikely(respdelay < 0))
829 return 0;
830 else
831 return (__u32) respdelay;
835 static __u32 cor_add_pong(struct sk_buff *skb, struct cor_control_retrans *cr,
836 struct cor_control_msg_out *cm, __u32 spaceleft, int nbstate,
837 ktime_t cmsg_send_start, int *ackneeded)
839 __u32 respdelay_full;
840 __u32 respdelay_netonly;
841 char *dst;
843 BUG_ON(cm->length != 13);
845 if (unlikely(spaceleft < 13))
846 return 0;
848 respdelay_full = cor_calc_respdelay(cm->msg.pong.time_enqueued,
849 cmsg_send_start);
850 respdelay_netonly = cor_calc_respdelay(cm->msg.pong.ping_rcvtime,
851 ktime_get());
853 dst = skb_put(skb, 13);
854 BUG_ON(dst == 0);
856 dst[0] = get_kp_code(KP_MISC, KP_MISC_PONG);
857 cor_put_u32(dst + 1, cm->msg.pong.cookie);
858 cor_put_u32(dst + 5, (__u32) respdelay_full);
859 cor_put_u32(dst + 9, (__u32) respdelay_netonly);
861 list_add_tail(&(cm->lh), &(cr->msgs));
862 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE) &&
863 *ackneeded != ACK_NEEDED_FAST)
864 *ackneeded = ACK_NEEDED_SLOW;
866 return 13;
869 static __u32 cor_add_connect(struct sk_buff *skb,
870 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
871 __u32 spaceleft, int *ackneeded)
873 char *dst;
874 struct cor_conn *src_in = cm->msg.connect.src_in;
875 struct cor_conn *trgt_out = cor_get_conn_reversedir(src_in);
876 __u16 priority;
878 BUG_ON(cm->length != 21);
880 if (unlikely(spaceleft < 21))
881 return 0;
883 dst = skb_put(skb, 21);
884 BUG_ON(dst == 0);
886 dst[0] = get_kp_code(KP_MISC, KP_MISC_CONNECT);
887 cor_put_u32(dst + 1, cm->msg.connect.conn_id);
888 cor_put_u48(dst + 5, cm->msg.connect.seqno1);
889 cor_put_u48(dst + 11, cm->msg.connect.seqno2);
890 BUG_ON(cm->msg.connect.src_in == 0);
891 dst[17] = cor_get_window(cm->msg.connect.src_in, cm->nb,
892 cor_get_connid_reverse(cm->msg.connect.conn_id));
894 spin_lock_bh(&(trgt_out->rcv_lock));
895 BUG_ON(trgt_out->targettype != TARGET_OUT);
897 priority = (trgt_out->target.out.priority_seqno << 12) &
898 trgt_out->target.out.priority_last;
899 BUG_ON(trgt_out->target.out.priority_seqno > 15);
900 BUG_ON(trgt_out->target.out.priority_last > 4095);
901 cor_put_u16(dst + 18, priority);
903 if (src_in->is_highlatency == 0)
904 dst[20] = 0;
905 else
906 dst[20] = 1;
908 spin_unlock_bh(&(trgt_out->rcv_lock));
910 list_add_tail(&(cm->lh), &(cr->msgs));
911 if (*ackneeded != ACK_NEEDED_FAST)
912 *ackneeded = ACK_NEEDED_SLOW;
914 return 21;
917 static __u32 cor_add_connect_success(struct sk_buff *skb,
918 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
919 __u32 spaceleft, int *ackneeded)
921 char *dst;
923 BUG_ON(cm->length != 6);
925 if (unlikely(spaceleft < 6))
926 return 0;
928 dst = skb_put(skb, 6);
929 BUG_ON(dst == 0);
931 dst[0] = get_kp_code(KP_MISC, KP_MISC_CONNECT_SUCCESS);
932 cor_put_u32(dst + 1, cm->msg.connect_success.conn_id);
933 BUG_ON(cm->msg.connect_success.src_in == 0);
934 dst[5] = cor_get_window(cm->msg.connect_success.src_in, cm->nb,
935 cor_get_connid_reverse(
936 cm->msg.connect_success.conn_id));
938 list_add_tail(&(cm->lh), &(cr->msgs));
939 if (*ackneeded != ACK_NEEDED_FAST)
940 *ackneeded = ACK_NEEDED_SLOW;
942 return 6;
945 static __u32 cor_add_reset_conn(struct sk_buff *skb,
946 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
947 __u32 spaceleft, int *ackneeded)
949 char *dst;
951 BUG_ON(cm->length != 5);
953 if (unlikely(spaceleft < 5))
954 return 0;
956 dst = skb_put(skb, 5);
957 BUG_ON(dst == 0);
959 dst[0] = get_kp_code(KP_MISC, KP_MISC_RESET_CONN);
960 cor_put_u32(dst + 1, cm->msg.reset_conn.conn_id);
962 list_add_tail(&(cm->lh), &(cr->msgs));
963 if (*ackneeded != ACK_NEEDED_FAST)
964 *ackneeded = ACK_NEEDED_SLOW;
966 return 5;
969 static __u32 cor_add_conndata(struct sk_buff *skb,
970 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
971 __u32 spaceleft, struct cor_control_msg_out **split_conndata,
972 __u32 *sc_sendlen)
974 char *dst;
975 __u32 offset = 0;
977 __u32 totallen = get_kp_conn_data_length(cm->msg.conn_data.datalen);
978 __u32 putlen = totallen;
979 __u32 dataputlen = cm->msg.conn_data.datalen;
980 __u8 code_min = 0;
982 BUILD_BUG_ON(KP_CONN_DATA_MAXLEN != 128+32767);
983 BUG_ON(cm->msg.conn_data.datalen > KP_CONN_DATA_MAXLEN);
985 BUG_ON(cm->length != totallen);
987 BUG_ON(putlen > 1024*1024*1024);
989 BUG_ON(split_conndata == 0);
990 BUG_ON(*split_conndata != 0);
991 BUG_ON(sc_sendlen == 0);
992 BUG_ON(*sc_sendlen != 0);
994 if (putlen > spaceleft) {
995 if (spaceleft < get_kp_conn_data_length(1))
996 return 0;
998 BUG_ON(spaceleft < 13);
1000 if (spaceleft <= 127 + 12) {
1001 dataputlen = spaceleft - 12;
1002 putlen = spaceleft;
1003 } else if (spaceleft == 127 - 12 + 1) {
1004 dataputlen = spaceleft - 12 - 1;
1005 putlen = spaceleft - 1;
1006 } else {
1007 dataputlen = spaceleft - 13;
1008 putlen = spaceleft;
1011 BUG_ON(putlen != get_kp_conn_data_length(dataputlen));
1014 dst = skb_put(skb, putlen);
1015 BUG_ON(dst == 0);
1017 BUG_ON((cm->msg.conn_data.windowused &
1018 (~KP_CONN_DATA_FLAGS_WINDOWUSED)) != 0);
1019 code_min = 0;
1020 if (cm->msg.conn_data.flush != 0)
1021 code_min |= KP_CONN_DATA_FLAGS_FLUSH;
1022 code_min |= cm->msg.conn_data.windowused;
1024 dst[0] = get_kp_code(KP_CONN_DATA, code_min);
1025 offset++;
1026 cor_put_u32(dst + offset, cm->msg.conn_data.conn_id);
1027 offset += 4;
1028 cor_put_u48(dst + offset, cm->msg.conn_data.seqno);
1029 offset += 6;
1031 if (dataputlen < 128) {
1032 dst[offset] = (__u8) dataputlen;
1033 offset++;
1034 } else {
1035 __u8 high = (__u8) (128 + ((dataputlen - 128) / 256));
1036 __u8 low = (__u8) ((dataputlen - 128) % 256);
1037 BUG_ON(((dataputlen - 128) / 256) > 127);
1038 dst[offset] = high;
1039 dst[offset+1] = low;
1040 offset += 2;
1043 BUG_ON(offset > putlen);
1044 BUG_ON(putlen - offset != dataputlen);
1045 memcpy(dst + offset, cm->msg.conn_data.data, dataputlen);
1046 offset += dataputlen;
1048 if (cm->msg.conn_data.datalen == dataputlen) {
1049 BUG_ON(cm->length != putlen);
1050 list_add_tail(&(cm->lh), &(cr->msgs));
1051 } else {
1052 *split_conndata = cm;
1053 *sc_sendlen = dataputlen;
1056 return putlen;
1059 static __u32 cor_add_set_max_cmsg_dly(struct sk_buff *skb,
1060 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
1061 __u32 spaceleft, int *ackneeded)
1063 char *dst;
1065 BUG_ON(KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN != 21);
1066 BUG_ON(cm->length != KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN);
1068 if (unlikely(spaceleft < 21))
1069 return 0;
1071 dst = skb_put(skb, 21);
1072 BUG_ON(dst == 0);
1074 dst[0] = get_kp_code(KP_MISC, KP_MISC_SET_MAX_CMSG_DELAY);
1075 cor_put_u32(dst + 1, cm->msg.set_max_cmsg_delay.ack_fast_delay);
1076 cor_put_u32(dst + 5, cm->msg.set_max_cmsg_delay.ack_slow_delay);
1077 cor_put_u32(dst + 9,
1078 cm->msg.set_max_cmsg_delay.ackconn_lowlatency_delay);
1079 cor_put_u32(dst + 13,
1080 cm->msg.set_max_cmsg_delay.ackconn_highlatency_delay);
1081 cor_put_u32(dst + 17, cm->msg.set_max_cmsg_delay.pong_delay);
1083 list_add_tail(&(cm->lh), &(cr->msgs));
1084 if (*ackneeded != ACK_NEEDED_FAST)
1085 *ackneeded = ACK_NEEDED_SLOW;
1087 return 21;
1090 static __u32 cor_add_message(struct sk_buff *skb,
1091 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
1092 __u32 spaceleft, int nbstate, unsigned long cmsg_send_start_j,
1093 ktime_t cmsg_send_start_kt,
1094 struct cor_control_msg_out **split_conndata, __u32 *sc_sendlen,
1095 int *ackneeded)
1097 BUG_ON(split_conndata != 0 && *split_conndata != 0);
1098 BUG_ON(sc_sendlen != 0 && *sc_sendlen != 0);
1100 switch (cm->type) {
1101 case MSGTYPE_ACK:
1102 return cor_add_ack(skb, cr, cm, spaceleft);
1103 case MSGTYPE_ACK_CONN:
1104 return cor_add_ack_conn(skb, cr, cm, spaceleft,
1105 cmsg_send_start_j, ackneeded);
1106 case MSGTYPE_PONG:
1107 return cor_add_pong(skb, cr, cm, spaceleft, nbstate,
1108 cmsg_send_start_kt, ackneeded);
1109 case MSGTYPE_CONNECT:
1110 return cor_add_connect(skb, cr, cm, spaceleft, ackneeded);
1111 case MSGTYPE_CONNECT_SUCCESS:
1112 return cor_add_connect_success(skb, cr, cm, spaceleft,
1113 ackneeded);
1114 case MSGTYPE_RESET_CONN:
1115 return cor_add_reset_conn(skb, cr, cm, spaceleft, ackneeded);
1116 case MSGTYPE_CONNDATA:
1117 return cor_add_conndata(skb, cr, cm, spaceleft, split_conndata,
1118 sc_sendlen);
1119 case MSGTYPE_SET_MAX_CMSG_DELAY:
1120 return cor_add_set_max_cmsg_dly(skb, cr, cm, spaceleft,
1121 ackneeded);
1122 default:
1123 BUG();
1125 BUG();
1126 return 0;
/* Serialize the queued messages from cmsgs into the packet until the list is
 * empty or one no longer fits. Returns the total number of bytes appended. */
static __u32 ___cor_send_messages(struct cor_neighbor *nb, struct sk_buff *skb,
		struct cor_control_retrans *cr, struct list_head *cmsgs,
		__u32 spaceleft, int nbstate, unsigned long cmsg_send_start_j,
		ktime_t cmsg_send_start_kt,
		struct cor_control_msg_out **split_conndata, __u32 *sc_sendlen,
		int *ackneeded)
{
	__u32 length = 0;

	while (!list_empty(cmsgs)) {
		__u32 rc;
		struct cor_control_msg_out *cm = container_of(cmsgs->next,
				struct cor_control_msg_out, lh);

		list_del(&(cm->lh));

		rc = cor_add_message(skb, cr, cm, spaceleft - length, nbstate,
				cmsg_send_start_j, cmsg_send_start_kt,
				split_conndata, sc_sendlen, ackneeded);
		if (rc == 0) {
			/* NOTE(review): callers apparently size the packet so
			 * this cannot happen — BUG() makes the requeue below
			 * dead code; confirm whether BUG() is a leftover */
			BUG();
			list_add(&(cm->lh), cmsgs);
			break;
		}

		/* conndata may be legally truncated (split), others not */
		BUG_ON(rc != cm->length && cm->type != MSGTYPE_CONNDATA);

		length += rc;
	}

	return length;
}
1161 static __u32 ___cor_send_messages_smcd(struct cor_neighbor *nb,
1162 struct sk_buff *skb, struct cor_control_retrans *cr,
1163 __u32 spaceleft, int nbstate, unsigned long cmsg_send_start_j,
1164 ktime_t cmsg_send_start_kt, int *ackneeded)
1166 struct cor_control_msg_out *cm;
1167 __u32 rc;
1169 cm = cor_alloc_control_msg(nb, ACM_PRIORITY_MED);
1171 if (unlikely(cm == 0))
1172 return 0;
1174 cm->type = MSGTYPE_SET_MAX_CMSG_DELAY;
1175 cm->msg.set_max_cmsg_delay.ack_fast_delay =
1176 CMSG_MAXDELAY_ACK_FAST_MS * 1000;
1177 cm->msg.set_max_cmsg_delay.ack_slow_delay =
1178 CMSG_MAXDELAY_ACK_SLOW_MS * 1000;
1179 cm->msg.set_max_cmsg_delay.ackconn_lowlatency_delay =
1180 CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS * 1000;
1181 cm->msg.set_max_cmsg_delay.ackconn_highlatency_delay =
1182 CMSG_MAXDELAY_ACKCONN_HIGHLATENCY_MS * 1000;
1183 cm->msg.set_max_cmsg_delay.pong_delay =
1184 CMSG_MAXDELAY_OTHER_MS * 1000;
1185 cm->length = KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN;
1187 rc = cor_add_message(skb, cr, cm, spaceleft, nbstate, cmsg_send_start_j,
1188 cmsg_send_start_kt, 0, 0, ackneeded);
1190 nb->max_cmsg_delay_sent = 1;
1192 return rc;
/* per-neighbor send queues a control message can live on
 * (cor_control_msg_out.msg.ack_conn.queue stores one of these) */
#define CMSGQUEUE_PONG 1
#define CMSGQUEUE_ACK_FAST 2
#define CMSGQUEUE_ACK_SLOW 3
#define CMSGQUEUE_ACK_CONN_URGENT 4
#define CMSGQUEUE_ACK_CONN_LOWLAT 5
#define CMSGQUEUE_ACK_CONN_HIGHLAT 6
#define CMSGQUEUE_CONNDATA_LOWLAT 7
#define CMSGQUEUE_CONNDATA_HIGHLAT 8
#define CMSGQUEUE_OTHER 9
1205 static void cor_requeue_message(struct cor_control_msg_out *cm)
1207 if (cm->type == MSGTYPE_ACK_CONN) {
1208 struct cor_conn *cn_l = cm->msg.ack_conn.src_in;
1210 spin_lock_bh(&(cn_l->rcv_lock));
1211 if (unlikely(cor_ackconn_prepare_requeue(cn_l, cm) == 0)) {
1212 cor_free_control_msg(cm);
1213 } else {
1214 spin_lock_bh(&(cm->nb->cmsg_lock));
1216 if (unlikely(cm->msg.ack_conn.queue ==
1217 CMSGQUEUE_ACK_CONN_URGENT)) {
1218 list_add(&(cm->lh), &(cm->nb->
1219 cmsg_queue_ackconn_urgent));
1220 } else if (cm->msg.ack_conn.queue ==
1221 CMSGQUEUE_ACK_CONN_LOWLAT) {
1222 list_add(&(cm->lh), &(cm->nb->
1223 cmsg_queue_ackconn_lowlat));
1224 } else if (cm->msg.ack_conn.queue ==
1225 CMSGQUEUE_ACK_CONN_HIGHLAT) {
1226 list_add(&(cm->lh), &(cm->nb->
1227 cmsg_queue_ackconn_highlat));
1228 } else {
1229 BUG();
1232 cm->nb->cmsg_otherlength += cm->length;
1234 list_add(&(cm->msg.ack_conn.conn_acks),
1235 &(cn_l->source.in.acks_pending));
1236 cor_try_merge_ackconns(cn_l, cm);
1238 spin_unlock_bh(&(cm->nb->cmsg_lock));
1240 spin_unlock_bh(&(cn_l->rcv_lock));
1241 return;
1244 cor_enqueue_control_msg(cm, ADDCMSG_SRC_READD);
1247 static void cor_requeue_messages(struct list_head *lh)
1249 while (list_empty(lh) == 0) {
1250 struct cor_control_msg_out *cm = container_of(lh->prev,
1251 struct cor_control_msg_out, lh);
1252 list_del(&(cm->lh));
1253 cor_requeue_message(cm);
/*
 * Fill the already-allocated skb with (optional) init_session + ping,
 * a SET_MAX_CMSG_DELAY command if not yet sent, and the dequeued control
 * messages; then transmit it.  On NET_XMIT_DROP everything is requeued;
 * on success per-type post-send handling runs (acks/pongs freed or
 * retained for retransmit, conndata handed to the conn retransmit path,
 * a partially sent conndata message re-split and re-enqueued).
 * Returns QOS_RESUME_DONE or QOS_RESUME_CONG.
 */
1257 static int __cor_send_messages_send(struct cor_neighbor *nb,
1258 struct sk_buff *skb, char *packet_type, int ping,
1259 int initsession, struct cor_control_retrans *cr,
1260 struct list_head *cmsgs, __u32 spaceleft, int nbstate,
1261 unsigned long cmsg_send_start_j, ktime_t cmsg_send_start_kt,
1262 int *sent)
1264 int rc;
1265 int ackneeded = ACK_NEEDED_NO;
1266 __u32 length = 0;
1267 __u32 pinglen = 0;
1268 __u32 pingcookie = 0;
1269 unsigned long last_ping_time;
1270 struct cor_control_msg_out *split_conndata = 0;
1271 __u32 sc_sendlen = 0;
1273 if (ping != TIMETOSENDPING_NO) {
1274 __u32 rc;
1276 if (unlikely(initsession)) {
1277 rc = cor_add_init_session(skb, nb->sessionid,
1278 spaceleft - length);
1279 BUG_ON(rc <= 0);
1280 pinglen = rc;
1281 length += rc;
1284 pingcookie = cor_add_ping_req(nb, &last_ping_time);
1285 rc = cor_add_ping(skb, pingcookie, spaceleft - length);
1286 BUG_ON(rc <= 0);
1287 pinglen += rc;
1288 length += rc;
1291 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE) &&
1292 unlikely(nb->max_cmsg_delay_sent == 0))
1293 length += ___cor_send_messages_smcd(nb, skb, cr,
1294 spaceleft - length, nbstate, cmsg_send_start_j,
1295 cmsg_send_start_kt, &ackneeded);
1297 length += ___cor_send_messages(nb, skb, cr, cmsgs, spaceleft - length,
1298 nbstate, cmsg_send_start_j, cmsg_send_start_kt,
1299 &split_conndata, &sc_sendlen, &ackneeded);
1301 BUG_ON(length > spaceleft);
/* a packet that would carry nothing but an optional ping is dropped
 * (unless the ping is forced); the pending ping request is undone */
1303 if (likely(ping != TIMETOSENDPING_FORCE) &&
1304 pinglen != 0 && unlikely(length == pinglen)) {
1305 cor_unadd_ping_req(nb, pingcookie, last_ping_time, 0);
1306 goto drop;
1309 if (unlikely(length == 0)) {
1310 drop:
1311 kfree_skb(skb);
1313 BUG_ON(list_empty(&(cr->msgs)) == 0);
1314 kref_put(&(cr->ref), cor_free_control_retrans);
/* undo the seqno increment done by the caller for this packet */
1316 nb->kpacket_seqno--;
1317 return QOS_RESUME_DONE;
1320 //padding(skb, spaceleft - length);
1321 BUG_ON(spaceleft - length != 0 &&
1322 (split_conndata == 0 || spaceleft - length != 1));
1324 if (ackneeded == ACK_NEEDED_NO) {
1325 *packet_type = PACKET_TYPE_CMSG_NOACK;
1326 } else if (ackneeded == ACK_NEEDED_SLOW) {
1327 *packet_type = PACKET_TYPE_CMSG_ACKSLOW;
1328 } else if (ackneeded == ACK_NEEDED_FAST) {
1329 *packet_type = PACKET_TYPE_CMSG_ACKFAST;
1330 } else {
1331 BUG();
1334 rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_KPACKET);
1335 if (rc == NET_XMIT_SUCCESS)
1336 *sent = 1;
1338 if (rc == NET_XMIT_DROP) {
1339 if (ping != 0)
1340 cor_unadd_ping_req(nb, pingcookie, last_ping_time, 1);
/* cmsg_bulk_readds suppresses timer rescheduling while requeueing */
1342 atomic_inc(&(nb->cmsg_bulk_readds));
1343 if (split_conndata != 0)
1344 cor_requeue_message(split_conndata);
1346 cor_requeue_messages(&(cr->msgs));
1348 kref_put(&(cr->ref), cor_free_control_retrans);
1350 atomic_dec(&(nb->cmsg_bulk_readds));
1352 spin_lock_bh(&(nb->cmsg_lock));
1353 cor_schedule_controlmsg_timer(nb);
1354 spin_unlock_bh(&(nb->cmsg_lock));
1355 } else {
1356 struct list_head *curr = cr->msgs.next;
1358 if (pingcookie != 0)
1359 cor_ping_sent(nb, pingcookie);
/* walk the sent messages: drop the ones that must not be
 * retransmitted, keep the rest on cr for the retransmit timer */
1361 while (curr != &(cr->msgs)) {
1362 struct cor_control_msg_out *cm = container_of(curr,
1363 struct cor_control_msg_out, lh);
1365 curr = curr->next;
1367 if (cm->type == MSGTYPE_ACK || unlikely(
1368 cm->type == MSGTYPE_PONG &&
1369 (nbstate != NEIGHBOR_STATE_ACTIVE))) {
1370 list_del(&(cm->lh));
1371 cor_free_control_msg(cm);
1372 } else if (unlikely(cm->type == MSGTYPE_PONG &&
1373 atomic_inc_return(
1374 &(nb->cmsg_pongs_retrans_cnt)) >
1375 MAX_PONG_CMSGS_RETRANS_PER_NEIGH)) {
1376 atomic_dec(&(nb->cmsg_pongs_retrans_cnt));
1377 list_del(&(cm->lh));
1378 cor_free_control_msg(cm);
1379 } else if (cm->type == MSGTYPE_CONNDATA) {
/* conndata is retransmitted by the conn retransmit
 * machinery, not by the cmsg retransmit path */
1380 cor_schedule_retransmit_conn(
1381 cm->msg.conn_data.cr, 0, 0);
1382 kref_put(&(cm->msg.conn_data.cr->ref),
1383 cor_free_connretrans);
1384 cm->msg.conn_data.cr = 0;
1385 kfree(cm->msg.conn_data.data_orig);
1386 list_del(&(cm->lh));
1387 cor_free_control_msg(cm);
/* only the first sc_sendlen bytes were sent; advance the
 * message past them and re-enqueue the remainder */
1391 if (split_conndata != 0) {
1392 BUG_ON(sc_sendlen == 0);
1393 BUG_ON(sc_sendlen >=
1394 split_conndata->msg.conn_data.datalen);
1396 split_conndata->msg.conn_data.seqno += sc_sendlen;
1397 split_conndata->msg.conn_data.data += sc_sendlen;
1398 split_conndata->msg.conn_data.datalen -= sc_sendlen;
1399 split_conndata->length = get_kp_conn_data_length(
1400 split_conndata->msg.conn_data.datalen);
1401 cor_enqueue_control_msg(split_conndata,
1402 ADDCMSG_SRC_SPLITCONNDATA);
1406 if (list_empty(&(cr->msgs))) {
1407 kref_put(&(cr->ref), cor_free_control_retrans);
1408 } else {
1409 int fastack = (ackneeded == ACK_NEEDED_FAST);
1410 BUG_ON(ackneeded != ACK_NEEDED_FAST &&
1411 ackneeded != ACK_NEEDED_SLOW);
1412 cor_schedule_retransmit(cr, nb, fastack);
1416 return (rc == NET_XMIT_SUCCESS) ? QOS_RESUME_DONE : QOS_RESUME_CONG;
/*
 * Allocate the skb and the retransmit record for one kernel packet,
 * write the 7-byte header (1 byte packet type placeholder + 48-bit
 * seqno) and delegate to __cor_send_messages_send().  On allocation
 * failure the dequeued messages are requeued and QOS_RESUME_CONG is
 * returned so the caller retries later.
 */
1419 static int _cor_send_messages_send(struct cor_neighbor *nb, int ping,
1420 int initsession, struct list_head *cmsgs, int nbstate,
1421 __u32 length, __u64 seqno, unsigned long cmsg_send_start_j,
1422 ktime_t cmsg_send_start_kt, int *sent)
1424 struct sk_buff *skb;
1425 struct cor_control_retrans *cr;
1426 char *dst;
1427 int rc;
1429 BUG_ON(length > cor_mss_cmsg(nb));
1430 skb = cor_create_packet(nb, length + 7, GFP_ATOMIC);
1431 if (unlikely(skb == 0)) {
1432 printk(KERN_ERR "cor_send_messages(): cannot allocate skb (out of memory?)");
1434 cor_requeue_messages(cmsgs);
1435 return QOS_RESUME_CONG;
1438 cr = kmem_cache_alloc(cor_controlretrans_slab, GFP_ATOMIC);
1439 if (unlikely(cr == 0)) {
1440 printk(KERN_ERR "cor_send_messages(): cannot allocate cor_control_retrans (out of memory?)");
1441 kfree_skb(skb);
1443 cor_requeue_messages(cmsgs);
1444 return QOS_RESUME_CONG;
1447 memset(cr, 0, sizeof(struct cor_control_retrans));
1448 kref_init(&(cr->ref));
1449 cr->nb = nb;
1450 cr->seqno = seqno;
1451 INIT_LIST_HEAD(&(cr->msgs));
1454 dst = skb_put(skb, 7);
1455 BUG_ON(dst == 0);
/* packet type is patched in later by __cor_send_messages_send() via
 * the &dst[0] pointer, once it is known whether an ack is needed */
1457 dst[0] = PACKET_TYPE_NONE;
1458 cor_put_u48(dst + 1, seqno);
1460 rc = __cor_send_messages_send(nb, skb, &(dst[0]), ping, initsession, cr,
1461 cmsgs, length, nbstate, cmsg_send_start_j,
1462 cmsg_send_start_kt, sent);
1464 BUG_ON(!list_empty(cmsgs));
1466 return rc;
1469 static unsigned long cor_get_cmsg_timeout(struct cor_control_msg_out *cm,
1470 int queue)
1472 if (cm->type == MSGTYPE_ACK) {
1473 if (cm->msg.ack.fast != 0) {
1474 BUG_ON(queue != CMSGQUEUE_ACK_FAST);
1475 return cm->time_added + msecs_to_jiffies(
1476 CMSG_MAXDELAY_ACK_FAST_MS);
1477 } else {
1478 BUG_ON(queue != CMSGQUEUE_ACK_SLOW);
1479 return cm->time_added + msecs_to_jiffies(
1480 CMSG_MAXDELAY_ACK_SLOW_MS);
1482 } else if (cm->type == MSGTYPE_ACK_CONN) {
1483 __u32 maxdelay_ms = 0;
1484 if (unlikely(queue == CMSGQUEUE_ACK_CONN_URGENT)) {
1485 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_URGENT_MS;
1486 } else if (queue == CMSGQUEUE_ACK_CONN_LOWLAT) {
1487 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS;
1488 } else if (queue == CMSGQUEUE_ACK_CONN_HIGHLAT) {
1489 maxdelay_ms = CMSG_MAXDELAY_ACKCONN_HIGHLATENCY_MS;
1490 } else {
1491 BUG();
1493 return cm->time_added + msecs_to_jiffies(maxdelay_ms);
1494 } else if (cm->type == MSGTYPE_CONNDATA) {
1495 if (cm->msg.conn_data.highlatency != 0) {
1496 BUG_ON(queue != CMSGQUEUE_CONNDATA_HIGHLAT);
1497 return cm->time_added +
1498 msecs_to_jiffies(
1499 CMSG_MAXDELAY_CONNDATA_MS);
1500 } else {
1501 BUG_ON(queue != CMSGQUEUE_CONNDATA_LOWLAT);
1502 return cm->time_added;
1504 } else {
1505 BUG_ON(cm->type == MSGTYPE_PONG && queue != CMSGQUEUE_PONG);
1506 BUG_ON(cm->type != MSGTYPE_PONG && queue != CMSGQUEUE_OTHER);
1508 return cm->time_added +
1509 msecs_to_jiffies(CMSG_MAXDELAY_OTHER_MS);
/*
 * Look at the head of one cmsg queue and, if its deadline is more
 * urgent than the currently selected candidate, make it the new
 * candidate (*currcm/*currtimeout) and record which length counter
 * (pong vs other) it is accounted under.  Caller holds cmsg_lock.
 */
1513 static void _cor_peek_message(struct cor_neighbor *nb_cmsglocked, int queue,
1514 struct cor_control_msg_out **currcm, unsigned long *currtimeout,
1515 __u32 **currlen)
1517 struct cor_control_msg_out *cm;
1518 unsigned long cmtimeout;
1520 struct list_head *queuelh;
1521 if (queue == CMSGQUEUE_PONG) {
1522 queuelh = &(nb_cmsglocked->cmsg_queue_pong);
1523 } else if (queue == CMSGQUEUE_ACK_FAST) {
1524 queuelh = &(nb_cmsglocked->cmsg_queue_ack_fast);
1525 } else if (queue == CMSGQUEUE_ACK_SLOW) {
1526 queuelh = &(nb_cmsglocked->cmsg_queue_ack_slow);
1527 } else if (queue == CMSGQUEUE_ACK_CONN_URGENT) {
1528 queuelh = &(nb_cmsglocked->cmsg_queue_ackconn_urgent);
1529 } else if (queue == CMSGQUEUE_ACK_CONN_LOWLAT) {
1530 queuelh = &(nb_cmsglocked->cmsg_queue_ackconn_lowlat);
1531 } else if (queue == CMSGQUEUE_ACK_CONN_HIGHLAT) {
1532 queuelh = &(nb_cmsglocked->cmsg_queue_ackconn_highlat);
1533 } else if (queue == CMSGQUEUE_CONNDATA_LOWLAT) {
1534 queuelh = &(nb_cmsglocked->cmsg_queue_conndata_lowlat);
1535 } else if (queue == CMSGQUEUE_CONNDATA_HIGHLAT) {
1536 queuelh = &(nb_cmsglocked->cmsg_queue_conndata_highlat);
1537 } else if (queue == CMSGQUEUE_OTHER) {
1538 queuelh = &(nb_cmsglocked->cmsg_queue_other);
1539 } else {
1540 BUG();
1543 if (list_empty(queuelh))
1544 return;
1546 cm = container_of(queuelh->next, struct cor_control_msg_out, lh);
1547 cmtimeout = cor_get_cmsg_timeout(cm, queue);
1549 BUG_ON(cm->nb != nb_cmsglocked);
/* replace the candidate only if this deadline is earlier AND the current
 * candidate is not already overdue - once a candidate is past due the
 * earlier queues keep priority (queue order acts as tie-breaker) */
1551 if (*currcm == 0 || (time_before(cmtimeout, *currtimeout) &&
1552 time_before(jiffies, *currtimeout))) {
1553 *currcm = cm;
1554 *currtimeout = cmtimeout;
1556 if (queue == CMSGQUEUE_PONG) {
1557 *currlen = &(nb_cmsglocked->cmsg_pongslength);
1558 } else {
1559 *currlen = &(nb_cmsglocked->cmsg_otherlength);
/*
 * Select the most urgent pending control message across all queues.
 * Pongs are always considered; everything else only while the neighbor
 * is ACTIVE.  Conn data queues are skipped when computing the timer
 * timeout (for_timeout) while cmsg_delay_conndata is set.
 * Caller holds cmsg_lock.
 */
1564 static void cor_peek_message(struct cor_neighbor *nb_cmsglocked, int nbstate,
1565 struct cor_control_msg_out **cm, unsigned long *cmtimeout,
1566 __u32 **len, int for_timeout)
1568 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_PONG, cm, cmtimeout, len);
1569 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
1570 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_FAST, cm,
1571 cmtimeout, len);
1572 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_SLOW, cm,
1573 cmtimeout, len);
1574 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN_URGENT, cm,
1575 cmtimeout, len);
1576 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN_LOWLAT, cm,
1577 cmtimeout, len);
1578 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN_HIGHLAT, cm,
1579 cmtimeout, len);
1580 if (!for_timeout || atomic_read(
1581 &(nb_cmsglocked->cmsg_delay_conndata)) == 0) {
1582 _cor_peek_message(nb_cmsglocked,
1583 CMSGQUEUE_CONNDATA_LOWLAT,
1584 cm, cmtimeout, len);
1585 _cor_peek_message(nb_cmsglocked,
1586 CMSGQUEUE_CONNDATA_HIGHLAT,
1587 cm, cmtimeout, len);
1589 _cor_peek_message(nb_cmsglocked, CMSGQUEUE_OTHER, cm, cmtimeout,
1590 len);
/*
 * Return the jiffies value at which the cmsg timer should next fire:
 * the earlier of the next ping time and the most urgent queued control
 * message's deadline (clamped to "now" if already overdue).
 * Caller holds cmsg_lock.
 */
1594 static unsigned long cor_get_cmsg_timer_timeout(
1595 struct cor_neighbor *nb_cmsglocked, int nbstate)
1597 unsigned long pingtimeout = cor_get_next_ping_time(nb_cmsglocked);
1599 struct cor_control_msg_out *cm = 0;
1600 unsigned long cmtimeout;
1601 __u32 *len;
1603 cor_peek_message(nb_cmsglocked, nbstate, &cm, &cmtimeout, &len, 1);
1605 if (cm != 0) {
1606 unsigned long jiffies_tmp = jiffies;
1608 if (time_before(cmtimeout, jiffies_tmp))
1609 return jiffies_tmp;
1610 if (time_before(cmtimeout, pingtimeout))
1611 return cmtimeout;
1614 return pingtimeout;
/*
 * Move control messages from the neighbor queues onto cmsgs, most
 * urgent first, until the packet budget (targetmss) is filled.
 * Conn data messages may be taken even if they do not fully fit (they
 * are split at send time; *length is then rounded up to targetmss);
 * any other message that does not fit ends the loop.  Per-message
 * dequeue accounting (pong count, pending reset_conn rbtree entry) is
 * undone here.  Caller holds cmsg_lock.
 */
1617 static void _cor_dequeue_messages(struct cor_neighbor *nb_cmsglocked,
1618 int nbstate, __u32 targetmss, __u32 *length,
1619 struct list_head *cmsgs)
1621 while (1) {
1622 __u32 spaceleft = targetmss - *length;
1623 struct cor_control_msg_out *cm = 0;
1624 unsigned long cmtimeout;
1625 __u32 *len;
1627 cor_peek_message(nb_cmsglocked, nbstate, &cm, &cmtimeout, &len,
1630 if (unlikely(cm == 0))
1631 break;
1633 BUG_ON(len == 0);
1635 if (cm->length > spaceleft) {
1636 if (cm->type == MSGTYPE_CONNDATA) {
1637 BUG_ON(*length == 0 && spaceleft <
1638 get_kp_conn_data_length(1));
/* do not split if the packet is nearly full already or there
 * is not even room for a minimal conn data command */
1640 if (spaceleft < get_kp_conn_data_length(1) ||
1641 *length > (targetmss/4)*3)
1642 break;
1643 } else {
1644 BUG_ON(*length == 0);
1645 break;
1649 list_del(&(cm->lh));
1650 *len -= cm->length;
1652 if (cm->type == MSGTYPE_ACK_CONN)
1653 list_del(&(cm->msg.ack_conn.conn_acks));
1654 if (unlikely(cm->type == MSGTYPE_PONG)) {
1655 BUG_ON(cm->nb->cmsg_pongscnt == 0);
1656 cm->nb->cmsg_pongscnt--;
1659 if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
1660 BUG_ON(cm->msg.reset_conn.in_pending_conn_resets == 0);
1661 rb_erase(&(cm->msg.reset_conn.rbn),
1662 &(cm->nb->pending_conn_resets_rb));
1663 cm->msg.reset_conn.in_pending_conn_resets = 0;
1664 kref_put(&(cm->ref), cor_kreffree_bug);
1667 BUG_ON(*length + cm->length < *length);
1668 if (cm->length > targetmss - *length) {
1669 BUG_ON(*length >= targetmss);
1670 BUG_ON(cm->type != MSGTYPE_CONNDATA);
1671 *length = targetmss;
1672 } else {
1673 *length += cm->length;
1676 list_add_tail(&(cm->lh), cmsgs);
/*
 * Sum the bytes currently waiting to be sent to this neighbor: queued
 * pongs, plus (while ACTIVE) the other queues and any commands that
 * would be generated at send time (SET_MAX_CMSG_DELAY, ping,
 * init_session).  Bytes not sitting on a queue are also accumulated
 * into *extralength so the caller can pre-account them.
 */
1680 static __u32 cor_get_total_messages_length(struct cor_neighbor *nb, int ping,
1681 int initsession, int nbstate, int *extralength)
1683 __u32 length = nb->cmsg_pongslength;
1685 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
1686 length += nb->cmsg_otherlength;
1688 if (unlikely(nb->max_cmsg_delay_sent == 0)) {
1689 length += KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN;
1690 *extralength += KP_MISC_SET_MAX_CMSG_DELAY_CMDLEN;
1693 if (ping == TIMETOSENDPING_FORCE ||
1694 (length > 0 && ping != TIMETOSENDPING_NO)) {
1695 length += KP_MISC_PING_CMDLEN;
1696 *extralength += KP_MISC_PING_CMDLEN;
1698 if (unlikely(initsession)) {
1699 length += KP_MISC_INIT_SESSION_CMDLEN;
1700 *extralength += KP_MISC_INIT_SESSION_CMDLEN;
1704 return length;
/*
 * Decide whether a packet should be sent now and, if so, fill cmsgs
 * with the messages for it.  Returns 1 ("nothing to send yet") when
 * there are no bytes pending, or when a full packet cannot be built
 * and no deadline has expired; returns 0 with *length/cmsgs filled
 * otherwise.  The BUG_ONs cross-check the queue emptiness against the
 * pong/other byte counters.  Caller holds cmsg_lock.
 */
1707 static int cor_dequeue_messages(struct cor_neighbor *nb_cmsglocked, int ping,
1708 int initsession, int nbstate, __u32 targetmss,
1709 __u32 *length, struct list_head *cmsgs)
1711 __u32 extralength = 0;
1712 __u32 totallength;
1714 int cmsgqueue_nonpong_empty = (
1715 list_empty(&(nb_cmsglocked->cmsg_queue_ack_fast)) &&
1716 list_empty(&(nb_cmsglocked->cmsg_queue_ack_slow)) &&
1717 list_empty(&(nb_cmsglocked->cmsg_queue_ackconn_urgent)) &&
1718 list_empty(&(nb_cmsglocked->cmsg_queue_ackconn_lowlat)) &&
1719 list_empty(&(nb_cmsglocked->cmsg_queue_ackconn_highlat)) &&
1720 list_empty(&(nb_cmsglocked->cmsg_queue_conndata_lowlat)) &&
1721 list_empty(&(nb_cmsglocked->cmsg_queue_conndata_highlat)) &&
1722 list_empty(&(nb_cmsglocked->cmsg_queue_other)));
1724 BUG_ON(list_empty(&(nb_cmsglocked->cmsg_queue_pong)) &&
1725 nb_cmsglocked->cmsg_pongslength != 0);
1726 BUG_ON(!list_empty(&(nb_cmsglocked->cmsg_queue_pong)) &&
1727 nb_cmsglocked->cmsg_pongslength == 0);
1728 BUG_ON(cmsgqueue_nonpong_empty &&
1729 nb_cmsglocked->cmsg_otherlength != 0);
1730 BUG_ON(!cmsgqueue_nonpong_empty &&
1731 nb_cmsglocked->cmsg_otherlength == 0);
1733 totallength = cor_get_total_messages_length(nb_cmsglocked, ping,
1734 initsession, nbstate, &extralength);
1736 if (totallength == 0)
1737 return 1;
/* not a full packet and nothing overdue: wait for the timer */
1739 if (totallength < targetmss && ping != TIMETOSENDPING_FORCE &&
1740 time_after(cor_get_cmsg_timer_timeout(nb_cmsglocked,
1741 nbstate), jiffies))
1742 return 1;
1744 *length = extralength;
1746 _cor_dequeue_messages(nb_cmsglocked, nbstate, targetmss, length, cmsgs);
1748 BUG_ON(*length == 0);
1749 BUG_ON(*length > targetmss);
1751 return 0;
1754 static struct cor_control_retrans *cor_get_next_timeouted_retrans(
1755 struct cor_neighbor *nb_retranslocked)
1757 if (list_empty(&(nb_retranslocked->retrans_fast_list)) == 0) {
1758 struct cor_control_retrans *cr = container_of(
1759 nb_retranslocked->retrans_fast_list.next,
1760 struct cor_control_retrans, timeout_list);
1761 BUG_ON(cr->nb != nb_retranslocked);
1763 if (time_before_eq(cr->timeout, jiffies)) {
1764 return cr;
1768 if (list_empty(&(nb_retranslocked->retrans_slow_list)) == 0) {
1769 struct cor_control_retrans *cr = container_of(
1770 nb_retranslocked->retrans_slow_list.next,
1771 struct cor_control_retrans, timeout_list);
1772 BUG_ON(cr->nb != nb_retranslocked);
1774 if (time_before_eq(cr->timeout, jiffies)) {
1775 return cr;
1779 return 0;
/*
 * Requeue all control messages of every timed-out retransmit record,
 * removing each record from the timeout list and the per-neighbor
 * kpacket rbtree (dropping the kref each of those held), then re-arm
 * the retransmit timer if records remain.
 */
1782 static void cor_add_timeouted_retrans(struct cor_neighbor *nb)
1784 spin_lock_bh(&(nb->retrans_lock));
1786 while (1) {
1787 struct cor_control_retrans *cr =
1788 cor_get_next_timeouted_retrans(nb);
1790 if (cr == 0)
1791 break;
1793 list_del(&(cr->timeout_list));
1794 rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));
1796 cor_requeue_control_retrans(cr);
1798 kref_put(&(cr->ref), cor_kreffree_bug); /* list_del */
1799 kref_put(&(cr->ref), cor_free_control_retrans); /* rb */
1802 if (list_empty(&(nb->retrans_fast_list)) == 0 ||
1803 list_empty(&(nb->retrans_slow_list)) == 0) {
/* mod_timer() == 0 means the timer was not pending: take a
 * neighbor ref for the newly armed timer */
1804 if (mod_timer(&(nb->retrans_timer),
1805 cor_get_retransmit_timeout(nb)) == 0) {
1806 kref_get(&(nb->ref));
1810 spin_unlock_bh(&(nb->retrans_lock));
/*
 * Free every control message on cmsgs.  Conn data messages first hand
 * their data back to the conn retransmit path and release the buffer.
 */
1813 static void _cor_delete_all_cmsgs(struct list_head *cmsgs)
1815 while (!list_empty(cmsgs)) {
1816 struct cor_control_msg_out *cm = container_of(cmsgs->next,
1817 struct cor_control_msg_out, lh);
1819 list_del(&(cm->lh));
1821 if (cm->type == MSGTYPE_CONNDATA) {
1822 cor_schedule_retransmit_conn(cm->msg.conn_data.cr, 0,
1824 kfree(cm->msg.conn_data.data_orig);
1827 cor_free_control_msg(cm);
1831 static void cor_delete_all_cmsgs(struct cor_neighbor *nb)
1833 while (1) {
1834 struct list_head cmsgs;
1835 __u32 length = 0;
1837 INIT_LIST_HEAD(&cmsgs);
1839 spin_lock_bh(&(nb->cmsg_lock));
1840 _cor_dequeue_messages(nb, NEIGHBOR_STATE_ACTIVE, 65536, &length,
1841 &cmsgs);
1842 spin_unlock_bh(&(nb->cmsg_lock));
1844 if (list_empty(&cmsgs))
1845 break;
1847 _cor_delete_all_cmsgs(&cmsgs);
1851 static int cor_reset_timeouted_conn_needed(struct cor_neighbor *nb,
1852 struct cor_conn *src_in_l)
1854 if (unlikely(src_in_l->sourcetype != SOURCE_IN ||
1855 src_in_l->source.in.nb != nb ||
1856 src_in_l->isreset != 0))
1857 return 0;
1858 else if (likely(time_after(src_in_l->source.in.jiffies_last_act +
1859 CONN_ACTIVITY_UPDATEINTERVAL_SEC * HZ +
1860 CONN_INACTIVITY_TIMEOUT_SEC * HZ, jiffies)))
1861 return 0;
1863 return 1;
/*
 * Reset one inactive conn.  Both directions' rcv_locks are taken in
 * cli-then-srv order, the timeout condition is re-checked under the
 * locks, a reset_conn message is sent for the reverse conn_id, and
 * only after dropping the locks is the conn actually reset.
 * Returns nonzero iff the conn was reset.
 */
1866 static int cor_reset_timeouted_conn(struct cor_neighbor *nb,
1867 struct cor_conn *src_in)
1869 struct cor_conn_bidir *cnb = cor_get_conn_bidir(src_in);
1870 struct cor_conn *trgt_out = cor_get_conn_reversedir(src_in);
1872 int resetted = 0;
1874 spin_lock_bh(&(cnb->cli.rcv_lock));
1875 spin_lock_bh(&(cnb->srv.rcv_lock));
1877 resetted = cor_reset_timeouted_conn_needed(nb, src_in);
1878 if (unlikely(resetted == 0))
1879 goto unlock;
1881 resetted = (cor_send_reset_conn(nb, cor_get_connid_reverse(
1882 src_in->source.in.conn_id), 1) == 0);
1883 if (unlikely(resetted == 0))
1884 goto unlock;
1887 BUG_ON(trgt_out->isreset != 0);
1888 trgt_out->isreset = 1;
1890 unlock:
1891 spin_unlock_bh(&(cnb->srv.rcv_lock));
1892 spin_unlock_bh(&(cnb->cli.rcv_lock));
1894 if (resetted)
1895 cor_reset_conn(src_in);
1897 return resetted;
/*
 * Scan the neighbor's conn list for inactive conns and reset them.
 * Each iteration rotates the list head to the tail, so the loop stops
 * at the first conn that is not timed out (bounded at 10000 iterations
 * as a safety limit).  The cheap check is done under only the conn's
 * rcv_lock; the real reset re-validates under both locks.
 */
1900 static void cor_reset_timeouted_conns(struct cor_neighbor *nb)
1902 int i;
1903 for (i=0;i<10000;i++) {
1904 unsigned long iflags;
1905 struct list_head *lh;
1906 struct cor_conn *src_in;
1908 int resetted = 1;
1910 spin_lock_irqsave(&(nb->conn_list_lock), iflags);
1912 if (list_empty(&(nb->rcv_conn_list))) {
1913 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
1914 break;
/* rotate: move the head entry to the back of the list */
1917 lh = nb->rcv_conn_list.next;
1918 list_del(lh);
1919 list_add_tail(lh, &(nb->rcv_conn_list));
1921 src_in = container_of(lh, struct cor_conn, source.in.nb_list);
1922 cor_conn_kref_get(src_in, "stack");
1924 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
1927 spin_lock_bh(&(src_in->rcv_lock));
1928 BUG_ON(src_in->sourcetype != SOURCE_IN);
1929 BUG_ON(src_in->source.in.nb != nb);
1930 resetted = cor_reset_timeouted_conn_needed(nb, src_in);
1931 spin_unlock_bh(&(src_in->rcv_lock));
1932 if (likely(resetted == 0))
1933 goto put;
1935 resetted = cor_reset_timeouted_conn(nb, src_in);
1937 put:
1938 cor_conn_kref_put(src_in, "stack");
1940 if (likely(resetted == 0))
1941 break;
/*
 * Main control message send loop for one neighbor.
1946 * may not be called by more than one thread at the same time, because
1947 * 1) readding cor_control_msg_out may reorder them
1948 * 2) multiple pings may be sent
 *
 * Resets timed-out conns (ACTIVE), flushes all state for KILLED
 * neighbors, moves timed-out retransmits back to the queues, then
 * repeatedly dequeues a packet's worth of messages and sends it until
 * there is nothing (more) to send or the device congests.
 */
1950 int cor_send_messages(struct cor_neighbor *nb, unsigned long cmsg_send_start_j,
1951 ktime_t cmsg_send_start_kt, int *sent)
1953 int rc = QOS_RESUME_DONE;
1954 int ping;
1955 int initsession;
1956 __u32 targetmss = cor_mss_cmsg(nb);
1958 int nbstate = cor_get_neigh_state(nb);
1960 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE))
1961 cor_reset_timeouted_conns(nb);
1963 if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
1964 spin_lock_bh(&(nb->retrans_lock));
1965 cor_empty_retrans_queue(nb);
1966 spin_unlock_bh(&(nb->retrans_lock));
1968 cor_delete_all_cmsgs(nb);
1969 return QOS_RESUME_DONE;
1972 ping = cor_time_to_send_ping(nb);
1974 spin_lock_bh(&(nb->cmsg_lock));
/* cmsg_lock is dropped while readding retransmits (lock ordering) */
1976 if (nb->add_retrans_needed != 0) {
1977 nb->add_retrans_needed = 0;
1978 spin_unlock_bh(&(nb->cmsg_lock));
1979 cor_add_timeouted_retrans(nb);
1980 spin_lock_bh(&(nb->cmsg_lock));
1983 initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) != 0);
1985 while (1) {
1986 struct list_head cmsgs;
1987 __u32 length = 0;
1988 __u64 seqno;
1990 INIT_LIST_HEAD(&cmsgs);
1992 if (cor_dequeue_messages(nb, ping, initsession, nbstate,
1993 targetmss, &length, &cmsgs) != 0) {
1994 cor_schedule_controlmsg_timer(nb);
1995 spin_unlock_bh(&(nb->cmsg_lock));
1996 return QOS_RESUME_DONE;
/* seqno is assigned under cmsg_lock; it is decremented again
 * by __cor_send_messages_send() if the packet gets dropped */
1999 nb->kpacket_seqno++;
2000 seqno = nb->kpacket_seqno;
2002 spin_unlock_bh(&(nb->cmsg_lock));
2004 rc = _cor_send_messages_send(nb, ping, initsession, &cmsgs,
2005 nbstate, length, seqno, cmsg_send_start_j,
2006 cmsg_send_start_kt, sent);
2008 if (rc != QOS_RESUME_DONE)
2009 return rc;
/* at most one ping / init_session per invocation */
2011 ping = 0;
2012 initsession = 0;
2014 spin_lock_bh(&(nb->cmsg_lock));
2018 static unsigned long cor_calc_cmsg_send_start_j(unsigned long cmsg_timer_timeout)
2020 unsigned long jiffies_tmp = jiffies;
2021 if (unlikely(time_after(cmsg_timer_timeout, jiffies_tmp)))
2022 return jiffies_tmp;
2023 else
2024 return cmsg_timer_timeout;
2027 static ktime_t cor_calc_cmsg_send_start_kt(unsigned long cmsg_timer_timeout)
2029 ktime_t now = ktime_get();
2030 unsigned long jiffies_tmp = jiffies;
2032 unsigned long jiffies_delayed;
2033 if (unlikely(time_after(cmsg_timer_timeout, jiffies_tmp))) {
2034 jiffies_delayed = 0;
2035 } else {
2036 jiffies_delayed = jiffies_tmp - cmsg_timer_timeout;
2037 if (unlikely(jiffies_delayed > HZ/10)) {
2038 jiffies_delayed = HZ/10;
2042 return ns_to_ktime(ktime_to_ns(now) -
2043 1000LL * jiffies_to_usecs(jiffies_delayed));
/*
 * cmsg timer callback: compute the send-round start stamps from the
 * deadline stored in cmsg_timer_timeout and enqueue the neighbor's
 * kpacket work on the qos queue.  The kref_put releases the reference
 * taken when the timer was armed.
 */
2047 void cor_controlmsg_timerfunc(struct timer_list *cmsg_timer)
2049 struct cor_neighbor *nb = container_of(cmsg_timer,
2050 struct cor_neighbor, cmsg_timer);
2051 unsigned long cmsg_timer_timeout = (unsigned long)
2052 atomic64_read(&(nb->cmsg_timer_timeout));
2053 unsigned long cmsg_send_start_j = cor_calc_cmsg_send_start_j(
2054 cmsg_timer_timeout);
2055 ktime_t cmsg_send_start_kt = cor_calc_cmsg_send_start_kt(
2056 cmsg_timer_timeout);
2057 cor_qos_enqueue(nb->queue, &(nb->rb_kp), cmsg_send_start_j,
2058 cmsg_send_start_kt, QOS_CALLER_KPACKET);
2059 kref_put(&(nb->ref), cor_neighbor_free);
/*
 * Return 1 if enough control message bytes are pending to fill a whole
 * packet (>= mss) for this neighbor, 0 otherwise.
 */
2062 static int cor_cmsg_full_packet(struct cor_neighbor *nb, int nbstate)
2064 __u32 extralength = 0;
2065 int ping = cor_time_to_send_ping(nb);
2066 int initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) !=
2068 __u32 len = cor_get_total_messages_length(nb, ping, initsession,
2069 nbstate, &extralength);
2071 if (len == 0)
2072 return 0;
2073 if (len < cor_mss_cmsg(nb))
2074 return 0;
2076 return 1;
/*
 * (Re)schedule the control message work for a neighbor: enqueue it on
 * the qos queue immediately if the neighbor is killed, a full packet is
 * pending, retransmits must be readded, or the computed deadline has
 * already passed; otherwise arm the cmsg timer for that deadline.
 * Skipped entirely while bulk readds are in progress.  Caller holds
 * cmsg_lock.
 */
2079 void cor_schedule_controlmsg_timer(struct cor_neighbor *nb_cmsglocked)
2081 unsigned long timeout;
2082 int nbstate = cor_get_neigh_state(nb_cmsglocked);
2084 if (unlikely(nbstate == NEIGHBOR_STATE_KILLED))
2085 goto now;
2087 if (atomic_read(&(nb_cmsglocked->cmsg_bulk_readds)) != 0)
2088 return;
2090 if (cor_cmsg_full_packet(nb_cmsglocked, nbstate))
2091 goto now;
2093 if (nb_cmsglocked->add_retrans_needed != 0)
2094 goto now;
2096 timeout = cor_get_cmsg_timer_timeout(nb_cmsglocked, nbstate);
/* the "if (0) { now: ... }" arm is only reachable via the gotos above */
2098 if (0) {
2099 now:
2100 cor_qos_enqueue(nb_cmsglocked->queue, &(nb_cmsglocked->rb_kp),
2101 jiffies, ktime_get(), QOS_CALLER_KPACKET);
2102 } else if (time_before_eq(timeout, jiffies)) {
2103 unsigned long cmsg_send_start_j = cor_calc_cmsg_send_start_j(
2104 timeout);
2105 ktime_t cmsg_send_start_kt = cor_calc_cmsg_send_start_kt(
2106 timeout);
2107 cor_qos_enqueue(nb_cmsglocked->queue, &(nb_cmsglocked->rb_kp),
2108 cmsg_send_start_j, cmsg_send_start_kt,
2109 QOS_CALLER_KPACKET);
2110 } else {
/* publish the deadline before arming the timer so the callback
 * reads a consistent value; take a ref if the timer was idle */
2111 atomic64_set(&(nb_cmsglocked->cmsg_timer_timeout), timeout);
2112 barrier();
2113 if (mod_timer(&(nb_cmsglocked->cmsg_timer), timeout) == 0) {
2114 kref_get(&(nb_cmsglocked->ref));
/*
 * Insert a reset_conn message into the per-neighbor rbtree keyed by
 * conn_id.  Returns 1 if a reset for the same conn_id is already
 * pending (duplicate, not inserted), 0 after a successful insert
 * (which takes a kref on the message for the tree reference).
 */
2119 static int cor_insert_pending_conn_resets(struct cor_control_msg_out *ins)
2121 struct cor_neighbor *nb = ins->nb;
2122 __u32 conn_id = ins->msg.reset_conn.conn_id;
2124 struct rb_root *root;
2125 struct rb_node **p;
2126 struct rb_node *parent = 0;
2128 BUG_ON(nb == 0);
2129 BUG_ON(ins->msg.reset_conn.in_pending_conn_resets != 0);
2131 root = &(nb->pending_conn_resets_rb);
2132 p = &(root->rb_node);
2134 while ((*p) != 0) {
2135 struct cor_control_msg_out *cm = container_of(*p,
2136 struct cor_control_msg_out,
2137 msg.reset_conn.rbn);
2138 __u32 cm_connid = cm->msg.reset_conn.conn_id;
2140 BUG_ON(cm->nb != ins->nb);
2141 BUG_ON(cm->type != MSGTYPE_RESET_CONN);
2143 parent = *p;
2144 if (conn_id == cm_connid) {
2145 return 1;
2146 } else if (conn_id < cm_connid) {
2147 p = &(*p)->rb_left;
2148 } else if (conn_id > cm_connid) {
2149 p = &(*p)->rb_right;
2150 } else {
/* ==, < and > are exhaustive; this branch is unreachable */
2151 BUG();
2155 kref_get(&(ins->ref));
2156 rb_link_node(&(ins->msg.reset_conn.rbn), parent, p);
2157 rb_insert_color(&(ins->msg.reset_conn.rbn), root);
2158 ins->msg.reset_conn.in_pending_conn_resets = 1;
2160 return 0;
2163 static void cor_free_oldest_pong(struct cor_neighbor *nb)
2165 struct cor_control_msg_out *cm = container_of(nb->cmsg_queue_pong.next,
2166 struct cor_control_msg_out, lh);
2168 BUG_ON(list_empty(&(nb->cmsg_queue_pong)));
2169 BUG_ON(unlikely(cm->type != MSGTYPE_PONG));
2171 list_del(&(cm->lh));
2172 nb->cmsg_pongslength -= cm->length;
2173 BUG_ON(nb->cmsg_pongscnt == 0);
2174 cm->nb->cmsg_pongscnt--;
2175 cor_free_control_msg(cm);
2178 static struct list_head * _cor_enqueue_control_msg_getqueue(
2179 struct cor_control_msg_out *cm)
2181 if (cm->type == MSGTYPE_ACK) {
2182 if (cm->msg.ack.fast != 0) {
2183 return &(cm->nb->cmsg_queue_ack_fast);
2184 } else {
2185 return &(cm->nb->cmsg_queue_ack_slow);
2187 } else if (cm->type == MSGTYPE_ACK_CONN) {
2188 if (unlikely(cm->msg.ack_conn.queue == CMSGQUEUE_ACK_CONN_URGENT)) {
2189 return &(cm->nb->cmsg_queue_ackconn_urgent);
2190 } else if (cm->msg.ack_conn.queue == CMSGQUEUE_ACK_CONN_LOWLAT) {
2191 return &(cm->nb->cmsg_queue_ackconn_lowlat);
2192 } else if (cm->msg.ack_conn.queue == CMSGQUEUE_ACK_CONN_HIGHLAT) {
2193 return &(cm->nb->cmsg_queue_ackconn_highlat);
2194 } else {
2195 BUG();
2197 } else if (cm->type == MSGTYPE_CONNDATA) {
2198 if (cm->msg.conn_data.highlatency != 0) {
2199 return &(cm->nb->cmsg_queue_conndata_highlat);
2200 } else {
2201 return &(cm->nb->cmsg_queue_conndata_lowlat);
2203 } else {
2204 return &(cm->nb->cmsg_queue_other);
/*
 * Enqueue a control message under cmsg_lock.  Pongs are capped at
 * MAX_PONG_CMSGS_PER_NEIGH (a readded pong over the cap is dropped, a
 * new one evicts the oldest); duplicate reset_conns are dropped via
 * the pending-resets rbtree.  New messages go to the tail, readded
 * ones to the head of their queue.  Returns 1 if the message was freed
 * instead of queued, 0 if queued.
 */
2208 static int _cor_enqueue_control_msg(struct cor_control_msg_out *cm, int src)
2210 if (unlikely(cm->type == MSGTYPE_PONG)) {
2211 BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA);
2213 if (cm->nb->cmsg_pongscnt >= MAX_PONG_CMSGS_PER_NEIGH) {
2214 if (src != ADDCMSG_SRC_NEW) {
2215 BUG_ON(cm->nb->cmsg_pongscnt == 0);
2216 cm->nb->cmsg_pongscnt--;
2217 cor_free_control_msg(cm);
2218 return 1;
2219 } else {
2220 cor_free_oldest_pong(cm->nb);
2224 cm->nb->cmsg_pongscnt++;
2225 cm->nb->cmsg_pongslength += cm->length;
2227 if (src != ADDCMSG_SRC_NEW) {
2228 list_add(&(cm->lh), &(cm->nb->cmsg_queue_pong));
2229 } else {
2230 list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_pong));
2233 return 0;
2234 } else if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
2235 if (cor_insert_pending_conn_resets(cm) != 0) {
/* type is cleared so freeing does not touch the rbtree entry
 * of the already-pending duplicate */
2236 cm->type = 0;
2237 cor_free_control_msg(cm);
2238 return 1;
2242 cm->nb->cmsg_otherlength += cm->length;
2243 if (src == ADDCMSG_SRC_NEW) {
2244 list_add_tail(&(cm->lh), _cor_enqueue_control_msg_getqueue(cm));
2245 } else {
2246 BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA &&
2247 cm->type != MSGTYPE_CONNDATA);
2248 BUG_ON(src == ADDCMSG_SRC_READD &&
2249 cm->type == MSGTYPE_ACK_CONN);
2251 list_add(&(cm->lh), _cor_enqueue_control_msg_getqueue(cm));
2254 return 0;
2257 static void cor_enqueue_control_msg(struct cor_control_msg_out *cm, int src)
2259 BUG_ON(cm == 0);
2260 BUG_ON(cm->nb == 0);
2262 if (src == ADDCMSG_SRC_NEW)
2263 cm->time_added = jiffies;
2265 spin_lock_bh(&(cm->nb->cmsg_lock));
2267 if (_cor_enqueue_control_msg(cm, src) != 0)
2268 goto out;
2270 if (src != ADDCMSG_SRC_READD && src != ADDCMSG_SRC_RETRANS)
2271 cor_schedule_controlmsg_timer(cm->nb);
2273 out:
2274 spin_unlock_bh(&(cm->nb->cmsg_lock));
/*
 * Queue a pong (13 bytes on the wire) answering the ping identified by
 * cookie; records receive and enqueue timestamps so the peer can
 * subtract local processing delay.  Silently does nothing if no
 * control message can be allocated (the peer will re-ping).
 */
2278 void cor_send_pong(struct cor_neighbor *nb, __u32 cookie, ktime_t ping_rcvtime)
2280 struct cor_control_msg_out *cm = _cor_alloc_control_msg(nb);
2282 if (unlikely(cm == 0))
2283 return;
2285 cm->nb = nb;
2286 cm->type = MSGTYPE_PONG;
2287 cm->msg.pong.cookie = cookie;
2288 cm->msg.pong.type = MSGTYPE_PONG_TIMEENQUEUED;
2289 cm->msg.pong.ping_rcvtime = ping_rcvtime;
2290 cm->msg.pong.time_enqueued = ktime_get();
2291 cm->length = 13;
2292 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
/*
 * Queue a kernel packet ack (7 bytes on the wire) for seqno; fast
 * selects the fast ack queue/delay class.  Allocated at high priority
 * since acks keep the peer's retransmit machinery quiet; drops the ack
 * silently on allocation failure.
 */
2295 void cor_send_ack(struct cor_neighbor *nb, __u64 seqno, __u8 fast)
2297 struct cor_control_msg_out *cm = cor_alloc_control_msg(nb,
2298 ACM_PRIORITY_HIGH);
2300 if (unlikely(cm == 0))
2301 return;
2303 cm->nb = nb;
2304 cm->type = MSGTYPE_ACK;
2305 cm->msg.ack.seqno = seqno;
2306 cm->msg.ack.fast = fast;
2307 cm->length = 7;
2308 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2311 static __u8 get_queue_for_ackconn(struct cor_conn *src_in_lx)
2313 if (src_in_lx->is_highlatency != 0) {
2314 if (unlikely(cor_ackconn_urgent(src_in_lx))) {
2315 return CMSGQUEUE_ACK_CONN_LOWLAT;
2316 } else {
2317 return CMSGQUEUE_ACK_CONN_HIGHLAT;
2319 } else {
2320 if (unlikely(cor_ackconn_urgent(src_in_lx))) {
2321 return CMSGQUEUE_ACK_CONN_URGENT;
2322 } else {
2323 return CMSGQUEUE_ACK_CONN_LOWLAT;
2328 static void cor_set_ooolen_flags(struct cor_control_msg_out *cm)
2330 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
2331 (~KP_ACK_CONN_FLAGS_OOO));
2332 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
2333 cor_ooolen_to_flags(cm->msg.ack_conn.length));
/*
 * Unlink and free a queued ack_conn message: remove it from its cmsg
 * queue and the conn's acks_pending list, drop the conn reference it
 * held, and adjust the byte accounting.
2336 /* cmsg_lock must be held */
2337 static void cor_remove_pending_ackconn(struct cor_control_msg_out *cm)
2339 cm->nb->cmsg_otherlength -= cm->length;
2340 list_del(&(cm->lh));
2342 list_del(&(cm->msg.ack_conn.conn_acks));
2343 cor_conn_kref_put(cm->msg.ack_conn.src_in,
2344 "cor_control_msg_out ack_conn");
2345 cm->msg.ack_conn.src_in = 0;
/* type cleared so cor_free_control_msg() skips ack_conn teardown */
2347 cm->type = 0;
2348 cor_free_control_msg(cm);
2351 /* cmsg_lock must be held */
2352 static void cor_recalc_scheduled_ackconn_size(struct cor_control_msg_out *cm)
2354 cm->nb->cmsg_otherlength -= cm->length;
2355 cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);
2356 cm->nb->cmsg_otherlength += cm->length;
/*
 * Try to merge the ack_conn message fromcm into tocm.
 *
 * Returns 0 on success (tocm now carries the combined information; if
 * from_newack == 0 fromcm has been removed and freed) or 1 if the two
 * out-of-order ranges cannot be combined, in which case neither message
 * is modified beyond what is noted below.
 *
 * cmsg_lock must be held
 */
static int _cor_try_merge_ackconn(struct cor_conn *src_in_l,
		struct cor_control_msg_out *fromcm,
		struct cor_control_msg_out *tocm, int from_newack)
{
	if (cor_ooolen(fromcm->msg.ack_conn.flags) != 0 &&
			cor_ooolen(tocm->msg.ack_conn.flags) != 0) {
		__u64 tocmseqno = tocm->msg.ack_conn.seqno_ooo;
		__u64 tocmlength = tocm->msg.ack_conn.length;
		__u64 fromcmseqno = fromcm->msg.ack_conn.seqno_ooo;
		__u64 fromcmlength = fromcm->msg.ack_conn.length;

		if (cor_seqno_eq(tocmseqno, fromcmseqno)) {
			/* same start: keep the longer of the two ranges */
			if (fromcmlength > tocmlength)
				tocm->msg.ack_conn.length = fromcmlength;
		} else if (cor_seqno_after(fromcmseqno, tocmseqno) &&
				cor_seqno_before_eq(fromcmseqno, tocmseqno +
				tocmlength)) {
			/* fromcm starts inside tocm: extend tocm's end */
			__u64 len = cor_seqno_clean(fromcmseqno + fromcmlength -
					tocmseqno);
			BUG_ON(len > U32_MAX);
			tocm->msg.ack_conn.length = (__u32) len;
		} else if (cor_seqno_before(fromcmseqno, tocmseqno) &&
				cor_seqno_after_eq(fromcmseqno, tocmseqno)) {
			/*
			 * NOTE(review): this condition can never be true --
			 * "before(a, b) && after_eq(a, b)" is a
			 * contradiction, so this branch is dead and such
			 * overlaps fall through to "return 1".  The intent
			 * was presumably to test the END of fromcm's range,
			 * i.e. cor_seqno_after_eq(fromcmseqno + fromcmlength,
			 * tocmseqno) -- confirm against upstream before
			 * changing.
			 */
			__u64 len = cor_seqno_clean(tocmseqno + tocmlength -
					fromcmseqno);
			BUG_ON(len > U32_MAX);
			tocm->msg.ack_conn.seqno_ooo = fromcmseqno;
			tocm->msg.ack_conn.length = (__u32) len;
		} else {
			/* disjoint ranges: cannot merge */
			return 1;
		}
		cor_set_ooolen_flags(tocm);
	}

	if ((fromcm->msg.ack_conn.flags &
			KP_ACK_CONN_FLAGS_SEQNO) != 0) {
		if ((tocm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) == 0)
			goto setseqno;

		/* ack_seqno values are strictly ordered, never equal */
		BUG_ON(cor_seqno_eq(fromcm->msg.ack_conn.ack_seqno,
				tocm->msg.ack_conn.ack_seqno));
		if (cor_seqno_after_eq(tocm->msg.ack_conn.ack_seqno,
				fromcm->msg.ack_conn.ack_seqno)) {
			/* tocm is newer: keep its seqno, only merge window */
			BUG_ON(cor_seqno_after(fromcm->msg.ack_conn.seqno,
					tocm->msg.ack_conn.seqno));
			goto skipseqno;
		}

		BUG_ON(cor_seqno_before(fromcm->msg.ack_conn.seqno,
				tocm->msg.ack_conn.seqno));

setseqno:
		tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_SEQNO);
		tocm->msg.ack_conn.seqno = fromcm->msg.ack_conn.seqno;
		tocm->msg.ack_conn.ack_seqno = fromcm->msg.ack_conn.ack_seqno;

skipseqno:
		if ((fromcm->msg.ack_conn.flags &
				KP_ACK_CONN_FLAGS_WINDOW) != 0)
			tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
					KP_ACK_CONN_FLAGS_WINDOW);
	}

	/*
	 * NOTE(review): this copy runs whenever fromcm carries an ooo range,
	 * including the case where the block above already merged both
	 * ranges into tocm -- it would then overwrite the merged values with
	 * fromcm's raw ones.  Verify the intended guard (possibly
	 * "&& cor_ooolen(tocm->msg.ack_conn.flags) == 0") against upstream.
	 */
	if (cor_ooolen(fromcm->msg.ack_conn.flags) != 0) {
		tocm->msg.ack_conn.seqno_ooo = fromcm->msg.ack_conn.seqno_ooo;
		tocm->msg.ack_conn.length = fromcm->msg.ack_conn.length;
		cor_set_ooolen_flags(tocm);
	}

	if ((fromcm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
		/* only one pending priority update per conn is allowed */
		BUG_ON((tocm->msg.ack_conn.flags &
				KP_ACK_CONN_FLAGS_PRIORITY) != 0);
		tocm->msg.ack_conn.priority_seqno =
				fromcm->msg.ack_conn.priority_seqno;
		tocm->msg.ack_conn.priority = fromcm->msg.ack_conn.priority;
	}

	cor_recalc_scheduled_ackconn_size(tocm);
	if (from_newack == 0)
		cor_remove_pending_ackconn(fromcm);

	return 0;
}
2446 /* cmsg_lock must be held */
2447 static void cor_try_merge_ackconns(struct cor_conn *src_in_l,
2448 struct cor_control_msg_out *cm)
2450 struct list_head *currlh = cm->msg.ack_conn.conn_acks.next;
2452 while (currlh != &(src_in_l->source.in.acks_pending)) {
2453 struct cor_control_msg_out *currcm = container_of(currlh,
2454 struct cor_control_msg_out,
2455 msg.ack_conn.conn_acks);
2456 currlh = currlh->next;
2457 cor_remove_connack_oooflag_ifold(src_in_l, currcm);
2458 _cor_try_merge_ackconn(src_in_l, currcm, cm, 0);
/*
 * Either fold a freshly built ack_conn message into an already pending one
 * for the same connection, or add it to the pending list and enqueue it.
 *
 * On a successful merge cm itself becomes redundant and is freed; its
 * flags are zeroed first so cor_free_control_msg() does not reset
 * priority_send_allowed on a conn that may already be locked by the
 * caller.
 */
static void cor_merge_or_enqueue_ackconn(struct cor_conn *src_in_l,
		struct cor_control_msg_out *cm, int src)
{
	struct list_head *currlh;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	spin_lock_bh(&(cm->nb->cmsg_lock));

	currlh = src_in_l->source.in.acks_pending.next;
	while (currlh != &(src_in_l->source.in.acks_pending)) {
		struct cor_control_msg_out *currcm = container_of(currlh,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);

		/* every entry on acks_pending must match cm's conn/neighbor */
		BUG_ON(currcm->nb != cm->nb);
		BUG_ON(currcm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
		BUG_ON(currcm->msg.ack_conn.conn_id !=
				cm->msg.ack_conn.conn_id);

		if (_cor_try_merge_ackconn(src_in_l, cm, currcm, 1) == 0) {
			/* merged into currcm; try to compact the rest too */
			cor_try_merge_ackconns(src_in_l, currcm);
			cor_schedule_controlmsg_timer(currcm->nb);
			spin_unlock_bh(&(currcm->nb->cmsg_lock));
			/*
			 * flags = 0: when calling cor_free_control_msg here
			 * the conn may already be locked and
			 * priority_send_allowed should not be reset
			 */
			cm->msg.ack_conn.flags = 0;
			cor_free_control_msg(cm);
			return;
		}

		currlh = currlh->next;
	}

	/* nothing to merge with: schedule cm as a new pending ack_conn */
	list_add_tail(&(cm->msg.ack_conn.conn_acks),
			&(src_in_l->source.in.acks_pending));

	spin_unlock_bh(&(cm->nb->cmsg_lock));

	cor_enqueue_control_msg(cm, src);
}
/*
 * If an ack_conn is already pending for this connection, refresh it with
 * the current in-order seqno and window instead of allocating a new
 * message.
 *
 * Returns 0 if a pending ack_conn was updated, 1 if none was pending
 * (caller must then build a new one).
 */
static int cor_try_update_ackconn_seqno(struct cor_conn *src_in_l)
{
	int rc = 1;

	spin_lock_bh(&(src_in_l->source.in.nb->cmsg_lock));

	if (list_empty(&(src_in_l->source.in.acks_pending)) == 0) {
		struct cor_control_msg_out *cm = container_of(
				src_in_l->source.in.acks_pending.next,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);
		BUG_ON(cm->nb != src_in_l->source.in.nb);
		BUG_ON(cm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
		BUG_ON(cm->msg.ack_conn.conn_id != cor_get_connid_reverse(
				src_in_l->source.in.conn_id));

		cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_SEQNO |
				KP_ACK_CONN_FLAGS_WINDOW);
		cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;

		/* each (re)generated ack gets a fresh ack_seqno */
		src_in_l->source.in.ack_seqno++;
		cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;

		cor_remove_connack_oooflag_ifold(src_in_l, cm);
		cor_recalc_scheduled_ackconn_size(cm);

		cor_try_merge_ackconns(src_in_l, cm);

		rc = 0;
	}

	spin_unlock_bh(&(src_in_l->source.in.nb->cmsg_lock));

	return rc;
}
/*
 * Send an ack_conn for this connection if one is needed.
 *
 * An ack is always attempted when an out-of-order range (seqno_ooo /
 * ooo_length) must be reported.  Otherwise one is sent only when an
 * in-order ack is flagged as needed, or when the advertised receive
 * window has grown enough (heuristics below) to be worth announcing.
 * Prefers updating an already pending ack_conn over allocating a new one.
 */
void cor_send_ack_conn_ifneeded(struct cor_conn *src_in_l, __u64 seqno_ooo,
		__u32 ooo_length)
{
	struct cor_control_msg_out *cm;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	/* an ooo range must lie beyond the in-order delivery point */
	BUG_ON(ooo_length > 0 && cor_seqno_before_eq(seqno_ooo,
			src_in_l->source.in.next_seqno));

	cor_update_windowlimit(src_in_l);

	if (ooo_length != 0) {
		cm = cor_alloc_control_msg(src_in_l->source.in.nb,
				ACM_PRIORITY_LOW);
		/* on allocation failure fall through to the in-order path */
		if (cm != 0)
			goto add;
	}

	if (src_in_l->source.in.inorder_ack_needed != 0)
		goto ack_needed;

	/* window too small to encode: nothing useful to announce */
	if (cor_seqno_clean(src_in_l->source.in.window_seqnolimit -
			src_in_l->source.in.next_seqno) < WINDOW_ENCODE_MIN)
		return;

	/*
	 * skip the update if the remotely known window is still usable and
	 * the local window has not grown past roughly 8/7 of it
	 */
	if (cor_seqno_clean(src_in_l->source.in.window_seqnolimit_remote -
			src_in_l->source.in.next_seqno) >= WINDOW_ENCODE_MIN &&
			cor_seqno_clean(src_in_l->source.in.window_seqnolimit -
			src_in_l->source.in.next_seqno) * 7 <
			cor_seqno_clean(
			src_in_l->source.in.window_seqnolimit_remote -
			src_in_l->source.in.next_seqno) * 8)
		return;

ack_needed:
	if (cor_try_update_ackconn_seqno(src_in_l) == 0)
		goto out;

	cm = cor_alloc_control_msg(src_in_l->source.in.nb, ACM_PRIORITY_MED);
	if (cm == 0) {
		printk(KERN_ERR "error allocating inorder ack");
		return;
	}

add:
	cm->type = MSGTYPE_ACK_CONN;
	src_in_l->source.in.ack_seqno++;
	cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;
	cor_conn_kref_get(src_in_l, "cor_control_msg_out ack_conn");
	cm->msg.ack_conn.src_in = src_in_l;
	cm->msg.ack_conn.conn_id =
			cor_get_connid_reverse(src_in_l->source.in.conn_id);
	cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;
	cm->msg.ack_conn.seqno_ooo = seqno_ooo;
	cm->msg.ack_conn.length = ooo_length;
	cm->msg.ack_conn.bufsize_changerate =
			_cor_bufsize_update_get_changerate(src_in_l);
	cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_SEQNO |
			KP_ACK_CONN_FLAGS_WINDOW;
	cor_set_ooolen_flags(cm);
	cm->msg.ack_conn.is_highlatency = src_in_l->is_highlatency;
	cm->msg.ack_conn.queue = get_queue_for_ackconn(src_in_l);
	cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);

	cor_merge_or_enqueue_ackconn(src_in_l, cm, ADDCMSG_SRC_NEW);

out:
	src_in_l->source.in.inorder_ack_needed = 0;
	/* the peer will now know the current window */
	src_in_l->source.in.window_seqnolimit_remote =
			src_in_l->source.in.window_seqnolimit;
}
/*
 * Try to piggy-back a priority update onto an already pending ack_conn of
 * the reverse-direction connection.
 *
 * Returns 0 if the pending ack_conn was extended with the priority, 1 if
 * no ack_conn was pending (caller must send a dedicated one).
 */
static int cor_try_add_priority(struct cor_conn *trgt_out_ll, __u16 priority)
{
	int rc = 1;
	struct cor_conn *src_in_ll = cor_get_conn_reversedir(trgt_out_ll);

	spin_lock_bh(&(trgt_out_ll->target.out.nb->cmsg_lock));

	if (list_empty(&(src_in_ll->source.in.acks_pending)) == 0) {
		struct cor_control_msg_out *cm = container_of(
				src_in_ll->source.in.acks_pending.next,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);
		BUG_ON(cm->nb != trgt_out_ll->target.out.nb);
		BUG_ON(cm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != src_in_ll);
		BUG_ON(cm->msg.ack_conn.conn_id !=
				trgt_out_ll->target.out.conn_id);

		/* at most one unsent priority update may exist per conn */
		BUG_ON((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) !=
				0);
		cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_PRIORITY);
		cm->msg.ack_conn.priority_seqno =
				trgt_out_ll->target.out.priority_seqno;
		cm->msg.ack_conn.priority = priority;
		cor_recalc_scheduled_ackconn_size(cm);

		rc = 0;
	}

	spin_unlock_bh(&(trgt_out_ll->target.out.nb->cmsg_lock));

	return rc;
}
/*
 * Announce a new priority for an outgoing connection to the neighbor.
 *
 * First tries to attach the priority to an already pending ack_conn;
 * otherwise allocates a dedicated ack_conn carrying only the priority.
 * On success priority_last/priority_seqno are advanced and further
 * priority sends are blocked until re-allowed.  If allocation fails the
 * function returns early WITHOUT updating that state -- presumably so the
 * update can be retried later; confirm against callers.
 */
void cor_send_priority(struct cor_conn *trgt_out_ll, __u16 priority)
{
	struct cor_conn *src_in_ll = cor_get_conn_reversedir(trgt_out_ll);
	struct cor_control_msg_out *cm;

	if (cor_try_add_priority(trgt_out_ll, priority) == 0)
		goto out;

	cm = cor_alloc_control_msg(trgt_out_ll->target.out.nb,
			ACM_PRIORITY_LOW);
	if (cm == 0)
		return;

	cm->type = MSGTYPE_ACK_CONN;
	cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_PRIORITY;
	cor_conn_kref_get(src_in_ll, "cor_control_msg_out ack_conn");
	BUG_ON(trgt_out_ll->targettype != TARGET_OUT);
	cm->msg.ack_conn.src_in = src_in_ll;
	cm->msg.ack_conn.conn_id = trgt_out_ll->target.out.conn_id;
	cm->msg.ack_conn.bufsize_changerate =
			_cor_bufsize_update_get_changerate(src_in_ll);
	cm->msg.ack_conn.priority_seqno =
			trgt_out_ll->target.out.priority_seqno;
	cm->msg.ack_conn.priority = priority;
	cm->msg.ack_conn.is_highlatency = trgt_out_ll->is_highlatency;
	cm->msg.ack_conn.queue = get_queue_for_ackconn(src_in_ll);

	cm->length = 5 + cor_ack_conn_len(cm->msg.ack_conn.flags);
	cor_merge_or_enqueue_ackconn(src_in_ll, cm, ADDCMSG_SRC_NEW);

out:
	trgt_out_ll->target.out.priority_last = priority;
	/* priority_seqno is a 4-bit counter (wraps at 16) */
	trgt_out_ll->target.out.priority_seqno =
			(trgt_out_ll->target.out.priority_seqno + 1) & 15;
	trgt_out_ll->target.out.priority_send_allowed = 0;
}
2692 void cor_free_ack_conns(struct cor_conn *src_in_lx)
2694 int changed = 0;
2695 spin_lock_bh(&(src_in_lx->source.in.nb->cmsg_lock));
2696 while (list_empty(&(src_in_lx->source.in.acks_pending)) == 0) {
2697 struct list_head *currlh =
2698 src_in_lx->source.in.acks_pending.next;
2699 struct cor_control_msg_out *currcm = container_of(currlh,
2700 struct cor_control_msg_out,
2701 msg.ack_conn.conn_acks);
2703 cor_remove_pending_ackconn(currcm);
2704 changed = 1;
2706 if (changed)
2707 cor_schedule_controlmsg_timer(src_in_lx->source.in.nb);
2708 spin_unlock_bh(&(src_in_lx->source.in.nb->cmsg_lock));
2711 void cor_send_connect_success(struct cor_control_msg_out *cm, __u32 conn_id,
2712 struct cor_conn *src_in)
2714 cm->type = MSGTYPE_CONNECT_SUCCESS;
2715 cm->msg.connect_success.conn_id = conn_id;
2716 cor_conn_kref_get(src_in, "cor_control_msg_out connect_success");
2717 cm->msg.connect_success.src_in = src_in;
2718 cm->length = 6;
2719 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2722 void cor_send_connect_nb(struct cor_control_msg_out *cm, __u32 conn_id,
2723 __u64 seqno1, __u64 seqno2, struct cor_conn *src_in_ll)
2725 cm->type = MSGTYPE_CONNECT;
2726 cm->msg.connect.conn_id = conn_id;
2727 cm->msg.connect.seqno1 = seqno1;
2728 cm->msg.connect.seqno2 = seqno2;
2729 cor_conn_kref_get(src_in_ll, "cor_control_msg_out connect");
2730 BUG_ON(src_in_ll->sourcetype != SOURCE_IN);
2731 cm->msg.connect.src_in = src_in_ll;
2732 cm->length = 21;
2733 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2736 void cor_send_conndata(struct cor_control_msg_out *cm, __u32 conn_id,
2737 __u64 seqno, char *data_orig, char *data, __u32 datalen,
2738 __u8 windowused, __u8 flush, __u8 highlatency,
2739 struct cor_conn_retrans *cr)
2741 cm->type = MSGTYPE_CONNDATA;
2742 cm->msg.conn_data.conn_id = conn_id;
2743 cm->msg.conn_data.seqno = seqno;
2744 cm->msg.conn_data.data_orig = data_orig;
2745 cm->msg.conn_data.data = data;
2746 cm->msg.conn_data.datalen = datalen;
2747 cm->msg.conn_data.windowused = windowused;
2748 cm->msg.conn_data.flush = flush;
2749 cm->msg.conn_data.highlatency = highlatency;
2750 cm->msg.conn_data.cr = cr;
2751 kref_get(&(cr->ref));
2752 cm->length = get_kp_conn_data_length(datalen);
2753 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2756 int cor_send_reset_conn(struct cor_neighbor *nb, __u32 conn_id, int lowprio)
2758 struct cor_control_msg_out *cm;
2760 if (unlikely(cor_get_neigh_state(nb) == NEIGHBOR_STATE_KILLED))
2761 return 0;
2763 cm = cor_alloc_control_msg(nb, lowprio ?
2764 ACM_PRIORITY_LOW : ACM_PRIORITY_MED);
2766 if (unlikely(cm == 0))
2767 return 1;
2769 cm->type = MSGTYPE_RESET_CONN;
2770 cm->msg.reset_conn.conn_id = conn_id;
2771 cm->length = 5;
2773 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2775 return 0;
2778 int __init cor_kgen_init(void)
2780 cor_controlmsg_slab = kmem_cache_create("cor_controlmsg",
2781 sizeof(struct cor_control_msg_out), 8, 0, 0);
2782 if (unlikely(cor_controlmsg_slab == 0))
2783 return -ENOMEM;
2785 cor_controlretrans_slab = kmem_cache_create("cor_controlretransmsg",
2786 sizeof(struct cor_control_retrans), 8, 0, 0);
2787 if (unlikely(cor_controlretrans_slab == 0))
2788 return -ENOMEM;
2790 return 0;
2793 void __exit cor_kgen_exit2(void)
2795 kmem_cache_destroy(cor_controlretrans_slab);
2796 cor_controlretrans_slab = 0;
2798 kmem_cache_destroy(cor_controlmsg_slab);
2799 cor_controlmsg_slab = 0;
2802 MODULE_LICENSE("GPL");