move some stuff from common.c to neighbor.c
[cor.git] / net / cor / kpacket_gen.c
blob1e046c4e0a9a4b71203d103a457da53ce1892db9
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2021 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <asm/byteorder.h>
23 #include "cor.h"
25 /* not sent over the network - internal meaning only */
26 #define MSGTYPE_PONG 1
27 #define MSGTYPE_ACK 2
28 #define MSGTYPE_ACK_CONN 3
29 #define MSGTYPE_CONNECT 4
30 #define MSGTYPE_CONNECT_SUCCESS 5
31 #define MSGTYPE_RESET_CONN 6
32 #define MSGTYPE_CONNDATA 7
33 #define MSGTYPE_SET_MAX_CMSG_DELAY 8
35 #define MSGTYPE_PONG_TIMEENQUEUED 1
36 #define MSGTYPE_PONG_RESPDELAY 2
38 struct cor_control_msg_out{
39 __u8 type;
40 __u32 length;
41 struct kref ref;
42 struct cor_neighbor *nb;
44 /* either queue or control_retrans_packet */
45 struct list_head lh;
47 unsigned long time_added;
49 union{
50 struct{
51 __u32 cookie;
52 __u8 type;
54 ktime_t ping_rcvtime;
55 ktime_t time_enqueued;
56 }pong;
58 struct{
59 __u64 seqno;
60 }ack;
62 struct{
63 struct cor_conn *src_in;
64 struct list_head conn_acks;
65 __u32 conn_id;
66 __u64 seqno;
67 __u64 seqno_ooo;
68 __u32 length;
70 __u8 priority_seqno;
71 __u8 priority;
73 __u8 flags;
75 __u32 ack_seqno;
76 }ack_conn;
78 struct{
79 __u32 conn_id;
80 __u64 seqno1;
81 __u64 seqno2;
82 struct cor_conn *src_in;
83 }connect;
85 struct{
86 __u32 conn_id;
87 struct cor_conn *src_in;
88 }connect_success;
90 struct{
91 struct rb_node rbn;
92 __u8 in_pending_conn_resets;
93 __u32 conn_id;
94 }reset_conn;
96 struct{
97 __u32 conn_id;
98 __u64 seqno;
99 __u32 datalen;
100 __u8 snd_delayed_lowbuf;
101 __u8 flush;
102 __u8 highlatency;
103 char *data_orig;
104 char *data;
105 struct cor_conn_retrans *cr;
106 }conn_data;
108 struct{
109 __u32 ack_delay;
110 __u32 ackconn_delay;
111 __u32 other_delay;
112 }set_max_cmsg_delay;
113 }msg;
116 struct cor_control_retrans {
117 struct kref ref;
119 struct cor_neighbor *nb;
120 __u64 seqno;
122 unsigned long timeout;
124 struct list_head msgs;
126 struct rb_node rbn;
127 struct list_head timeout_list;
131 static struct kmem_cache *cor_controlmsg_slab;
132 static struct kmem_cache *cor_controlretrans_slab;
134 static atomic_t cor_cmsg_othercnt = ATOMIC_INIT(0);
136 #define ADDCMSG_SRC_NEW 1
137 #define ADDCMSG_SRC_SPLITCONNDATA 2
138 #define ADDCMSG_SRC_READD 3
139 #define ADDCMSG_SRC_RETRANS 4
141 static void cor_enqueue_control_msg(struct cor_control_msg_out *msg, int src);
143 static void cor_try_merge_ackconns(struct cor_conn *src_in_l,
144 struct cor_control_msg_out *cm);
146 static void cor_merge_or_enqueue_ackconn(struct cor_conn *src_in_l,
147 struct cor_control_msg_out *cm, int src);
149 static struct cor_control_msg_out *_cor_alloc_control_msg(
150 struct cor_neighbor *nb)
152 struct cor_control_msg_out *cm;
154 BUG_ON(nb == 0);
156 cm = kmem_cache_alloc(cor_controlmsg_slab, GFP_ATOMIC);
157 if (unlikely(cm == 0))
158 return 0;
159 memset(cm, 0, sizeof(struct cor_control_msg_out));
160 kref_init(&(cm->ref));
161 cm->nb = nb;
162 return cm;
165 static int cor_calc_limit(int limit, int priority)
167 if (priority == ACM_PRIORITY_LOW)
168 return (limit+1)/2;
169 else if (priority == ACM_PRIORITY_MED)
170 return (limit * 3 + 1)/4;
171 else if (priority == ACM_PRIORITY_HIGH)
172 return limit;
173 else
174 BUG();
177 struct cor_control_msg_out *cor_alloc_control_msg(struct cor_neighbor *nb,
178 int priority)
180 struct cor_control_msg_out *cm = 0;
182 long packets1;
183 long packets2;
185 BUG_ON(nb == 0);
187 packets1 = atomic_inc_return(&(nb->cmsg_othercnt));
188 packets2 = atomic_inc_return(&(cor_cmsg_othercnt));
190 BUG_ON(packets1 <= 0);
191 BUG_ON(packets2 <= 0);
193 if (packets1 <= cor_calc_limit(GUARANTEED_CMSGS_PER_NEIGH, priority))
194 goto alloc;
196 if (unlikely(unlikely(packets1 > cor_calc_limit(MAX_CMSGS_PER_NEIGH,
197 priority)) ||
198 unlikely(packets2 > cor_calc_limit(MAX_CMSGS,
199 priority))))
200 goto full;
202 alloc:
203 cm = _cor_alloc_control_msg(nb);
204 if (unlikely(cm == 0)) {
205 full:
207 /* printk(KERN_ERR "cor_alloc_control_msg failed %ld %ld", packets1, packets2); */
208 atomic_dec(&(nb->cmsg_othercnt));
209 atomic_dec(&(cor_cmsg_othercnt));
211 return cm;
214 static void cor_cmsg_kref_free(struct kref *ref)
216 struct cor_control_msg_out *cm = container_of(ref,
217 struct cor_control_msg_out, ref);
218 kmem_cache_free(cor_controlmsg_slab, cm);
221 void cor_free_control_msg(struct cor_control_msg_out *cm)
223 if (likely(cm->type != MSGTYPE_PONG)) {
224 atomic_dec(&(cm->nb->cmsg_othercnt));
225 atomic_dec(&(cor_cmsg_othercnt));
228 if (cm->type == MSGTYPE_ACK_CONN) {
229 struct cor_conn *trgt_out = cm->msg.ack_conn.src_in->reversedir;
230 BUG_ON(cm->msg.ack_conn.src_in == 0);
231 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0){
232 spin_lock_bh(&(trgt_out->rcv_lock));
233 BUG_ON(trgt_out->targettype != TARGET_OUT);
234 if (trgt_out->target.out.priority_send_allowed != 0) {
235 trgt_out->target.out.priority_send_allowed = 1;
236 spin_unlock_bh(&(trgt_out->rcv_lock));
237 cor_refresh_conn_priority(trgt_out, 0);
238 } else {
239 spin_unlock_bh(&(trgt_out->rcv_lock));
242 kref_put(&(cm->msg.ack_conn.src_in->ref), cor_free_conn);
243 cm->msg.ack_conn.src_in = 0;
244 } else if (cm->type == MSGTYPE_CONNECT) {
245 BUG_ON(cm->msg.connect.src_in == 0);
246 kref_put(&(cm->msg.connect.src_in->ref), cor_free_conn);
247 cm->msg.connect.src_in = 0;
248 } else if (cm->type == MSGTYPE_CONNECT_SUCCESS) {
249 BUG_ON(cm->msg.connect_success.src_in == 0);
250 kref_put(&(cm->msg.connect_success.src_in->ref), cor_free_conn);
251 cm->msg.connect_success.src_in = 0;
252 } else if (cm->type == MSGTYPE_RESET_CONN) {
253 spin_lock_bh(&(cm->nb->cmsg_lock));
254 if (cm->msg.reset_conn.in_pending_conn_resets != 0) {
255 rb_erase(&(cm->msg.reset_conn.rbn),
256 &(cm->nb->pending_conn_resets_rb));
257 cm->msg.reset_conn.in_pending_conn_resets = 0;
259 kref_put(&(cm->ref), cor_kreffree_bug);
261 spin_unlock_bh(&(cm->nb->cmsg_lock));
264 kref_put(&(cm->ref), cor_cmsg_kref_free);
267 static void cor_free_control_retrans(struct kref *ref)
269 struct cor_control_retrans *cr = container_of(ref,
270 struct cor_control_retrans, ref);
272 while (list_empty(&(cr->msgs)) == 0) {
273 struct cor_control_msg_out *cm = container_of(cr->msgs.next,
274 struct cor_control_msg_out, lh);
276 if (cm->type == MSGTYPE_PONG)
277 atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
279 list_del(&(cm->lh));
280 cor_free_control_msg(cm);
283 kmem_cache_free(cor_controlretrans_slab, cr);
286 struct cor_control_retrans *cor_get_control_retrans(
287 struct cor_neighbor *nb_retranslocked, __u64 seqno)
289 struct rb_node *n = 0;
290 struct cor_control_retrans *ret = 0;
292 n = nb_retranslocked->kp_retransmits_rb.rb_node;
294 while (likely(n != 0) && ret == 0) {
295 struct cor_control_retrans *cr = container_of(n,
296 struct cor_control_retrans, rbn);
298 BUG_ON(cr->nb != nb_retranslocked);
300 if (cor_seqno_before(seqno, cr->seqno))
301 n = n->rb_left;
302 else if (cor_seqno_after(seqno, cr->seqno))
303 n = n->rb_right;
304 else
305 ret = cr;
308 if (ret != 0)
309 kref_get(&(ret->ref));
311 return ret;
314 /* nb->retrans_lock must be held */
315 void cor_insert_control_retrans(struct cor_control_retrans *ins)
317 struct cor_neighbor *nb = ins->nb;
318 __u64 seqno = ins->seqno;
320 struct rb_root *root;
321 struct rb_node **p;
322 struct rb_node *parent = 0;
324 BUG_ON(nb == 0);
326 root = &(nb->kp_retransmits_rb);
327 p = &(root->rb_node);
329 while ((*p) != 0) {
330 struct cor_control_retrans *cr = container_of(*p,
331 struct cor_control_retrans, rbn);
333 BUG_ON(cr->nb != nb);
335 parent = *p;
336 if (unlikely(cor_seqno_eq(seqno, cr->seqno))) {
337 BUG();
338 } else if (cor_seqno_before(seqno, cr->seqno)) {
339 p = &(*p)->rb_left;
340 } else if (cor_seqno_after(seqno, cr->seqno)) {
341 p = &(*p)->rb_right;
342 } else {
343 BUG();
347 kref_get(&(ins->ref));
348 rb_link_node(&(ins->rbn), parent, p);
349 rb_insert_color(&(ins->rbn), root);
352 static void cor_remove_connack_oooflag_ifold(struct cor_conn *src_in_l,
353 struct cor_control_msg_out *cm)
355 if (cor_ooolen(cm->msg.ack_conn.flags) != 0 && cor_seqno_before_eq(
356 cm->msg.ack_conn.seqno_ooo +
357 cm->msg.ack_conn.length,
358 src_in_l->source.in.next_seqno)) {
359 cm->msg.ack_conn.length = 0;
360 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
361 (~KP_ACK_CONN_FLAGS_OOO));
365 static int cor_ackconn_prepare_requeue(struct cor_conn *cn_l,
366 struct cor_control_msg_out *cm)
368 if (unlikely(unlikely(cn_l->sourcetype != SOURCE_IN) ||
369 unlikely(cn_l->source.in.nb != cm->nb) ||
370 unlikely(cn_l->reversedir->target.out.conn_id !=
371 cm->msg.ack_conn.conn_id) ||
372 unlikely(cn_l->isreset != 0)))
373 return 0;
375 cor_remove_connack_oooflag_ifold(cn_l, cm);
377 if (!cor_seqno_eq(cm->msg.ack_conn.ack_seqno,
378 cn_l->source.in.ack_seqno))
379 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
380 (~KP_ACK_CONN_FLAGS_SEQNO) &
381 (~KP_ACK_CONN_FLAGS_WINDOW));
383 if (cm->msg.ack_conn.flags == 0)
384 return 0;
386 cm->length = 6 + cor_ack_conn_len(cm->msg.ack_conn.flags);
388 return 1;
391 static void cor_requeue_control_retrans(struct cor_control_retrans *cr)
393 atomic_inc(&(cr->nb->cmsg_bulk_readds));
395 while (list_empty(&(cr->msgs)) == 0) {
396 struct cor_control_msg_out *cm = container_of(cr->msgs.prev,
397 struct cor_control_msg_out, lh);
398 list_del(&(cm->lh));
400 BUG_ON(cm->nb != cr->nb);
402 if (cm->type == MSGTYPE_ACK_CONN) {
403 struct cor_conn *cn_l = cm->msg.ack_conn.src_in;
404 spin_lock_bh(&(cn_l->rcv_lock));
405 if (unlikely(cor_ackconn_prepare_requeue(cn_l,
406 cm) == 0)) {
407 cor_free_control_msg(cm);
408 } else {
409 cor_merge_or_enqueue_ackconn(cn_l, cm,
410 ADDCMSG_SRC_RETRANS);
413 spin_unlock_bh(&(cn_l->rcv_lock));
414 } else {
415 if (cm->type == MSGTYPE_PONG)
416 atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
417 cor_enqueue_control_msg(cm, ADDCMSG_SRC_RETRANS);
421 atomic_dec(&(cr->nb->cmsg_bulk_readds));
423 spin_lock_bh(&(cr->nb->cmsg_lock));
424 cor_schedule_controlmsg_timer(cr->nb);
425 spin_unlock_bh(&(cr->nb->cmsg_lock));
428 static void cor_empty_retrans_queue(struct cor_neighbor *nb_retranslocked)
430 while (!list_empty(&(nb_retranslocked->retrans_list))) {
431 struct cor_control_retrans *cr = container_of(
432 nb_retranslocked->retrans_list.next,
433 struct cor_control_retrans, timeout_list);
435 BUG_ON(cr->nb != nb_retranslocked);
437 list_del(&(cr->timeout_list));
438 rb_erase(&(cr->rbn), &(nb_retranslocked->kp_retransmits_rb));
440 kref_put(&(cr->ref), cor_kreffree_bug); /* rb */
441 kref_put(&(cr->ref), cor_free_control_retrans); /* list */
445 void cor_retransmit_timerfunc(struct timer_list *retrans_timer)
447 struct cor_neighbor *nb = container_of(retrans_timer,
448 struct cor_neighbor, retrans_timer);
449 int nbstate = cor_get_neigh_state(nb);
450 struct cor_control_retrans *cr = 0;
452 spin_lock_bh(&(nb->retrans_lock));
454 if (list_empty(&(nb->retrans_list))) {
455 spin_unlock_bh(&(nb->retrans_lock));
456 kref_put(&(nb->ref), cor_neighbor_free);
457 return;
460 if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
461 cor_empty_retrans_queue(nb);
462 spin_unlock_bh(&(nb->retrans_lock));
463 kref_put(&(nb->ref), cor_neighbor_free);
464 return;
467 cr = container_of(nb->retrans_list.next, struct cor_control_retrans,
468 timeout_list);
470 BUG_ON(cr->nb != nb);
472 if (time_after(cr->timeout, jiffies)) {
473 int rc = mod_timer(&(nb->retrans_timer), cr->timeout);
474 spin_unlock_bh(&(nb->retrans_lock));
475 if (rc != 0)
476 kref_put(&(nb->ref), cor_neighbor_free);
477 return;
480 spin_unlock_bh(&(nb->retrans_lock));
482 spin_lock_bh(&(nb->cmsg_lock));
483 nb->add_retrans_needed = 1;
484 cor_schedule_controlmsg_timer(nb);
485 spin_unlock_bh(&(nb->cmsg_lock));
487 kref_put(&(nb->ref), cor_neighbor_free);
490 static void cor_schedule_retransmit(struct cor_control_retrans *cr,
491 struct cor_neighbor *nb)
493 int first;
495 cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
496 atomic_read(&(nb->latency_stddev_retrans_us)),
497 atomic_read(&(nb->max_remote_ack_delay_us)));
499 spin_lock_bh(&(nb->retrans_lock));
500 cor_insert_control_retrans(cr);
501 first = list_empty(&(nb->retrans_list));
502 list_add_tail(&(cr->timeout_list), &(nb->retrans_list));
504 if (first) {
505 if (mod_timer(&(nb->retrans_timer), cr->timeout) == 0) {
506 kref_get(&(nb->ref));
510 spin_unlock_bh(&(nb->retrans_lock));
513 void cor_kern_ack_rcvd(struct cor_neighbor *nb, __u64 seqno)
515 struct cor_control_retrans *cr = 0;
517 spin_lock_bh(&(nb->retrans_lock));
519 cr = cor_get_control_retrans(nb, seqno);
521 if (cr == 0) {
522 /* char *seqno_p = (char *) &seqno;
523 seqno = cpu_to_be32(seqno);
524 printk(KERN_ERR "bogus/duplicate ack received %d %d %d %d",
525 seqno_p[0], seqno_p[1], seqno_p[2], seqno_p[3]);
527 goto out;
530 rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));
532 BUG_ON(cr->nb != nb);
534 list_del(&(cr->timeout_list));
536 out:
537 spin_unlock_bh(&(nb->retrans_lock));
539 if (cr != 0) {
540 /* cor_get_control_retrans */
541 kref_put(&(cr->ref), cor_kreffree_bug);
543 kref_put(&(cr->ref), cor_kreffree_bug); /* rb_erase */
544 kref_put(&(cr->ref), cor_free_control_retrans); /* list */
548 static __u8 cor_get_window(struct cor_conn *cn,
549 struct cor_neighbor *expectedsender, __u32 expected_connid)
551 __u8 window = 0;
553 spin_lock_bh(&(cn->rcv_lock));
555 if (unlikely(unlikely(cn->sourcetype != SOURCE_IN) ||
556 unlikely(expectedsender != 0 && (cn->source.in.nb !=
557 expectedsender || cn->reversedir->target.out.conn_id !=
558 expected_connid))))
559 goto out;
561 window = cor_enc_log_64_7(cor_seqno_clean(
562 cn->source.in.window_seqnolimit -
563 cn->source.in.next_seqno));
565 cn->source.in.window_seqnolimit_remote = cn->source.in.next_seqno +
566 cor_dec_log_64_7(window);
568 out:
569 spin_unlock_bh(&(cn->rcv_lock));
571 return window;
574 /* static void padding(struct sk_buff *skb, __u32 length)
576 char *dst;
577 if (length <= 0)
578 return;
579 dst = skb_put(skb, length);
580 BUG_ON(dst == 0);
581 memset(dst, KP_PADDING, length);
582 } */
585 static __u32 cor_add_init_session(struct sk_buff *skb, __be32 sessionid,
586 __u32 spaceleft)
588 char *dst;
590 BUG_ON(KP_INIT_SESSION_CMDLEN != 5);
592 if (unlikely(spaceleft < 5))
593 return 0;
595 dst = skb_put(skb, 5);
596 BUG_ON(dst == 0);
598 dst[0] = KP_INIT_SESSION;
599 cor_put_be32(dst + 1, sessionid);
601 return 5;
604 static __u32 cor_add_ack(struct sk_buff *skb, struct cor_control_retrans *cr,
605 struct cor_control_msg_out *cm, __u32 spaceleft)
607 char *dst;
609 BUG_ON(cm->length != 7);
611 if (unlikely(spaceleft < 7))
612 return 0;
614 dst = skb_put(skb, 7);
615 BUG_ON(dst == 0);
617 dst[0] = KP_ACK;
618 cor_put_u48(dst + 1, cm->msg.ack.seqno);
620 cor_free_control_msg(cm);
622 return 7;
625 static __u32 cor_add_ack_conn(struct sk_buff *skb,
626 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
627 __u32 spaceleft)
629 char *dst;
630 __u32 offset = 0;
632 if (unlikely(spaceleft < cm->length))
633 return 0;
635 dst = skb_put(skb, cm->length);
636 BUG_ON(dst == 0);
638 dst[offset] = KP_ACK_CONN;
639 offset++;
640 cor_put_u32(dst + offset, cm->msg.ack_conn.conn_id);
641 offset += 4;
642 dst[offset] = cm->msg.ack_conn.flags;
643 offset++;
645 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0) {
646 cor_put_u48(dst + offset, cm->msg.ack_conn.seqno);
647 offset += 6;
649 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_WINDOW) != 0) {
650 BUG_ON(cm->msg.ack_conn.src_in == 0);
651 dst[offset] = cor_get_window(cm->msg.ack_conn.src_in,
652 cm->nb, cm->msg.ack_conn.conn_id);
653 offset++;
657 if (cor_ooolen(cm->msg.ack_conn.flags) != 0) {
658 cor_put_u48(dst + offset, cm->msg.ack_conn.seqno_ooo);
659 offset += 6;
660 if (cor_ooolen(cm->msg.ack_conn.flags) == 1) {
661 BUG_ON(cm->msg.ack_conn.length > 255);
662 dst[offset] = cm->msg.ack_conn.length;
663 offset += 1;
664 } else if (cor_ooolen(cm->msg.ack_conn.flags) == 2) {
665 BUG_ON(cm->msg.ack_conn.length <= 255);
666 BUG_ON(cm->msg.ack_conn.length > 65535);
667 cor_put_u16(dst + offset, cm->msg.ack_conn.length);
668 offset += 2;
669 } else if (cor_ooolen(cm->msg.ack_conn.flags) == 4) {
670 BUG_ON(cm->msg.ack_conn.length <= 65535);
671 cor_put_u32(dst + offset, cm->msg.ack_conn.length);
672 offset += 4;
673 } else {
674 BUG();
678 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
679 dst[offset] = cm->msg.ack_conn.priority_seqno;
680 offset++;
681 dst[offset] = cm->msg.ack_conn.priority;
682 offset++;
685 list_add_tail(&(cm->lh), &(cr->msgs));
687 BUG_ON(offset != cm->length);
688 return offset;
691 static __u32 cor_add_ping(struct sk_buff *skb, __u32 cookie, __u32 spaceleft)
693 char *dst;
695 BUG_ON(KP_PING_CMDLEN != 5);
697 if (unlikely(spaceleft < 5))
698 return 0;
700 dst = skb_put(skb, 5);
701 BUG_ON(dst == 0);
703 dst[0] = KP_PING;
704 cor_put_u32(dst + 1, cookie);
706 return 5;
709 static __u32 cor_calc_respdelay(ktime_t time_pong_enqueued, ktime_t time_end)
711 if (unlikely(ktime_before(time_end, time_pong_enqueued))) {
712 return 0;
713 } else {
714 __s64 respdelay = div_u64(ktime_to_ns(time_end) -
715 ktime_to_ns(time_pong_enqueued) + 500,
716 1000);
718 if (unlikely(respdelay > U32_MAX))
719 return U32_MAX;
720 else if (unlikely(respdelay < 0))
721 return 0;
722 else
723 return (__u32) respdelay;
727 static __u32 cor_add_pong(struct sk_buff *skb, struct cor_control_retrans *cr,
728 struct cor_control_msg_out *cm, __u32 spaceleft,
729 ktime_t cmsg_send_start)
731 __u32 respdelay_full;
732 __u32 respdelay_netonly;
733 char *dst;
735 BUG_ON(cm->length != 13);
737 if (unlikely(spaceleft < 13))
738 return 0;
740 respdelay_full = cor_calc_respdelay(cm->msg.pong.time_enqueued,
741 cmsg_send_start);
742 respdelay_netonly = cor_calc_respdelay(cm->msg.pong.ping_rcvtime,
743 ktime_get());
745 dst = skb_put(skb, 13);
746 BUG_ON(dst == 0);
748 dst[0] = KP_PONG;
749 cor_put_u32(dst + 1, cm->msg.pong.cookie);
750 cor_put_u32(dst + 5, (__u32) respdelay_full);
751 cor_put_u32(dst + 9, (__u32) respdelay_netonly);
753 list_add_tail(&(cm->lh), &(cr->msgs));
755 return 13;
758 static __u32 cor_add_connect(struct sk_buff *skb,
759 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
760 __u32 spaceleft)
762 char *dst;
763 struct cor_conn *src_in = cm->msg.connect.src_in;
765 BUG_ON(cm->length != 21);
767 if (unlikely(spaceleft < 21))
768 return 0;
770 dst = skb_put(skb, 21);
771 BUG_ON(dst == 0);
773 dst[0] = KP_CONNECT;
774 cor_put_u32(dst + 1, cm->msg.connect.conn_id);
775 cor_put_u48(dst + 5, cm->msg.connect.seqno1);
776 cor_put_u48(dst + 11, cm->msg.connect.seqno2);
777 BUG_ON(cm->msg.connect.src_in == 0);
778 dst[17] = cor_get_window(cm->msg.connect.src_in, cm->nb,
779 cm->msg.connect.conn_id);
781 spin_lock_bh(&(src_in->reversedir->rcv_lock));
782 BUG_ON(src_in->reversedir->targettype != TARGET_OUT);
784 dst[18] = src_in->reversedir->target.out.priority_seqno;
785 dst[19] = src_in->reversedir->target.out.priority_last;
786 if (src_in->is_highlatency == 0)
787 dst[20] = 0;
788 else
789 dst[20] = 1;
791 spin_unlock_bh(&(src_in->reversedir->rcv_lock));
793 list_add_tail(&(cm->lh), &(cr->msgs));
795 return 21;
798 static __u32 cor_add_connect_success(struct sk_buff *skb,
799 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
800 __u32 spaceleft)
802 char *dst;
804 BUG_ON(cm->length != 6);
806 if (unlikely(spaceleft < 6))
807 return 0;
809 dst = skb_put(skb, 6);
810 BUG_ON(dst == 0);
812 dst[0] = KP_CONNECT_SUCCESS;
813 cor_put_u32(dst + 1, cm->msg.connect_success.conn_id);
814 BUG_ON(cm->msg.connect_success.src_in == 0);
815 dst[5] = cor_get_window(cm->msg.connect_success.src_in, cm->nb,
816 cm->msg.connect_success.conn_id);
818 list_add_tail(&(cm->lh), &(cr->msgs));
820 return 6;
823 static __u32 cor_add_reset_conn(struct sk_buff *skb,
824 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
825 __u32 spaceleft)
827 char *dst;
829 BUG_ON(cm->length != 5);
831 if (unlikely(spaceleft < 5))
832 return 0;
834 dst = skb_put(skb, 5);
835 BUG_ON(dst == 0);
837 dst[0] = KP_RESET_CONN;
838 cor_put_u32(dst + 1, cm->msg.reset_conn.conn_id);
840 list_add_tail(&(cm->lh), &(cr->msgs));
842 return 5;
845 static __u32 cor_add_conndata(struct sk_buff *skb,
846 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
847 __u32 spaceleft, struct cor_control_msg_out **split_conndata,
848 __u32 *sc_sendlen)
850 char *dst;
852 __u32 totallen = cm->msg.conn_data.datalen + KP_CONN_DATA_CMDLEN;
853 __u32 putlen = min(totallen, spaceleft);
854 __u32 dataputlen = putlen - KP_CONN_DATA_CMDLEN;
856 BUG_ON(KP_CONN_DATA_CMDLEN != 13);
857 BUG_ON(cm->length != totallen);
859 BUG_ON(putlen > 1024*1024*1024);
861 BUG_ON(split_conndata == 0);
862 BUG_ON(*split_conndata != 0);
863 BUG_ON(sc_sendlen == 0);
864 BUG_ON(*sc_sendlen != 0);
866 if (putlen < KP_CONN_DATA_CMDLEN + 1)
867 return 0;
869 dst = skb_put(skb, putlen);
870 BUG_ON(dst == 0);
872 if (cm->msg.conn_data.flush != 0) {
873 if (cm->msg.conn_data.snd_delayed_lowbuf == 0) {
874 dst[0] = KP_CONN_DATA_FLUSH;
875 } else {
876 dst[0] = KP_CONN_DATA_LOWBUFDELAYED_FLUSH;
878 } else {
879 if (cm->msg.conn_data.snd_delayed_lowbuf == 0) {
880 dst[0] = KP_CONN_DATA;
881 } else {
882 dst[0] = KP_CONN_DATA_LOWBUFDELAYED;
885 cor_put_u32(dst + 1, cm->msg.conn_data.conn_id);
886 cor_put_u48(dst + 5, cm->msg.conn_data.seqno);
887 cor_put_u16(dst + 11, dataputlen);
889 memcpy(dst + 13, cm->msg.conn_data.data, dataputlen);
891 if (cm->msg.conn_data.datalen == dataputlen) {
892 BUG_ON(cm->length != putlen);
893 list_add_tail(&(cm->lh), &(cr->msgs));
894 } else {
895 *split_conndata = cm;
896 *sc_sendlen = dataputlen;
899 return putlen;
902 static __u32 cor_add_set_max_cmsg_dly(struct sk_buff *skb,
903 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
904 __u32 spaceleft)
906 char *dst;
908 BUG_ON(KP_SET_MAX_CMSG_DELAY_CMDLEN != 13);
909 BUG_ON(cm->length != KP_SET_MAX_CMSG_DELAY_CMDLEN);
911 if (unlikely(spaceleft < 13))
912 return 0;
914 dst = skb_put(skb, 13);
915 BUG_ON(dst == 0);
917 dst[0] = KP_SET_MAX_CMSG_DELAY;
918 cor_put_u32(dst + 1, cm->msg.set_max_cmsg_delay.ack_delay);
919 cor_put_u32(dst + 5, cm->msg.set_max_cmsg_delay.ackconn_delay);
920 cor_put_u32(dst + 9, cm->msg.set_max_cmsg_delay.other_delay);
922 list_add_tail(&(cm->lh), &(cr->msgs));
924 return 13;
927 static __u32 cor_add_message(struct sk_buff *skb,
928 struct cor_control_retrans *cr, struct cor_control_msg_out *cm,
929 __u32 spaceleft, ktime_t cmsg_send_start,
930 struct cor_control_msg_out **split_conndata, __u32 *sc_sendlen)
932 BUG_ON(split_conndata != 0 && *split_conndata != 0);
933 BUG_ON(sc_sendlen != 0 && *sc_sendlen != 0);
935 switch (cm->type) {
936 case MSGTYPE_ACK:
937 return cor_add_ack(skb, cr, cm, spaceleft);
938 case MSGTYPE_ACK_CONN:
939 return cor_add_ack_conn(skb, cr, cm, spaceleft);
940 case MSGTYPE_PONG:
941 return cor_add_pong(skb, cr, cm, spaceleft, cmsg_send_start);
942 case MSGTYPE_CONNECT:
943 return cor_add_connect(skb, cr, cm, spaceleft);
944 case MSGTYPE_CONNECT_SUCCESS:
945 return cor_add_connect_success(skb, cr, cm, spaceleft);
946 case MSGTYPE_RESET_CONN:
947 return cor_add_reset_conn(skb, cr, cm, spaceleft);
948 case MSGTYPE_CONNDATA:
949 return cor_add_conndata(skb, cr, cm, spaceleft, split_conndata,
950 sc_sendlen);
951 case MSGTYPE_SET_MAX_CMSG_DELAY:
952 return cor_add_set_max_cmsg_dly(skb, cr, cm, spaceleft);
953 default:
954 BUG();
956 BUG();
957 return 0;
960 static __u32 __cor_send_messages(struct cor_neighbor *nb, struct sk_buff *skb,
961 struct cor_control_retrans *cr, struct list_head *cmsgs,
962 __u32 spaceleft, int nbstate, ktime_t cmsg_send_start,
963 struct cor_control_msg_out **split_conndata, __u32 *sc_sendlen)
965 __u32 length = 0;
966 while (!list_empty(cmsgs)) {
967 __u32 rc;
968 struct cor_control_msg_out *cm = container_of(cmsgs->next,
969 struct cor_control_msg_out, lh);
971 list_del(&(cm->lh));
973 rc = cor_add_message(skb, cr, cm, spaceleft - length,
974 cmsg_send_start, split_conndata, sc_sendlen);
975 if (rc == 0) {
976 BUG();
977 list_add(&(cm->lh), cmsgs);
978 break;
980 BUG_ON(rc != cm->length && cm->type != MSGTYPE_CONNDATA);
982 length += rc;
985 return length;
988 static __u32 __cor_send_messages_smcd(struct cor_neighbor *nb,
989 struct sk_buff *skb, struct cor_control_retrans *cr,
990 __u32 spaceleft, ktime_t cmsg_send_start)
992 struct cor_control_msg_out *cm;
993 __u32 rc;
995 cm = cor_alloc_control_msg(nb, ACM_PRIORITY_MED);
997 if (unlikely(cm == 0))
998 return 0;
1000 cm->type = MSGTYPE_SET_MAX_CMSG_DELAY;
1001 cm->msg.set_max_cmsg_delay.ack_delay =
1002 CMSG_MAXDELAY_ACK_MS * 1000;
1003 cm->msg.set_max_cmsg_delay.ackconn_delay =
1004 CMSG_MAXDELAY_ACKCONN_MS * 1000;
1005 cm->msg.set_max_cmsg_delay.other_delay =
1006 CMSG_MAXDELAY_OTHER_MS * 1000;
1007 cm->length = KP_SET_MAX_CMSG_DELAY_CMDLEN;
1009 rc = cor_add_message(skb, cr, cm, spaceleft, cmsg_send_start, 0, 0);
1011 nb->max_cmsg_delay_sent = 1;
1013 return rc;
1016 static void cor_requeue_message(struct cor_control_msg_out *cm)
1018 if (cm->type == MSGTYPE_ACK_CONN) {
1019 struct cor_conn *cn_l = cm->msg.ack_conn.src_in;
1021 spin_lock_bh(&(cn_l->rcv_lock));
1022 if (unlikely(cor_ackconn_prepare_requeue(cn_l, cm) == 0)) {
1023 cor_free_control_msg(cm);
1024 } else {
1025 spin_lock_bh(&(cm->nb->cmsg_lock));
1027 list_add(&(cm->lh), &(cm->nb->cmsg_queue_ackconn));
1028 cm->nb->cmsg_otherlength += cm->length;
1030 list_add(&(cm->msg.ack_conn.conn_acks),
1031 &(cn_l->source.in.acks_pending));
1032 cor_try_merge_ackconns(cn_l, cm);
1034 spin_unlock_bh(&(cm->nb->cmsg_lock));
1036 spin_unlock_bh(&(cn_l->rcv_lock));
1037 return;
1040 cor_enqueue_control_msg(cm, ADDCMSG_SRC_READD);
1043 static void cor_requeue_messages(struct list_head *lh)
1045 while (list_empty(lh) == 0) {
1046 struct cor_control_msg_out *cm = container_of(lh->prev,
1047 struct cor_control_msg_out, lh);
1048 list_del(&(cm->lh));
1049 cor_requeue_message(cm);
1053 static int _cor_send_messages_send2(struct cor_neighbor *nb,
1054 struct sk_buff *skb, int ping, int initsession,
1055 struct cor_control_retrans *cr, struct list_head *cmsgs,
1056 __u32 spaceleft, int nbstate, ktime_t cmsg_send_start,
1057 int *sent)
1059 int rc;
1060 __u32 length = 0;
1061 __u32 pinglen = 0;
1062 __u32 pingcookie = 0;
1063 unsigned long last_ping_time;
1064 struct cor_control_msg_out *split_conndata = 0;
1065 __u32 sc_sendlen = 0;
1067 if (ping != TIMETOSENDPING_NO) {
1068 __u32 rc;
1070 if (unlikely(initsession)) {
1071 rc = cor_add_init_session(skb, nb->sessionid,
1072 spaceleft - length);
1073 BUG_ON(rc <= 0);
1074 pinglen = rc;
1075 length += rc;
1078 pingcookie = cor_add_ping_req(nb, &last_ping_time);
1079 rc = cor_add_ping(skb, pingcookie, spaceleft - length);
1080 BUG_ON(rc <= 0);
1081 pinglen += rc;
1082 length += rc;
1085 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE) &&
1086 unlikely(nb->max_cmsg_delay_sent == 0))
1087 length += __cor_send_messages_smcd(nb, skb, cr,
1088 spaceleft - length, cmsg_send_start);
1090 length += __cor_send_messages(nb, skb, cr, cmsgs, spaceleft - length,
1091 nbstate, cmsg_send_start, &split_conndata, &sc_sendlen);
1093 BUG_ON(length > spaceleft);
1095 if (likely(ping != TIMETOSENDPING_FORCE) &&
1096 pinglen != 0 && unlikely(length == pinglen)) {
1097 cor_unadd_ping_req(nb, pingcookie, last_ping_time, 0);
1098 goto drop;
1101 if (unlikely(length == 0)) {
1102 drop:
1103 kfree_skb(skb);
1105 BUG_ON(list_empty(&(cr->msgs)) == 0);
1106 kref_put(&(cr->ref), cor_free_control_retrans);
1108 nb->kpacket_seqno--;
1109 return QOS_RESUME_DONE;
1112 //padding(skb, spaceleft - length);
1113 BUG_ON(spaceleft - length != 0);
1115 rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_KPACKET);
1116 if (rc == NET_XMIT_SUCCESS)
1117 *sent = 1;
1119 if (rc == NET_XMIT_DROP) {
1120 if (ping != 0)
1121 cor_unadd_ping_req(nb, pingcookie, last_ping_time, 1);
1123 atomic_inc(&(nb->cmsg_bulk_readds));
1124 if (split_conndata != 0)
1125 cor_requeue_message(split_conndata);
1127 cor_requeue_messages(&(cr->msgs));
1129 kref_put(&(cr->ref), cor_free_control_retrans);
1131 atomic_dec(&(nb->cmsg_bulk_readds));
1133 spin_lock_bh(&(nb->cmsg_lock));
1134 cor_schedule_controlmsg_timer(nb);
1135 spin_unlock_bh(&(nb->cmsg_lock));
1136 } else {
1137 struct list_head *curr = cr->msgs.next;
1139 if (pingcookie != 0)
1140 cor_ping_sent(nb, pingcookie);
1142 while (curr != &(cr->msgs)) {
1143 struct cor_control_msg_out *cm = container_of(curr,
1144 struct cor_control_msg_out, lh);
1146 curr = curr->next;
1148 if (unlikely(cm->type == MSGTYPE_PONG &&
1149 (nbstate != NEIGHBOR_STATE_ACTIVE))) {
1150 list_del(&(cm->lh));
1151 cor_free_control_msg(cm);
1152 } else if (unlikely(cm->type == MSGTYPE_PONG &&
1153 atomic_inc_return(
1154 &(nb->cmsg_pongs_retrans_cnt)) >
1155 MAX_PONG_CMSGS_RETRANS_PER_NEIGH)) {
1156 atomic_dec(&(nb->cmsg_pongs_retrans_cnt));
1157 list_del(&(cm->lh));
1158 cor_free_control_msg(cm);
1159 } else if (cm->type == MSGTYPE_CONNDATA) {
1160 cor_schedule_retransmit_conn(
1161 cm->msg.conn_data.cr, 0, 0);
1162 kfree(cm->msg.conn_data.data_orig);
1163 list_del(&(cm->lh));
1164 cor_free_control_msg(cm);
1168 if (split_conndata != 0) {
1169 BUG_ON(sc_sendlen == 0);
1170 BUG_ON(sc_sendlen >=
1171 split_conndata->msg.conn_data.datalen);
1173 split_conndata->msg.conn_data.seqno += sc_sendlen;
1174 split_conndata->msg.conn_data.data += sc_sendlen;
1175 split_conndata->msg.conn_data.datalen -= sc_sendlen;
1176 split_conndata->length = KP_CONN_DATA_CMDLEN +
1177 split_conndata->msg.conn_data.datalen;
1178 cor_enqueue_control_msg(split_conndata,
1179 ADDCMSG_SRC_SPLITCONNDATA);
1183 if (list_empty(&(cr->msgs)))
1184 kref_put(&(cr->ref), cor_free_control_retrans);
1185 else
1186 cor_schedule_retransmit(cr, nb);
1189 return (rc == NET_XMIT_SUCCESS) ? QOS_RESUME_DONE : QOS_RESUME_CONG;
1192 static int _cor_send_messages_send(struct cor_neighbor *nb, int ping,
1193 int initsession, struct list_head *cmsgs, int nbstate,
1194 __u32 length, __u64 seqno, ktime_t cmsg_send_start, int *sent)
1196 struct sk_buff *skb;
1197 struct cor_control_retrans *cr;
1198 int rc;
1200 skb = cor_create_packet_cmsg(nb, length, GFP_ATOMIC, seqno);
1201 if (unlikely(skb == 0)) {
1202 printk(KERN_ERR "cor_send_messages(): cannot allocate skb (out of memory?)");
1204 cor_requeue_messages(cmsgs);
1205 return QOS_RESUME_CONG;
1208 cr = kmem_cache_alloc(cor_controlretrans_slab, GFP_ATOMIC);
1209 if (unlikely(cr == 0)) {
1210 printk(KERN_ERR "cor_send_messages(): cannot allocate cor_control_retrans (out of memory?)");
1211 kfree_skb(skb);
1213 cor_requeue_messages(cmsgs);
1214 return QOS_RESUME_CONG;
1217 memset(cr, 0, sizeof(struct cor_control_retrans));
1218 kref_init(&(cr->ref));
1219 cr->nb = nb;
1220 cr->seqno = seqno;
1221 INIT_LIST_HEAD(&(cr->msgs));
1223 rc = _cor_send_messages_send2(nb, skb, ping, initsession, cr, cmsgs,
1224 length, nbstate, cmsg_send_start, sent);
1226 BUG_ON(!list_empty(cmsgs));
1228 return rc;
1231 #define CMSGQUEUE_PONG 1
1232 #define CMSGQUEUE_ACK 2
1233 #define CMSGQUEUE_ACK_CONN 3
1234 #define CMSGQUEUE_CONNDATA_LOWLAT 4
1235 #define CMSGQUEUE_CONNDATA_HIGHLAT 5
1236 #define CMSGQUEUE_OTHER 6
/* Return the jiffies deadline by which the queued control message cm
 * should be sent.  The deadline is derived from cm->time_added and a
 * per-type maximum delay; the BUG_ONs check that the message actually
 * sits on the queue matching its type. */
static unsigned long cor_get_cmsg_timeout(struct cor_control_msg_out *cm,
		int queue)
{
	if (cm->type == MSGTYPE_ACK) {
		BUG_ON(queue != CMSGQUEUE_ACK);
		return cm->time_added +
				msecs_to_jiffies(CMSG_MAXDELAY_ACK_MS) - 1;
	} else if (cm->type == MSGTYPE_ACK_CONN) {
		BUG_ON(queue != CMSGQUEUE_ACK_CONN);
		return cm->time_added +
				msecs_to_jiffies(CMSG_MAXDELAY_ACKCONN_MS) - 1;
	} else if (cm->type == MSGTYPE_CONNDATA) {
		if (cm->msg.conn_data.highlatency != 0) {
			BUG_ON(queue != CMSGQUEUE_CONNDATA_HIGHLAT);
			return cm->time_added +
					msecs_to_jiffies(
					CMSG_MAXDELAY_CONNDATA_MS) - 1;
		} else {
			/* lowlatency conn data: due immediately */
			BUG_ON(queue != CMSGQUEUE_CONNDATA_LOWLAT);
			return cm->time_added;
		}
	} else {
		/* pongs and all remaining types share the "other" delay */
		BUG_ON(cm->type == MSGTYPE_PONG && queue != CMSGQUEUE_PONG);
		BUG_ON(cm->type != MSGTYPE_PONG && queue != CMSGQUEUE_OTHER);

		return cm->time_added +
				msecs_to_jiffies(CMSG_MAXDELAY_OTHER_MS) - 1;
	}
}
/* Look at the head of one cmsg queue of nb_cmsglocked (cmsg_lock held by
 * caller, per the _cmsglocked suffix) and, if its deadline is earlier
 * than the current candidate *currcm, make it the new candidate.
 * *currlen is pointed at the length accumulator the message counts
 * against (pongs are accounted separately from everything else). */
static void _cor_peek_message(struct cor_neighbor *nb_cmsglocked, int queue,
		struct cor_control_msg_out **currcm, unsigned long *currtimeout,
		__u32 **currlen)
{
	struct cor_control_msg_out *cm;
	unsigned long cmtimeout;

	struct list_head *queuelh;
	if (queue == CMSGQUEUE_PONG) {
		queuelh = &(nb_cmsglocked->cmsg_queue_pong);
	} else if (queue == CMSGQUEUE_ACK) {
		queuelh = &(nb_cmsglocked->cmsg_queue_ack);
	} else if (queue == CMSGQUEUE_ACK_CONN) {
		queuelh = &(nb_cmsglocked->cmsg_queue_ackconn);
	} else if (queue == CMSGQUEUE_CONNDATA_LOWLAT) {
		queuelh = &(nb_cmsglocked->cmsg_queue_conndata_lowlat);
	} else if (queue == CMSGQUEUE_CONNDATA_HIGHLAT) {
		queuelh = &(nb_cmsglocked->cmsg_queue_conndata_highlat);
	} else if (queue == CMSGQUEUE_OTHER) {
		queuelh = &(nb_cmsglocked->cmsg_queue_other);
	} else {
		BUG();
	}

	if (list_empty(queuelh))
		return;

	cm = container_of(queuelh->next, struct cor_control_msg_out, lh);
	cmtimeout = cor_get_cmsg_timeout(cm, queue);

	BUG_ON(cm->nb != nb_cmsglocked);

	/* replace the candidate only if this deadline is earlier AND the
	 * current candidate's deadline has not already expired -- an
	 * expired candidate is kept so it gets sent first */
	if (*currcm == 0 || (time_before(cmtimeout, *currtimeout) &&
			time_before(jiffies, *currtimeout))) {
		*currcm = cm;
		*currtimeout = cmtimeout;

		if (queue == CMSGQUEUE_PONG) {
			*currlen = &(nb_cmsglocked->cmsg_pongslength);
		} else {
			*currlen = &(nb_cmsglocked->cmsg_otherlength);
		}
	}
}
/* Scan all cmsg queues of nb_cmsglocked for the most urgent message.
 * Only the pong queue is considered unless the neighbor is ACTIVE.
 * With for_timeout set, conn data queues are skipped while conn data
 * sending is delayed (cmsg_delay_conndata), so they do not drive the
 * timer.  cmsg_lock must be held. */
static void cor_peek_message(struct cor_neighbor *nb_cmsglocked, int nbstate,
		struct cor_control_msg_out **cm, unsigned long *cmtimeout,
		__u32 **len, int for_timeout)
{
	_cor_peek_message(nb_cmsglocked, CMSGQUEUE_PONG, cm, cmtimeout, len);
	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
		_cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK, cm, cmtimeout,
				len);
		_cor_peek_message(nb_cmsglocked, CMSGQUEUE_ACK_CONN, cm,
				cmtimeout, len);
		if (!for_timeout || atomic_read(
				&(nb_cmsglocked->cmsg_delay_conndata)) == 0) {
			_cor_peek_message(nb_cmsglocked,
					CMSGQUEUE_CONNDATA_LOWLAT,
					cm, cmtimeout, len);
			_cor_peek_message(nb_cmsglocked,
					CMSGQUEUE_CONNDATA_HIGHLAT,
					cm, cmtimeout, len);
		}
		_cor_peek_message(nb_cmsglocked, CMSGQUEUE_OTHER, cm, cmtimeout,
				len);
	}
}
/* Compute when the cmsg timer should fire next: the earlier of the next
 * ping time and the deadline of the most urgent queued message (clamped
 * to "now" if that deadline already passed).  cmsg_lock must be held. */
static unsigned long cor_get_cmsg_timer_timeout(
		struct cor_neighbor *nb_cmsglocked, int nbstate)
{
	unsigned long pingtimeout = cor_get_next_ping_time(nb_cmsglocked);

	struct cor_control_msg_out *cm = 0;
	unsigned long cmtimeout;
	__u32 *len;

	cor_peek_message(nb_cmsglocked, nbstate, &cm, &cmtimeout, &len, 1);

	if (cm != 0) {
		unsigned long jiffies_tmp = jiffies;

		if (time_before(cmtimeout, jiffies_tmp))
			return jiffies_tmp;
		if (time_before(cmtimeout, pingtimeout))
			return cmtimeout;
	}

	return pingtimeout;
}
/* Move queued control messages into cmsgs, most urgent first, until the
 * packet of size targetmss is full.  *length is updated with the bytes
 * consumed.  Oversized conn data messages may still be taken (they are
 * split later by the sender); other oversized messages end the loop.
 * cmsg_lock must be held. */
static void _cor_dequeue_messages(struct cor_neighbor *nb_cmsglocked,
		int nbstate, __u32 targetmss, __u32 *length,
		struct list_head *cmsgs)
{
	while (1) {
		__u32 spaceleft = targetmss - *length;
		struct cor_control_msg_out *cm = 0;
		unsigned long cmtimeout;
		__u32 *len;

		cor_peek_message(nb_cmsglocked, nbstate, &cm, &cmtimeout, &len,
				0);

		if (unlikely(cm == 0))
			break;

		BUG_ON(len == 0);

		if (cm->length > spaceleft) {
			if (cm->type == MSGTYPE_CONNDATA) {
				BUG_ON(*length == 0 && spaceleft <
						KP_CONN_DATA_CMDLEN + 1);

				/* take an oversized conn data message only
				 * if a useful split is possible and the
				 * packet is not already 3/4 full */
				if (spaceleft < KP_CONN_DATA_CMDLEN + 1 ||
						*length > (targetmss/4)*3)
					break;
			} else {
				BUG_ON(*length == 0);
				break;
			}
		}

		list_del(&(cm->lh));
		*len -= cm->length;

		if (cm->type == MSGTYPE_ACK_CONN)
			list_del(&(cm->msg.ack_conn.conn_acks));
		if (unlikely(cm->type == MSGTYPE_PONG)) {
			BUG_ON(cm->nb->cmsg_pongscnt == 0);
			cm->nb->cmsg_pongscnt--;
		}

		if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
			/* drop the rb-tree entry and its reference */
			BUG_ON(cm->msg.reset_conn.in_pending_conn_resets == 0);
			rb_erase(&(cm->msg.reset_conn.rbn),
					&(cm->nb->pending_conn_resets_rb));
			cm->msg.reset_conn.in_pending_conn_resets = 0;
			kref_put(&(cm->ref), cor_kreffree_bug);
		}

		BUG_ON(*length + cm->length < *length); /* overflow check */
		if (cm->length > targetmss - *length) {
			/* conn data that will be split: packet is full */
			BUG_ON(*length >= targetmss);
			BUG_ON(cm->type != MSGTYPE_CONNDATA);
			*length = targetmss;
		} else {
			*length += cm->length;
		}

		list_add_tail(&(cm->lh), cmsgs);
	}
}
/* Total byte count of everything that would go into the next kernel
 * packet: queued messages plus, via *extralength, the overhead commands
 * (set_max_cmsg_delay, ping, init_session) that are generated at send
 * time rather than queued. */
static __u32 cor_get_total_messages_length(struct cor_neighbor *nb, int ping,
		int initsession, int nbstate, int *extralength)
{
	__u32 length = nb->cmsg_pongslength;

	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
		/* non-pong messages are only sent to active neighbors */
		length += nb->cmsg_otherlength;

		if (unlikely(nb->max_cmsg_delay_sent == 0)) {
			length += KP_SET_MAX_CMSG_DELAY_CMDLEN;
			*extralength += KP_SET_MAX_CMSG_DELAY_CMDLEN;
		}
	}
	if (ping == TIMETOSENDPING_FORCE ||
			(length > 0 && ping != TIMETOSENDPING_NO)) {
		length += KP_PING_CMDLEN;
		*extralength += KP_PING_CMDLEN;
	}
	if (unlikely(initsession)) {
		length += KP_INIT_SESSION_CMDLEN;
		*extralength += KP_INIT_SESSION_CMDLEN;
	}

	return length;
}
/* Decide whether a packet should be sent now and, if so, fill cmsgs with
 * the messages for it.  Returns 1 when nothing should be sent (nothing
 * queued, or the packet would be short and no deadline/forced ping is
 * due yet), 0 when cmsgs/*length were filled.  cmsg_lock must be held. */
static int cor_dequeue_messages(struct cor_neighbor *nb_cmsglocked, int ping,
		int initsession, int nbstate, __u32 targetmss,
		__u32 *length, struct list_head *cmsgs)
{
	__u32 extralength = 0;
	__u32 totallength;

	int cmsgqueue_nonpong_empty = (
			list_empty(&(nb_cmsglocked->cmsg_queue_ack)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_ackconn)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_conndata_lowlat)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_conndata_highlat)) &&
			list_empty(&(nb_cmsglocked->cmsg_queue_other)));

	/* length accounting must agree with queue emptiness */
	BUG_ON(list_empty(&(nb_cmsglocked->cmsg_queue_pong)) &&
			nb_cmsglocked->cmsg_pongslength != 0);
	BUG_ON(!list_empty(&(nb_cmsglocked->cmsg_queue_pong)) &&
			nb_cmsglocked->cmsg_pongslength == 0);
	BUG_ON(cmsgqueue_nonpong_empty &&
			nb_cmsglocked->cmsg_otherlength != 0);
	BUG_ON(!cmsgqueue_nonpong_empty &&
			nb_cmsglocked->cmsg_otherlength == 0);

	totallength = cor_get_total_messages_length(nb_cmsglocked, ping,
			initsession, nbstate, &extralength);

	if (totallength == 0)
		return 1;

	/* short packet: wait for more data unless a deadline passed or a
	 * ping is forced */
	if (totallength < targetmss && ping != TIMETOSENDPING_FORCE &&
			time_after(cor_get_cmsg_timer_timeout(nb_cmsglocked,
			nbstate), jiffies))
		return 1;

	*length = extralength;

	_cor_dequeue_messages(nb_cmsglocked, nbstate, targetmss, length, cmsgs);

	BUG_ON(*length == 0);
	BUG_ON(*length > targetmss);

	return 0;
}
/* Move all expired entries from nb->retrans_list back onto the send
 * queues and re-arm the retransmit timer for the first entry that has
 * not expired yet. */
static void cor_add_timeouted_retrans(struct cor_neighbor *nb)
{
	spin_lock_bh(&(nb->retrans_lock));

	while (!list_empty(&(nb->retrans_list))) {
		struct cor_control_retrans *cr = container_of(
				nb->retrans_list.next,
				struct cor_control_retrans, timeout_list);

		BUG_ON(cr->nb != nb);

		if (time_after(cr->timeout, jiffies)) {
			/* not expired: arm the timer; take a neighbor ref
			 * only if the timer was not already pending */
			if (mod_timer(&(nb->retrans_timer), cr->timeout) == 0) {
				kref_get(&(nb->ref));
			}
			break;
		}

		list_del(&(cr->timeout_list));
		rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));

		kref_put(&(cr->ref), cor_kreffree_bug); /* rb */

		cor_requeue_control_retrans(cr);
	}

	spin_unlock_bh(&(nb->retrans_lock));
}
/* Free every control message on cmsgs.  Conn data messages hand their
 * retrans record back to the conn retransmit machinery and free their
 * data buffer first. */
static void _cor_delete_all_cmsgs(struct list_head *cmsgs)
{
	while (!list_empty(cmsgs)) {
		struct cor_control_msg_out *cm = container_of(cmsgs->next,
				struct cor_control_msg_out, lh);

		list_del(&(cm->lh));

		if (cm->type == MSGTYPE_CONNDATA) {
			cor_schedule_retransmit_conn(cm->msg.conn_data.cr, 0,
					0);
			kfree(cm->msg.conn_data.data_orig);
		}

		cor_free_control_msg(cm);
	}
}
/* Drain and free all queued control messages of nb (used when the
 * neighbor is killed).  Dequeues in 64KiB batches under cmsg_lock until
 * the queues are empty. */
static void cor_delete_all_cmsgs(struct cor_neighbor *nb)
{
	while (1) {
		struct list_head cmsgs;
		__u32 length = 0;

		INIT_LIST_HEAD(&cmsgs);

		spin_lock_bh(&(nb->cmsg_lock));
		_cor_dequeue_messages(nb, NEIGHBOR_STATE_ACTIVE, 65536, &length,
				&cmsgs);
		spin_unlock_bh(&(nb->cmsg_lock));

		if (list_empty(&cmsgs))
			break;

		_cor_delete_all_cmsgs(&cmsgs);
	}
}
1562 static int cor_reset_timeouted_conn_needed(struct cor_neighbor *nb,
1563 struct cor_conn *src_in_l)
1565 if (unlikely(src_in_l->sourcetype != SOURCE_IN ||
1566 src_in_l->source.in.nb != nb ||
1567 src_in_l->isreset != 0))
1568 return 0;
1569 else if (likely(time_after(src_in_l->source.in.jiffies_last_act +
1570 CONN_ACTIVITY_UPDATEINTERVAL_SEC * HZ +
1571 CONN_INACTIVITY_TIMEOUT_SEC * HZ, jiffies)))
1572 return 0;
1574 return 1;
/* Reset a timed-out conn: re-check the timeout condition under both
 * rcv_locks (taken in is_client-dependent order to give a global lock
 * order for the conn pair), send the reset to the neighbor, and mark the
 * reverse direction as reset.  Returns 1 if the conn was reset. */
static int cor_reset_timeouted_conn(struct cor_neighbor *nb,
		struct cor_conn *src_in)
{
	int resetted = 0;

	if (src_in->is_client) {
		spin_lock_bh(&(src_in->rcv_lock));
		spin_lock_bh(&(src_in->reversedir->rcv_lock));
	} else {
		spin_lock_bh(&(src_in->reversedir->rcv_lock));
		spin_lock_bh(&(src_in->rcv_lock));
	}

	resetted = cor_reset_timeouted_conn_needed(nb, src_in);
	if (unlikely(resetted == 0))
		goto unlock;

	/* resetted stays 0 if sending the reset message failed */
	resetted = (cor_send_reset_conn(nb,
			src_in->reversedir->target.out.conn_id, 1) == 0);
	if (unlikely(resetted == 0))
		goto unlock;

	BUG_ON(src_in->reversedir->isreset != 0);
	src_in->reversedir->isreset = 1;

unlock:
	if (src_in->is_client) {
		spin_unlock_bh(&(src_in->rcv_lock));
		spin_unlock_bh(&(src_in->reversedir->rcv_lock));
	} else {
		spin_unlock_bh(&(src_in->reversedir->rcv_lock));
		spin_unlock_bh(&(src_in->rcv_lock));
	}

	/* full reset is done after dropping the locks */
	if (resetted)
		cor_reset_conn(src_in);

	return resetted;
}
/* Walk nb->rcv_conn_list (rotating each inspected conn to the tail) and
 * reset conns that have been inactive too long.  Stops at the first conn
 * that is not timed out, or after 10000 iterations as a safety bound. */
static void cor_reset_timeouted_conns(struct cor_neighbor *nb)
{
	int i;
	for (i=0;i<10000;i++) {
		unsigned long iflags;
		struct list_head *lh;
		struct cor_conn *src_in;

		int resetted = 1;

		spin_lock_irqsave(&(nb->conn_list_lock), iflags);

		if (list_empty(&(nb->rcv_conn_list))) {
			spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
			break;
		}

		/* rotate the head to the tail so each pass sees the next
		 * conn */
		lh = nb->rcv_conn_list.next;
		list_del(lh);
		list_add_tail(lh, &(nb->rcv_conn_list));

		src_in = container_of(lh, struct cor_conn, source.in.nb_list);
		kref_get(&(src_in->ref));

		spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);

		/* cheap pre-check under the single rcv_lock before taking
		 * both locks in cor_reset_timeouted_conn() */
		spin_lock_bh(&(src_in->rcv_lock));
		BUG_ON(src_in->sourcetype != SOURCE_IN);
		BUG_ON(src_in->source.in.nb != nb);
		resetted = cor_reset_timeouted_conn_needed(nb, src_in);
		spin_unlock_bh(&(src_in->rcv_lock));
		if (likely(resetted == 0))
			goto put;

		resetted = cor_reset_timeouted_conn(nb, src_in);

put:
		kref_put(&(src_in->ref), cor_free_conn);

		if (likely(resetted == 0))
			break;
	}
}
/**
 * Send all due control messages to nb as one or more kernel packets.
 *
 * may not be called by more than one thread at the same time, because
 * 1) readding cor_control_msg_out may reorder them
 * 2) multiple pings may be sent
 *
 * Returns a QOS_RESUME_* code; QOS_RESUME_CONG means the caller should
 * retry later.
 */
int cor_send_messages(struct cor_neighbor *nb, ktime_t cmsg_send_start,
		int *sent)
{
	int rc = QOS_RESUME_DONE;
	int ping;
	int initsession;
	__u32 targetmss = cor_mss_cmsg(nb);

	int nbstate = cor_get_neigh_state(nb);

	if (likely(nbstate == NEIGHBOR_STATE_ACTIVE))
		cor_reset_timeouted_conns(nb);

	if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
		/* dead neighbor: drop all pending retransmits and queued
		 * messages instead of sending */
		spin_lock_bh(&(nb->retrans_lock));
		cor_empty_retrans_queue(nb);
		spin_unlock_bh(&(nb->retrans_lock));

		cor_delete_all_cmsgs(nb);
		return QOS_RESUME_DONE;
	}

	ping = cor_time_to_send_ping(nb);

	spin_lock_bh(&(nb->cmsg_lock));

	if (nb->add_retrans_needed != 0) {
		/* drop the lock: cor_add_timeouted_retrans() takes
		 * retrans_lock */
		nb->add_retrans_needed = 0;
		spin_unlock_bh(&(nb->cmsg_lock));
		cor_add_timeouted_retrans(nb);
		spin_lock_bh(&(nb->cmsg_lock));
	}

	initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) != 0);

	while (1) {
		struct list_head cmsgs;
		__u32 length = 0;
		__u64 seqno;

		INIT_LIST_HEAD(&cmsgs);

		if (cor_dequeue_messages(nb, ping, initsession, nbstate,
				targetmss, &length, &cmsgs) != 0) {
			/* nothing (more) to send now */
			cor_schedule_controlmsg_timer(nb);
			spin_unlock_bh(&(nb->cmsg_lock));
			return QOS_RESUME_DONE;
		}

		nb->kpacket_seqno++;
		seqno = nb->kpacket_seqno;

		spin_unlock_bh(&(nb->cmsg_lock));

		rc = _cor_send_messages_send(nb, ping, initsession, &cmsgs,
				nbstate, length, seqno, cmsg_send_start, sent);

		if (rc != QOS_RESUME_DONE)
			return rc;

		/* ping/init_session only go into the first packet */
		ping = 0;
		initsession = 0;

		spin_lock_bh(&(nb->cmsg_lock));
	}
}
1735 static ktime_t cor_calc_cmsg_send_start(unsigned long cmsg_timer_timeout)
1737 ktime_t now = ktime_get();
1738 unsigned long jiffies_tmp = jiffies;
1740 unsigned long jiffies_delayed;
1741 if (unlikely(time_before(cmsg_timer_timeout, jiffies_tmp))) {
1742 jiffies_delayed = 0;
1743 } else {
1744 jiffies_delayed = jiffies_tmp - cmsg_timer_timeout;
1745 if (unlikely(jiffies_delayed > HZ/10)) {
1746 jiffies_delayed = HZ/10;
1750 return ns_to_ktime(ktime_to_ns(now) -
1751 1000LL * jiffies_to_usecs(jiffies_delayed));
/* cmsg timer callback: hand the neighbor's kpacket work to the qos queue
 * with a send start time compensated for timer latency, then drop the
 * reference the timer arming took. */
void cor_controlmsg_timerfunc(struct timer_list *cmsg_timer)
{
	struct cor_neighbor *nb = container_of(cmsg_timer,
			struct cor_neighbor, cmsg_timer);
	unsigned long cmsg_timer_timeout = (unsigned long)
			atomic64_read(&(nb->cmsg_timer_timeout));
	ktime_t cmsg_send_start = cor_calc_cmsg_send_start(cmsg_timer_timeout);
	cor_qos_enqueue(nb->queue, &(nb->rb_kp), cmsg_send_start,
			QOS_CALLER_KPACKET);
	kref_put(&(nb->ref), cor_neighbor_free);
}
1767 static int cor_cmsg_full_packet(struct cor_neighbor *nb, int nbstate)
1769 __u32 extralength = 0;
1770 int ping = cor_time_to_send_ping(nb);
1771 int initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) !=
1773 __u32 len = cor_get_total_messages_length(nb, ping, initsession,
1774 nbstate, &extralength);
1776 if (len == 0)
1777 return 0;
1778 if (len < cor_mss_cmsg(nb))
1779 return 0;
1781 return 1;
/* Schedule the next cmsg transmission for nb_cmsglocked (cmsg_lock
 * held): enqueue for immediate sending if the neighbor is killed, a full
 * packet is ready, or retransmits are pending; otherwise arm the cmsg
 * timer for the computed deadline.  No-op while bulk readds are in
 * progress. */
void cor_schedule_controlmsg_timer(struct cor_neighbor *nb_cmsglocked)
{
	unsigned long timeout;
	int nbstate = cor_get_neigh_state(nb_cmsglocked);

	if (unlikely(nbstate == NEIGHBOR_STATE_KILLED))
		goto now;

	if (atomic_read(&(nb_cmsglocked->cmsg_bulk_readds)) != 0)
		return;

	if (cor_cmsg_full_packet(nb_cmsglocked, nbstate))
		goto now;

	if (nb_cmsglocked->add_retrans_needed != 0)
		goto now;

	timeout = cor_get_cmsg_timer_timeout(nb_cmsglocked, nbstate);

	/* the if (0) keeps the "now" label out of normal fallthrough: it
	 * is only reachable via the gotos above */
	if (0) {
now:
		cor_qos_enqueue(nb_cmsglocked->queue, &(nb_cmsglocked->rb_kp),
				ktime_get(), QOS_CALLER_KPACKET);
	} else if (time_before_eq(timeout, jiffies)) {
		ktime_t cmsg_send_start = cor_calc_cmsg_send_start(timeout);
		cor_qos_enqueue(nb_cmsglocked->queue, &(nb_cmsglocked->rb_kp),
				cmsg_send_start, QOS_CALLER_KPACKET);
	} else {
		/* publish the timeout before arming the timer; the timer
		 * holds a neighbor ref while pending */
		atomic64_set(&(nb_cmsglocked->cmsg_timer_timeout), timeout);
		barrier();
		if (mod_timer(&(nb_cmsglocked->cmsg_timer), timeout) == 0) {
			kref_get(&(nb_cmsglocked->ref));
		}
	}
}
/* Insert ins into nb->pending_conn_resets_rb, keyed by conn_id.  Returns
 * 1 without inserting if a reset for the same conn_id is already
 * pending; returns 0 after a successful insert (which takes a reference
 * on ins for the rb tree).  Called under cmsg_lock. */
static int cor_insert_pending_conn_resets(struct cor_control_msg_out *ins)
{
	struct cor_neighbor *nb = ins->nb;
	__u32 conn_id = ins->msg.reset_conn.conn_id;

	struct rb_root *root;
	struct rb_node **p;
	struct rb_node *parent = 0;

	BUG_ON(nb == 0);
	BUG_ON(ins->msg.reset_conn.in_pending_conn_resets != 0);

	root = &(nb->pending_conn_resets_rb);
	p = &(root->rb_node);

	while ((*p) != 0) {
		struct cor_control_msg_out *cm = container_of(*p,
				struct cor_control_msg_out,
				msg.reset_conn.rbn);
		__u32 cm_connid = cm->msg.reset_conn.conn_id;

		BUG_ON(cm->nb != ins->nb);
		BUG_ON(cm->type != MSGTYPE_RESET_CONN);

		parent = *p;
		if (conn_id == cm_connid) {
			/* duplicate reset for this conn_id */
			return 1;
		} else if (conn_id < cm_connid) {
			p = &(*p)->rb_left;
		} else if (conn_id > cm_connid) {
			p = &(*p)->rb_right;
		} else {
			BUG();
		}
	}

	kref_get(&(ins->ref));
	rb_link_node(&(ins->msg.reset_conn.rbn), parent, p);
	rb_insert_color(&(ins->msg.reset_conn.rbn), root);
	ins->msg.reset_conn.in_pending_conn_resets = 1;

	return 0;
}
1864 static void cor_free_oldest_pong(struct cor_neighbor *nb)
1866 struct cor_control_msg_out *cm = container_of(nb->cmsg_queue_pong.next,
1867 struct cor_control_msg_out, lh);
1869 BUG_ON(list_empty(&(nb->cmsg_queue_pong)));
1870 BUG_ON(unlikely(cm->type != MSGTYPE_PONG));
1872 list_del(&(cm->lh));
1873 nb->cmsg_pongslength -= cm->length;
1874 BUG_ON(nb->cmsg_pongscnt == 0);
1875 cm->nb->cmsg_pongscnt--;
1876 cor_free_control_msg(cm);
/* Put cm on the queue matching its type.  src determines ordering:
 * ADDCMSG_SRC_NEW appends, all other sources (retrans/readd/split)
 * prepend so the message is sent first.  Returns 1 if cm was freed
 * instead of queued (pong limit exceeded, or duplicate reset_conn);
 * 0 otherwise.  Called under cmsg_lock. */
static int _cor_enqueue_control_msg(struct cor_control_msg_out *cm, int src)
{
	if (unlikely(cm->type == MSGTYPE_PONG)) {
		BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA);

		if (cm->nb->cmsg_pongscnt >= MAX_PONG_CMSGS_PER_NEIGH) {
			if (src != ADDCMSG_SRC_NEW) {
				/* requeued pong over the limit: drop it */
				BUG_ON(cm->nb->cmsg_pongscnt == 0);
				cm->nb->cmsg_pongscnt--;
				cor_free_control_msg(cm);
				return 1;
			} else {
				/* new pong: make room by dropping the
				 * oldest one instead */
				cor_free_oldest_pong(cm->nb);
			}
		}

		cm->nb->cmsg_pongscnt++;
		cm->nb->cmsg_pongslength += cm->length;

		if (src != ADDCMSG_SRC_NEW) {
			list_add(&(cm->lh), &(cm->nb->cmsg_queue_pong));
		} else {
			list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_pong));
		}

		return 0;
	} else if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
		if (cor_insert_pending_conn_resets(cm) != 0) {
			/* duplicate reset; type 0 presumably skips
			 * per-type cleanup in cor_free_control_msg --
			 * TODO confirm */
			cm->type = 0;
			cor_free_control_msg(cm);
			return 1;
		}
	}

	cm->nb->cmsg_otherlength += cm->length;
	if (src == ADDCMSG_SRC_NEW) {
		if (cm->type == MSGTYPE_ACK) {
			list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_ack));
		} else if (cm->type == MSGTYPE_ACK_CONN) {
			list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_ackconn));
		} else if (cm->type == MSGTYPE_CONNDATA &&
				cm->msg.conn_data.highlatency != 0) {
			list_add_tail(&(cm->lh),
					&(cm->nb->cmsg_queue_conndata_highlat));
		} else if (cm->type == MSGTYPE_CONNDATA) {
			list_add_tail(&(cm->lh),
					&(cm->nb->cmsg_queue_conndata_lowlat));
		} else {
			list_add_tail(&(cm->lh), &(cm->nb->cmsg_queue_other));
		}
	} else {
		BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA &&
				cm->type != MSGTYPE_CONNDATA);
		BUG_ON(src == ADDCMSG_SRC_READD &&
				cm->type == MSGTYPE_ACK_CONN);

		if (cm->type == MSGTYPE_ACK) {
			list_add(&(cm->lh), &(cm->nb->cmsg_queue_ack));
		} else if (cm->type == MSGTYPE_ACK_CONN) {
			list_add(&(cm->lh), &(cm->nb->cmsg_queue_ackconn));
		} else if (cm->type == MSGTYPE_CONNDATA &&
				cm->msg.conn_data.highlatency != 0) {
			list_add(&(cm->lh),
					&(cm->nb->cmsg_queue_conndata_highlat));
		} else if (cm->type == MSGTYPE_CONNDATA) {
			list_add(&(cm->lh),
					&(cm->nb->cmsg_queue_conndata_lowlat));
		} else {
			list_add(&(cm->lh), &(cm->nb->cmsg_queue_other));
		}
	}

	return 0;
}
1954 static void cor_enqueue_control_msg(struct cor_control_msg_out *cm, int src)
1956 BUG_ON(cm == 0);
1957 BUG_ON(cm->nb == 0);
1959 if (src == ADDCMSG_SRC_NEW)
1960 cm->time_added = jiffies;
1962 spin_lock_bh(&(cm->nb->cmsg_lock));
1964 if (_cor_enqueue_control_msg(cm, src) != 0)
1965 goto out;
1967 if (src != ADDCMSG_SRC_READD && src != ADDCMSG_SRC_RETRANS)
1968 cor_schedule_controlmsg_timer(cm->nb);
1970 out:
1971 spin_unlock_bh(&(cm->nb->cmsg_lock));
1975 void cor_send_pong(struct cor_neighbor *nb, __u32 cookie, ktime_t ping_rcvtime)
1977 struct cor_control_msg_out *cm = _cor_alloc_control_msg(nb);
1979 if (unlikely(cm == 0))
1980 return;
1982 cm->nb = nb;
1983 cm->type = MSGTYPE_PONG;
1984 cm->msg.pong.cookie = cookie;
1985 cm->msg.pong.type = MSGTYPE_PONG_TIMEENQUEUED;
1986 cm->msg.pong.ping_rcvtime = ping_rcvtime;
1987 cm->msg.pong.time_enqueued = ktime_get();
1988 cm->length = 13;
1989 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
1992 void cor_send_ack(struct cor_neighbor *nb, __u64 seqno)
1994 struct cor_control_msg_out *cm = cor_alloc_control_msg(nb,
1995 ACM_PRIORITY_HIGH);
1997 if (unlikely(cm == 0))
1998 return;
2000 cm->nb = nb;
2001 cm->type = MSGTYPE_ACK;
2002 cm->msg.ack.seqno = seqno;
2003 cm->length = 7;
2004 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2007 static void cor_set_ooolen_flags(struct cor_control_msg_out *cm)
2009 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
2010 (~KP_ACK_CONN_FLAGS_OOO));
2011 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
2012 cor_ooolen_to_flags(cm->msg.ack_conn.length));
/* cmsg_lock must be held */
/* Unlink a scheduled ack_conn message from the cmsg queue and its conn's
 * acks_pending list, drop the conn reference it held, and free it. */
static void cor_remove_pending_ackconn(struct cor_control_msg_out *cm)
{
	cm->nb->cmsg_otherlength -= cm->length;
	list_del(&(cm->lh));

	list_del(&(cm->msg.ack_conn.conn_acks));
	kref_put(&(cm->msg.ack_conn.src_in->ref), cor_free_conn);
	cm->msg.ack_conn.src_in = 0;

	/* type 0 presumably prevents cor_free_control_msg() from redoing
	 * per-type cleanup -- TODO confirm */
	cm->type = 0;
	cor_free_control_msg(cm);
}
2029 /* cmsg_lock must be held */
2030 static void cor_recalc_scheduled_ackconn_size(struct cor_control_msg_out *cm)
2032 cm->nb->cmsg_otherlength -= cm->length;
2033 cm->length = 6 + cor_ack_conn_len(cm->msg.ack_conn.flags);
2034 cm->nb->cmsg_otherlength += cm->length;
/* cmsg_lock must be held */
/* Try to fold the information of fromcm into tocm (both ack_conn
 * messages for the same conn).  Returns 1 if the out-of-order ranges
 * cannot be combined; on success (0) fromcm is removed unless it is the
 * freshly built message (from_newack). */
static int _cor_try_merge_ackconn(struct cor_conn *src_in_l,
		struct cor_control_msg_out *fromcm,
		struct cor_control_msg_out *tocm, int from_newack)
{
	if (cor_ooolen(fromcm->msg.ack_conn.flags) != 0 &&
			cor_ooolen(tocm->msg.ack_conn.flags) != 0) {
		__u64 tocmseqno = tocm->msg.ack_conn.seqno_ooo;
		__u64 tocmlength = tocm->msg.ack_conn.length;
		__u64 fromcmseqno = fromcm->msg.ack_conn.seqno_ooo;
		__u64 fromcmlength = fromcm->msg.ack_conn.length;

		if (cor_seqno_eq(tocmseqno, fromcmseqno)) {
			/* same start: keep the longer range */
			if (fromcmlength > tocmlength)
				tocm->msg.ack_conn.length = fromcmlength;
		} else if (cor_seqno_after(fromcmseqno, tocmseqno) &&
				cor_seqno_before_eq(fromcmseqno, tocmseqno +
				tocmlength)) {
			/* from range starts inside to range: extend end */
			__u64 len = cor_seqno_clean(fromcmseqno + fromcmlength -
					tocmseqno);
			BUG_ON(len > U32_MAX);
			tocm->msg.ack_conn.length = (__u32) len;
		} else if (cor_seqno_before(fromcmseqno, tocmseqno) &&
				cor_seqno_after_eq(fromcmseqno, tocmseqno)) {
			/* NOTE(review): this condition looks unsatisfiable
			 * (fromcmseqno cannot be both before and after_eq
			 * tocmseqno); the second comparison was possibly
			 * meant to use fromcmseqno + fromcmlength --
			 * confirm against upstream before changing */
			__u64 len = cor_seqno_clean(tocmseqno + tocmlength -
					fromcmseqno);
			BUG_ON(len > U32_MAX);
			tocm->msg.ack_conn.seqno_ooo = fromcmseqno;
			tocm->msg.ack_conn.length = (__u32) len;
		} else {
			/* disjoint ranges: cannot merge */
			return 1;
		}
		cor_set_ooolen_flags(tocm);
	}

	if ((fromcm->msg.ack_conn.flags &
			KP_ACK_CONN_FLAGS_SEQNO) != 0) {
		if ((tocm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) == 0)
			goto setseqno;

		BUG_ON(cor_seqno_eq(fromcm->msg.ack_conn.ack_seqno,
				tocm->msg.ack_conn.ack_seqno));
		if (cor_seqno_after_eq(tocm->msg.ack_conn.ack_seqno,
				fromcm->msg.ack_conn.ack_seqno)) {
			/* tocm's ack is newer: keep its seqno */
			BUG_ON(cor_seqno_after(fromcm->msg.ack_conn.seqno,
					tocm->msg.ack_conn.seqno));
			goto skipseqno;
		}

		BUG_ON(cor_seqno_before(fromcm->msg.ack_conn.seqno,
				tocm->msg.ack_conn.seqno));

setseqno:
		tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_SEQNO);
		tocm->msg.ack_conn.seqno = fromcm->msg.ack_conn.seqno;
		tocm->msg.ack_conn.ack_seqno = fromcm->msg.ack_conn.ack_seqno;

skipseqno:
		if ((fromcm->msg.ack_conn.flags &
				KP_ACK_CONN_FLAGS_WINDOW) != 0)
			tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
					KP_ACK_CONN_FLAGS_WINDOW);
	}

	if (cor_ooolen(fromcm->msg.ack_conn.flags) != 0) {
		tocm->msg.ack_conn.seqno_ooo = fromcm->msg.ack_conn.seqno_ooo;
		tocm->msg.ack_conn.length = fromcm->msg.ack_conn.length;
		cor_set_ooolen_flags(tocm);
	}

	if ((fromcm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
		BUG_ON((tocm->msg.ack_conn.flags &
				KP_ACK_CONN_FLAGS_PRIORITY) != 0);
		tocm->msg.ack_conn.priority_seqno =
				fromcm->msg.ack_conn.priority_seqno;
		tocm->msg.ack_conn.priority = fromcm->msg.ack_conn.priority;
	}

	cor_recalc_scheduled_ackconn_size(tocm);
	if (from_newack == 0)
		cor_remove_pending_ackconn(fromcm);

	return 0;
}
/* cmsg_lock must be held */
/* Fold every other pending ack_conn of this conn into cm.  The iterator
 * advances before _cor_try_merge_ackconn() may free currcm. */
static void cor_try_merge_ackconns(struct cor_conn *src_in_l,
		struct cor_control_msg_out *cm)
{
	struct list_head *currlh = cm->msg.ack_conn.conn_acks.next;

	while (currlh != &(src_in_l->source.in.acks_pending)) {
		struct cor_control_msg_out *currcm = container_of(currlh,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);
		currlh = currlh->next;
		cor_remove_connack_oooflag_ifold(src_in_l, currcm);
		_cor_try_merge_ackconn(src_in_l, currcm, cm, 0);
	}
}
/* Try to merge the new ack_conn cm into an already pending one for the
 * same conn; if that succeeds cm is freed, otherwise cm is linked into
 * the conn's acks_pending list and enqueued. */
static void cor_merge_or_enqueue_ackconn(struct cor_conn *src_in_l,
		struct cor_control_msg_out *cm, int src)
{
	struct list_head *currlh;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	spin_lock_bh(&(cm->nb->cmsg_lock));

	currlh = src_in_l->source.in.acks_pending.next;
	while (currlh != &(src_in_l->source.in.acks_pending)) {
		struct cor_control_msg_out *currcm = container_of(currlh,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);

		BUG_ON(currcm->nb != cm->nb);
		BUG_ON(currcm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
		BUG_ON(currcm->msg.ack_conn.conn_id !=
				cm->msg.ack_conn.conn_id);

		if (_cor_try_merge_ackconn(src_in_l, cm, currcm, 1) == 0) {
			cor_try_merge_ackconns(src_in_l, currcm);
			cor_schedule_controlmsg_timer(currcm->nb);
			spin_unlock_bh(&(currcm->nb->cmsg_lock));

			/*
			 * flags:
			 * when calling cor_free_control_msg here conn may
			 * already be locked and priority_send_allowed and
			 * priority_send_allowed should not be reset
			 */
			cm->msg.ack_conn.flags = 0;
			cor_free_control_msg(cm);
			return;
		}

		currlh = currlh->next;
	}

	list_add_tail(&(cm->msg.ack_conn.conn_acks),
			&(src_in_l->source.in.acks_pending));

	spin_unlock_bh(&(cm->nb->cmsg_lock));

	cor_enqueue_control_msg(cm, src);
}
/* If an ack_conn for this conn is already pending, refresh its in-order
 * seqno/window information in place instead of allocating a new message.
 * Returns 0 on success, 1 if no pending ack_conn exists. */
static int cor_try_update_ackconn_seqno(struct cor_conn *src_in_l)
{
	int rc = 1;

	spin_lock_bh(&(src_in_l->source.in.nb->cmsg_lock));

	if (list_empty(&(src_in_l->source.in.acks_pending)) == 0) {
		struct cor_control_msg_out *cm = container_of(
				src_in_l->source.in.acks_pending.next,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);
		BUG_ON(cm->nb != src_in_l->source.in.nb);
		BUG_ON(cm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
		BUG_ON(cm->msg.ack_conn.conn_id !=
				src_in_l->reversedir->target.out.conn_id);

		cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_SEQNO |
				KP_ACK_CONN_FLAGS_WINDOW);
		cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;

		src_in_l->source.in.ack_seqno++;
		cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;

		cor_remove_connack_oooflag_ifold(src_in_l, cm);
		cor_recalc_scheduled_ackconn_size(cm);

		cor_try_merge_ackconns(src_in_l, cm);

		rc = 0;
	}

	spin_unlock_bh(&(src_in_l->source.in.nb->cmsg_lock));

	return rc;
}
/* Send a conn-level ack if needed: always for out-of-order data
 * (seqno_ooo/ooo_length), otherwise only when an in-order ack is flagged
 * or the advertised receive window has shrunk/grown enough to be worth
 * reporting.  Updates the remembered remote window limit on exit. */
void cor_send_ack_conn_ifneeded(struct cor_conn *src_in_l, __u64 seqno_ooo,
		__u32 ooo_length)
{
	struct cor_control_msg_out *cm;

	BUG_ON(src_in_l->sourcetype != SOURCE_IN);

	BUG_ON(ooo_length > 0 && cor_seqno_before_eq(seqno_ooo,
			src_in_l->source.in.next_seqno));

	cor_update_windowlimit(src_in_l);

	if (ooo_length != 0) {
		cm = cor_alloc_control_msg(src_in_l->source.in.nb,
				ACM_PRIORITY_LOW);
		if (cm != 0)
			goto add;
	}

	if (src_in_l->source.in.inorder_ack_needed != 0)
		goto ack_needed;

	/* window too small to encode: skip */
	if (cor_seqno_clean(src_in_l->source.in.window_seqnolimit -
			src_in_l->source.in.next_seqno) < WINDOW_ENCODE_MIN)
		return;

	/* skip if the remotely known window is still at least 7/8 of the
	 * current one */
	if (cor_seqno_clean(src_in_l->source.in.window_seqnolimit_remote -
			src_in_l->source.in.next_seqno) >= WINDOW_ENCODE_MIN &&
			cor_seqno_clean(src_in_l->source.in.window_seqnolimit -
			src_in_l->source.in.next_seqno) * 7 <
			cor_seqno_clean(
			src_in_l->source.in.window_seqnolimit_remote -
			src_in_l->source.in.next_seqno) * 8)
		return;

ack_needed:
	if (cor_try_update_ackconn_seqno(src_in_l) == 0)
		goto out;

	cm = cor_alloc_control_msg(src_in_l->source.in.nb, ACM_PRIORITY_MED);
	if (cm == 0) {
		printk(KERN_ERR "error allocating inorder ack");
		return;
	}

add:
	cm->type = MSGTYPE_ACK_CONN;
	src_in_l->source.in.ack_seqno++;
	cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;
	kref_get(&(src_in_l->ref));
	cm->msg.ack_conn.src_in = src_in_l;
	cm->msg.ack_conn.conn_id = src_in_l->reversedir->target.out.conn_id;
	cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;
	cm->msg.ack_conn.seqno_ooo = seqno_ooo;
	cm->msg.ack_conn.length = ooo_length;
	cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_SEQNO |
			KP_ACK_CONN_FLAGS_WINDOW;
	cor_set_ooolen_flags(cm);
	cm->length = 6 + cor_ack_conn_len(cm->msg.ack_conn.flags);

	cor_merge_or_enqueue_ackconn(src_in_l, cm, ADDCMSG_SRC_NEW);

out:
	src_in_l->source.in.inorder_ack_needed = 0;
	src_in_l->source.in.window_seqnolimit_remote =
			src_in_l->source.in.window_seqnolimit;
}
/* If an ack_conn for this conn is already pending, piggyback the
 * priority update onto it.  Returns 0 on success, 1 if no pending
 * ack_conn exists. */
static int cor_try_add_priority(struct cor_conn *trgt_out_l, __u8 priority)
{
	int rc = 1;
	struct cor_conn *src_in = trgt_out_l->reversedir;

	spin_lock_bh(&(trgt_out_l->target.out.nb->cmsg_lock));

	if (list_empty(&(src_in->source.in.acks_pending)) == 0) {
		struct cor_control_msg_out *cm = container_of(
				src_in->source.in.acks_pending.next,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);
		BUG_ON(cm->nb != trgt_out_l->target.out.nb);
		BUG_ON(cm->type != MSGTYPE_ACK_CONN);
		BUG_ON(cm->msg.ack_conn.src_in != trgt_out_l->reversedir);
		BUG_ON(cm->msg.ack_conn.conn_id !=
				trgt_out_l->target.out.conn_id);

		/* only one priority update may be pending at a time */
		BUG_ON((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) !=
				0);
		cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
				KP_ACK_CONN_FLAGS_PRIORITY);
		cm->msg.ack_conn.priority_seqno =
				trgt_out_l->target.out.priority_seqno;
		cm->msg.ack_conn.priority = priority;
		cor_recalc_scheduled_ackconn_size(cm);

		rc = 0;
	}

	spin_unlock_bh(&(trgt_out_l->target.out.nb->cmsg_lock));

	return rc;
}
/* Send the conn's priority to the neighbor: piggyback on a pending
 * ack_conn if possible, otherwise (only if force) allocate a dedicated
 * ack_conn carrying just the priority.  Note: if neither path sends, the
 * priority_* bookkeeping at "out" is not updated. */
void cor_send_priority(struct cor_conn *trgt_out_ll, int force, __u8 priority)
{
	struct cor_control_msg_out *cm;

	if (cor_try_add_priority(trgt_out_ll, priority) == 0)
		goto out;

	if (force == 0)
		return;

	cm = cor_alloc_control_msg(trgt_out_ll->target.out.nb,
			ACM_PRIORITY_LOW);

	if (cm == 0)
		return;

	cm->type = MSGTYPE_ACK_CONN;
	cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_PRIORITY;
	kref_get(&(trgt_out_ll->reversedir->ref));
	BUG_ON(trgt_out_ll->targettype != TARGET_OUT);
	cm->msg.ack_conn.src_in = trgt_out_ll->reversedir;
	cm->msg.ack_conn.conn_id = trgt_out_ll->target.out.conn_id;
	cm->msg.ack_conn.priority_seqno =
			trgt_out_ll->target.out.priority_seqno;
	cm->msg.ack_conn.priority = priority;

	cm->length = 6 + cor_ack_conn_len(cm->msg.ack_conn.flags);
	cor_merge_or_enqueue_ackconn(trgt_out_ll->reversedir, cm,
			ADDCMSG_SRC_NEW);

out:
	trgt_out_ll->target.out.priority_last = priority;
	trgt_out_ll->target.out.priority_seqno++;
	trgt_out_ll->target.out.priority_send_allowed = 0;
}
/* Remove and free all pending ack_conn messages of this conn, then
 * reschedule the cmsg timer if anything was removed. */
void cor_free_ack_conns(struct cor_conn *src_in_lx)
{
	int changed = 0;
	spin_lock_bh(&(src_in_lx->source.in.nb->cmsg_lock));
	while (list_empty(&(src_in_lx->source.in.acks_pending)) == 0) {
		struct list_head *currlh =
				src_in_lx->source.in.acks_pending.next;
		struct cor_control_msg_out *currcm = container_of(currlh,
				struct cor_control_msg_out,
				msg.ack_conn.conn_acks);

		cor_remove_pending_ackconn(currcm);
		changed = 1;
	}
	if (changed)
		cor_schedule_controlmsg_timer(src_in_lx->source.in.nb);
	spin_unlock_bh(&(src_in_lx->source.in.nb->cmsg_lock));
}
2383 void cor_send_connect_success(struct cor_control_msg_out *cm, __u32 conn_id,
2384 struct cor_conn *src_in)
2386 cm->type = MSGTYPE_CONNECT_SUCCESS;
2387 cm->msg.connect_success.conn_id = conn_id;
2388 kref_get(&(src_in->ref));
2389 cm->msg.connect_success.src_in = src_in;
2390 cm->length = 6;
2391 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2394 void cor_send_connect_nb(struct cor_control_msg_out *cm, __u32 conn_id,
2395 __u64 seqno1, __u64 seqno2, struct cor_conn *src_in_ll)
2397 cm->type = MSGTYPE_CONNECT;
2398 cm->msg.connect.conn_id = conn_id;
2399 cm->msg.connect.seqno1 = seqno1;
2400 cm->msg.connect.seqno2 = seqno2;
2401 kref_get(&(src_in_ll->ref));
2402 BUG_ON(src_in_ll->sourcetype != SOURCE_IN);
2403 cm->msg.connect.src_in = src_in_ll;
2404 cm->length = 21;
2405 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2408 void cor_send_conndata(struct cor_control_msg_out *cm, __u32 conn_id,
2409 __u64 seqno, char *data_orig, char *data, __u32 datalen,
2410 __u8 snd_delayed_lowbuf, __u8 flush, __u8 highlatency,
2411 struct cor_conn_retrans *cr)
2413 cm->type = MSGTYPE_CONNDATA;
2414 cm->msg.conn_data.conn_id = conn_id;
2415 cm->msg.conn_data.seqno = seqno;
2416 cm->msg.conn_data.data_orig = data_orig;
2417 cm->msg.conn_data.data = data;
2418 cm->msg.conn_data.datalen = datalen;
2419 cm->msg.conn_data.snd_delayed_lowbuf = snd_delayed_lowbuf;
2420 cm->msg.conn_data.flush = flush;
2421 cm->msg.conn_data.highlatency = highlatency;
2422 cm->msg.conn_data.cr = cr;
2423 cm->length = KP_CONN_DATA_CMDLEN + datalen;
2424 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2427 int cor_send_reset_conn(struct cor_neighbor *nb, __u32 conn_id, int lowprio)
2429 struct cor_control_msg_out *cm;
2431 if (unlikely(cor_get_neigh_state(nb) == NEIGHBOR_STATE_KILLED))
2432 return 0;
2434 cm = cor_alloc_control_msg(nb, lowprio ?
2435 ACM_PRIORITY_LOW : ACM_PRIORITY_MED);
2437 if (unlikely(cm == 0))
2438 return 1;
2440 cm->type = MSGTYPE_RESET_CONN;
2441 cm->msg.reset_conn.conn_id = conn_id;
2442 cm->length = 5;
2444 cor_enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2446 return 0;
2449 int __init cor_kgen_init(void)
2451 cor_controlmsg_slab = kmem_cache_create("cor_controlmsg",
2452 sizeof(struct cor_control_msg_out), 8, 0, 0);
2453 if (unlikely(cor_controlmsg_slab == 0))
2454 return -ENOMEM;
2456 cor_controlretrans_slab = kmem_cache_create("cor_controlretransmsg",
2457 sizeof(struct cor_control_retrans), 8, 0, 0);
2458 if (unlikely(cor_controlretrans_slab == 0))
2459 return -ENOMEM;
2461 return 0;
2464 void __exit cor_kgen_exit2(void)
2466 kmem_cache_destroy(cor_controlretrans_slab);
2467 cor_controlretrans_slab = 0;
2469 kmem_cache_destroy(cor_controlmsg_slab);
2470 cor_controlmsg_slab = 0;
2473 MODULE_LICENSE("GPL");