1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2019 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <asm/byteorder.h>
23 #include "cor.h"
25 /* not sent over the network - internal meaning only */
26 #define MSGTYPE_PONG 1
27 #define MSGTYPE_ACK 2
28 #define MSGTYPE_ACK_CONN 3
29 #define MSGTYPE_CONNECT 4
30 #define MSGTYPE_CONNECT_SUCCESS 5
31 #define MSGTYPE_RESET_CONN 6
32 #define MSGTYPE_CONNDATA 7
33 #define MSGTYPE_SET_MAX_CMSG_DELAY 8
35 #define MSGTYPE_PONG_TIMEENQUEUED 1
36 #define MSGTYPE_PONG_RESPDELAY 2
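/*
 * One control_msg_out is a single outgoing control command queued for a
 * neighbor. The lh union holds either the list node (while the message sits
 * in a cmsg queue or in a control_retrans) or, for conn_data messages, the
 * heap node of the timeout-ordered conndata heap. The timing union stores an
 * absolute send deadline for conn_data and the enqueue time for every other
 * type.
 */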
38 struct control_msg_out{
39 __u8 type;
40 __u32 length;
41 struct kref ref;
42 struct neighbor *nb;
44 union {
45 /* either queue or control_retrans_packet */
46 struct list_head list;
48 /* conn_data */
49 struct heap_element hpel;
50 }lh;
52 union{
53 unsigned long timeout; /* MSGTYPE_CONNDATA */
54 unsigned long time_added; /* everything else */
55 }timing;
57 union{
58 struct{
59 __u32 cookie;
60 __u8 type;
62 ktime_t time_enqueued;
63 }pong;
65 struct{
66 __u64 seqno;
67 }ack;
69 struct{
70 struct conn *src_in;
71 struct list_head conn_acks;
72 __u32 conn_id;
73 __u64 seqno;
74 __u64 seqno_ooo;
75 __u32 length;
77 __u8 priority_seqno;
78 __u8 priority;
80 __u8 flags;
82 __u32 ack_seqno;
83 }ack_conn;
85 struct{
86 __u32 conn_id;
87 __u64 seqno1;
88 __u64 seqno2;
89 struct conn *src_in;
90 }connect;
92 struct{
93 __u32 conn_id;
94 struct conn *src_in;
95 }connect_success;
97 struct{
98 struct rb_node rbn;
99 __u8 in_pending_conn_resets;
100 __u32 conn_id;
101 }reset_conn;
103 struct{
104 __u32 conn_id;
105 __u64 seqno;
106 __u32 datalen;
107 __u32 max_delay_hz;
108 __u8 snd_delayed_lowbuf;
109 char *data_orig;
110 char *data;
111 struct conn_retrans *cr;
112 }conn_data;
114 struct{
115 __u32 ack_delay;
116 __u32 ackconn_delay;
117 __u32 other_delay;
118 }set_max_cmsg_delay;
119 }msg;
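/*
 * A control_retrans records one kernel packet that was sent but not yet
 * acked: the messages it carried (msgs), its sequence number and a timeout.
 * It is indexed twice per neighbor: by seqno in kp_retransmits_rb and in the
 * timeout-ordered retrans_list (via timeout_list).
 */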
122 struct control_retrans {
123 struct kref ref;
125 struct neighbor *nb;
126 __u64 seqno;
128 unsigned long timeout;
130 struct list_head msgs;
132 struct rb_node rbn;
133 struct list_head timeout_list;
136 struct unknownconnid_matchparam {
137 struct neighbor *nb;
138 __u32 conn_id;
142 static struct kmem_cache *controlmsg_slab;
143 static struct kmem_cache *controlretrans_slab;
145 static atomic_t cmsg_othercnt = ATOMIC_INIT(0);
147 #define ADDCMSG_SRC_NEW 1
148 #define ADDCMSG_SRC_SPLITCONNDATA 2
149 #define ADDCMSG_SRC_READD 3
150 #define ADDCMSG_SRC_RETRANS 4
152 static void enqueue_control_msg(struct control_msg_out *msg, int src);
154 static void try_merge_ackconns(struct conn *src_in_l,
155 struct control_msg_out *cm);
157 static void merge_or_enqueue_ackconn(struct conn *src_in_l,
158 struct control_msg_out *cm, int src);
160 static int conndata_comparetimeout(struct heap_element *el1,
161 struct heap_element *el2)
163 struct control_msg_out *cm1 = container_of(el1, struct control_msg_out,
164 lh.hpel);
165 struct control_msg_out *cm2 = container_of(el2, struct control_msg_out,
166 lh.hpel);
168 BUG_ON(cm1->type != MSGTYPE_CONNDATA);
169 BUG_ON(cm2->type != MSGTYPE_CONNDATA);
171 if (cm1->timing.timeout < cm2->timing.timeout)
172 return -1;
173 else if (cm1->timing.timeout == cm2->timing.timeout)
174 return 0;
175 else
176 return 1;
179 static struct heap_definition conndata_heapdef = {
180 .compare_elements = conndata_comparetimeout
183 static struct control_msg_out *_alloc_control_msg(struct neighbor *nb)
185 struct control_msg_out *cm;
187 BUG_ON(nb == 0);
189 cm = kmem_cache_alloc(controlmsg_slab, GFP_ATOMIC);
190 if (unlikely(cm == 0))
191 return 0;
192 memset(cm, 0, sizeof(struct control_msg_out));
193 kref_init(&(cm->ref));
194 cm->nb = nb;
195 return cm;
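/*
 * calc_limit scales a queue limit by allocation priority, rounding up:
 * for limit == 100, ACM_PRIORITY_LOW yields (100+1)/2 == 50,
 * ACM_PRIORITY_MED (100*3+1)/4 == 75 and ACM_PRIORITY_HIGH the full 100.
 * Higher-priority callers may therefore fill a queue further before
 * allocation starts to fail.
 */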
198 static int calc_limit(int limit, int priority)
200 if (priority == ACM_PRIORITY_LOW)
201 return (limit+1)/2;
202 else if (priority == ACM_PRIORITY_MED)
203 return (limit * 3 + 1)/4;
204 else if (priority == ACM_PRIORITY_HIGH)
205 return limit;
206 else
207 BUG();
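/*
 * Admission control for non-pong control messages: every neighbor may always
 * use its guaranteed share (GUARANTEED_CMSGS_PER_NEIGH); above that,
 * allocation succeeds only while both the per-neighbor and the global
 * message count stay below the priority-scaled maxima. The counters are
 * incremented optimistically and decremented again on failure here, or in
 * free_control_msg once the message is released.
 */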
210 struct control_msg_out *alloc_control_msg(struct neighbor *nb, int priority)
212 struct control_msg_out *cm = 0;
214 long packets1;
215 long packets2;
217 BUG_ON(nb == 0);
219 packets1 = atomic_inc_return(&(nb->cmsg_othercnt));
220 packets2 = atomic_inc_return(&(cmsg_othercnt));
222 BUG_ON(packets1 <= 0);
223 BUG_ON(packets2 <= 0);
225 if (packets1 <= calc_limit(GUARANTEED_CMSGS_PER_NEIGH, priority))
226 goto alloc;
228 if (unlikely(unlikely(packets1 > calc_limit(MAX_CMSGS_PER_NEIGH,
229 priority)) ||
230 unlikely(packets2 > calc_limit(MAX_CMSGS, priority))))
231 goto full;
233 alloc:
234 cm = _alloc_control_msg(nb);
235 if (unlikely(cm == 0)) {
236 full:
238 /* printk(KERN_ERR "alloc_control_msg failed %ld %ld", packets1, packets2); */
239 atomic_dec(&(nb->cmsg_othercnt));
240 atomic_dec(&(cmsg_othercnt));
242 return cm;
245 static void cmsg_kref_free(struct kref *ref)
247 struct control_msg_out *cm = container_of(ref, struct control_msg_out,
248 ref);
249 kmem_cache_free(controlmsg_slab, cm);
252 void free_control_msg(struct control_msg_out *cm)
254 if (likely(cm->type != MSGTYPE_PONG)) {
255 atomic_dec(&(cm->nb->cmsg_othercnt));
256 atomic_dec(&(cmsg_othercnt));
259 if (cm->type == MSGTYPE_ACK_CONN) {
 260 		BUG_ON(cm->msg.ack_conn.src_in == 0);
 261 		struct conn *trgt_out = cm->msg.ack_conn.src_in->reversedir;
262 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0){
263 spin_lock_bh(&(trgt_out->rcv_lock));
264 BUG_ON(trgt_out->targettype != TARGET_OUT);
265 if (trgt_out->target.out.priority_send_allowed != 0) {
266 trgt_out->target.out.priority_send_allowed = 1;
267 spin_unlock_bh(&(trgt_out->rcv_lock));
268 refresh_conn_priority(trgt_out, 0);
269 } else {
270 spin_unlock_bh(&(trgt_out->rcv_lock));
273 kref_put(&(cm->msg.ack_conn.src_in->ref), free_conn);
274 cm->msg.ack_conn.src_in = 0;
275 } else if (cm->type == MSGTYPE_CONNECT) {
276 BUG_ON(cm->msg.connect.src_in == 0);
277 kref_put(&(cm->msg.connect.src_in->ref), free_conn);
278 cm->msg.connect.src_in = 0;
279 } else if (cm->type == MSGTYPE_CONNECT_SUCCESS) {
280 BUG_ON(cm->msg.connect_success.src_in == 0);
281 kref_put(&(cm->msg.connect_success.src_in->ref), free_conn);
282 cm->msg.connect_success.src_in = 0;
283 } else if (cm->type == MSGTYPE_RESET_CONN) {
284 spin_lock_bh(&(cm->nb->cmsg_lock));
285 if (cm->msg.reset_conn.in_pending_conn_resets != 0) {
286 rb_erase(&(cm->msg.reset_conn.rbn),
287 &(cm->nb->pending_conn_resets_rb));
288 cm->msg.reset_conn.in_pending_conn_resets = 0;
290 kref_put(&(cm->ref), kreffree_bug);
292 spin_unlock_bh(&(cm->nb->cmsg_lock));
295 kref_put(&(cm->ref), cmsg_kref_free);
298 static void free_control_retrans(struct kref *ref)
300 struct control_retrans *cr = container_of(ref, struct control_retrans,
301 ref);
303 while (list_empty(&(cr->msgs)) == 0) {
304 struct control_msg_out *cm = container_of(cr->msgs.next,
305 struct control_msg_out, lh.list);
307 #warning todo who increments this?
308 if (cm->type == MSGTYPE_PONG)
309 atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
311 list_del(&(cm->lh.list));
312 free_control_msg(cm);
315 kmem_cache_free(controlretrans_slab, cr);
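/*
 * Lookup of a sent-but-unacked packet by seqno in the per-neighbor rbtree.
 * The caller receives a reference on the returned control_retrans and must
 * drop it with kref_put when done.
 */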
318 struct control_retrans *get_control_retrans(struct neighbor *nb, __u64 seqno)
320 struct rb_node *n = 0;
321 struct control_retrans *ret = 0;
323 spin_lock_bh(&(nb->kp_retransmits_lock));
325 n = nb->kp_retransmits_rb.rb_node;
327 while (likely(n != 0) && ret == 0) {
328 struct control_retrans *cr = container_of(n,
329 struct control_retrans, rbn);
331 BUG_ON(cr->nb != nb);
333 if (seqno_before(seqno, cr->seqno))
334 n = n->rb_left;
335 else if (seqno_after(seqno, cr->seqno))
336 n = n->rb_right;
337 else
338 ret = cr;
341 if (ret != 0)
342 kref_get(&(ret->ref));
344 spin_unlock_bh(&(nb->kp_retransmits_lock));
346 return ret;
349 void insert_control_retrans(struct control_retrans *ins)
351 struct neighbor *nb = ins->nb;
352 __u64 seqno = ins->seqno;
354 struct rb_root *root;
355 struct rb_node **p;
356 struct rb_node *parent = 0;
358 BUG_ON(nb == 0);
360 spin_lock_bh(&(nb->kp_retransmits_lock));
362 root = &(nb->kp_retransmits_rb);
363 p = &(root->rb_node);
365 while ((*p) != 0) {
366 struct control_retrans *cr = container_of(*p,
367 struct control_retrans, rbn);
369 BUG_ON(cr->nb != nb);
371 parent = *p;
372 if (unlikely(seqno_eq(seqno, cr->seqno))) {
373 goto out;
374 } else if (seqno_before(seqno, cr->seqno)) {
375 p = &(*p)->rb_left;
376 } else if (seqno_after(seqno, cr->seqno)) {
377 p = &(*p)->rb_right;
378 } else {
379 BUG();
383 kref_get(&(ins->ref));
384 rb_link_node(&(ins->rbn), parent, p);
385 rb_insert_color(&(ins->rbn), root);
387 out:
388 spin_unlock_bh(&(nb->kp_retransmits_lock));
391 static void remove_connack_oooflag_ifold(struct conn *src_in_l,
392 struct control_msg_out *cm)
394 if (ooolen(cm->msg.ack_conn.flags) != 0 && seqno_before_eq(
395 cm->msg.ack_conn.seqno_ooo +
396 cm->msg.ack_conn.length,
397 src_in_l->source.in.next_seqno)) {
398 cm->msg.ack_conn.length = 0;
399 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
400 (~KP_ACK_CONN_FLAGS_OOO));
404 static int ackconn_prepare_requeue(struct conn *cn_l,
405 struct control_msg_out *cm)
407 if (unlikely(unlikely(cn_l->sourcetype != SOURCE_IN) ||
408 unlikely(cn_l->source.in.nb != cm->nb) ||
409 unlikely(cn_l->reversedir->target.out.conn_id !=
410 cm->msg.ack_conn.conn_id) ||
411 unlikely(cn_l->isreset != 0)))
412 return 0;
414 remove_connack_oooflag_ifold(cn_l, cm);
416 if (!seqno_eq(cm->msg.ack_conn.ack_seqno, cn_l->source.in.ack_seqno))
417 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
418 (~KP_ACK_CONN_FLAGS_SEQNO) &
419 (~KP_ACK_CONN_FLAGS_WINDOW));
421 if (cm->msg.ack_conn.flags == 0)
422 return 0;
424 cm->length = 6 + ack_conn_len(cm->msg.ack_conn.flags);
426 return 1;
429 static void requeue_control_retrans(struct control_retrans *cr)
431 atomic_inc(&(cr->nb->cmsg_bulk_readds));
433 while (list_empty(&(cr->msgs)) == 0) {
434 struct control_msg_out *cm = container_of(cr->msgs.prev,
435 struct control_msg_out, lh.list);
436 list_del(&(cm->lh.list));
438 BUG_ON(cm->nb != cr->nb);
440 if (cm->type == MSGTYPE_ACK_CONN) {
441 struct conn *cn_l = cm->msg.ack_conn.src_in;
442 spin_lock_bh(&(cn_l->rcv_lock));
443 if (unlikely(ackconn_prepare_requeue(cn_l, cm) == 0)) {
444 free_control_msg(cm);
445 } else {
446 merge_or_enqueue_ackconn(cn_l, cm,
447 ADDCMSG_SRC_RETRANS);
450 spin_unlock_bh(&(cn_l->rcv_lock));
451 } else {
 452 			if (cm->type == MSGTYPE_PONG)
453 atomic_dec(&(cm->nb->cmsg_pongs_retrans_cnt));
454 enqueue_control_msg(cm, ADDCMSG_SRC_RETRANS);
458 atomic_dec(&(cr->nb->cmsg_bulk_readds));
460 spin_lock_bh(&(cr->nb->cmsg_lock));
461 schedule_controlmsg_timer(cr->nb);
462 spin_unlock_bh(&(cr->nb->cmsg_lock));
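/*
 * Retransmit handling: the tasklet walks the timeout-ordered retrans_list.
 * Expired entries are unlinked from the rbtree and their messages are
 * requeued at the head of the cmsg queues via requeue_control_retrans, so
 * the data is resent in a freshly built packet instead of resending the old
 * skb. For killed neighbors the entries are only freed.
 */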
465 void retransmit_taskfunc(unsigned long arg)
467 struct neighbor *nb = (struct neighbor *) arg;
469 int nbput = 0;
471 int nbstate = get_neigh_state(nb);
473 while (1) {
474 struct control_retrans *cr = 0;
476 spin_lock_bh(&(nb->send_cmsg_lock));
477 spin_lock_bh(&(nb->retrans_lock));
479 if (list_empty(&(nb->retrans_list))) {
480 nb->retrans_timer_running = 0;
481 nbput = 1;
482 spin_unlock_bh(&(nb->retrans_lock));
483 spin_unlock_bh(&(nb->send_cmsg_lock));
484 break;
487 cr = container_of(nb->retrans_list.next,
488 struct control_retrans, timeout_list);
490 BUG_ON(cr->nb != nb);
492 list_del(&(cr->timeout_list));
494 if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
495 spin_lock_bh(&(nb->kp_retransmits_lock));
496 rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));
497 spin_unlock_bh(&(nb->kp_retransmits_lock));
499 spin_unlock_bh(&(nb->retrans_lock));
501 kref_put(&(cr->ref), kreffree_bug); /* rb */
503 kref_put(&(cr->ref), free_control_retrans); /* list */
504 continue;
507 if (time_after(cr->timeout, jiffies)) {
508 list_add(&(cr->timeout_list), &(nb->retrans_list));
509 mod_timer(&(nb->retrans_timer), cr->timeout);
510 spin_unlock_bh(&(nb->retrans_lock));
511 spin_unlock_bh(&(nb->send_cmsg_lock));
512 break;
515 spin_lock_bh(&(nb->kp_retransmits_lock));
516 rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));
517 spin_unlock_bh(&(nb->kp_retransmits_lock));
519 kref_put(&(cr->ref), kreffree_bug); /* rb */
521 spin_unlock_bh(&(nb->retrans_lock));
522 spin_unlock_bh(&(nb->send_cmsg_lock));
524 requeue_control_retrans(cr);
526 kref_put(&(cr->ref), free_control_retrans);
529 if (nbput)
530 kref_put(&(nb->ref), neighbor_free);
533 void retransmit_timerfunc(struct timer_list *retrans_timer)
535 struct neighbor *nb = container_of(retrans_timer,
536 struct neighbor, retrans_timer);
537 tasklet_schedule(&(nb->retrans_task));
540 static void schedule_retransmit(struct control_retrans *cr, struct neighbor *nb)
542 int first;
544 cr->timeout = calc_timeout(atomic_read(&(nb->latency_retrans_us)),
545 atomic_read(&(nb->latency_stddev_retrans_us)),
546 atomic_read(&(nb->max_remote_ack_delay_us)));
548 spin_lock_bh(&(nb->retrans_lock));
549 insert_control_retrans(cr);
550 first = list_empty(&(nb->retrans_list));
551 list_add_tail(&(cr->timeout_list), &(nb->retrans_list));
553 if (first && nb->retrans_timer_running == 0) {
554 mod_timer(&(nb->retrans_timer), cr->timeout);
555 nb->retrans_timer_running = 1;
556 kref_get(&(nb->ref));
559 spin_unlock_bh(&(nb->retrans_lock));
562 void kern_ack_rcvd(struct neighbor *nb, __u64 seqno)
564 struct control_retrans *cr = 0;
566 spin_lock_bh(&(nb->retrans_lock));
568 cr = get_control_retrans(nb, seqno);
570 if (cr == 0) {
 571 		/* char *seqno_p = (char *) &seqno;
 572 		seqno = cpu_to_be32(seqno);
 573 		printk(KERN_ERR "bogus/duplicate ack received %d %d %d %d",
 574 				seqno_p[0], seqno_p[1], seqno_p[2], seqno_p[3]); */
576 goto out;
579 spin_lock_bh(&(nb->kp_retransmits_lock));
580 rb_erase(&(cr->rbn), &(nb->kp_retransmits_rb));
581 spin_unlock_bh(&(nb->kp_retransmits_lock));
583 BUG_ON(cr->nb != nb);
585 list_del(&(cr->timeout_list));
587 out:
588 spin_unlock_bh(&(nb->retrans_lock));
590 if (cr != 0) {
591 kref_put(&(cr->ref), kreffree_bug); /* get_control_retrans */
592 kref_put(&(cr->ref), kreffree_bug); /* rb_erase */
593 kref_put(&(cr->ref), free_control_retrans); /* list */
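/*
 * get_window reports the receive window to the peer. The free window
 * (window_seqnolimit - next_seqno) is compressed into a single byte;
 * judging by the enc_log_64_7/dec_log_64_7 names this is presumably a 7-bit
 * logarithmic encoding, so the advertised value may be rounded.
 * window_seqnolimit_remote records what was actually advertised, i.e. how
 * far the peer believes it may send.
 */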
597 static __u8 get_window(struct conn *cn, struct neighbor *expectedsender,
598 __u32 expected_connid)
600 __u8 window = 0;
602 spin_lock_bh(&(cn->rcv_lock));
604 if (unlikely(unlikely(cn->sourcetype != SOURCE_IN) ||
605 unlikely(expectedsender != 0 && (cn->source.in.nb !=
606 expectedsender || cn->reversedir->target.out.conn_id !=
607 expected_connid))))
608 goto out;
610 window = enc_log_64_7(seqno_clean(cn->source.in.window_seqnolimit -
611 cn->source.in.next_seqno));
613 cn->source.in.window_seqnolimit_remote = cn->source.in.next_seqno +
614 dec_log_64_7(window);
616 out:
617 spin_unlock_bh(&(cn->rcv_lock));
619 return window;
622 static void padding(struct sk_buff *skb, __u32 length)
624 char *dst;
625 if (length <= 0)
626 return;
627 dst = skb_put(skb, length);
628 BUG_ON(dst == 0);
629 memset(dst, KP_PADDING, length);
633 static int add_init_session(struct sk_buff *skb, __be32 sessionid,
634 __u32 spaceleft)
636 char *dst;
638 if (unlikely(spaceleft < 5))
639 return 0;
641 dst = skb_put(skb, 5);
642 BUG_ON(dst == 0);
644 dst[0] = KP_INIT_SESSION;
645 put_be32(dst + 1, sessionid);
647 return 5;
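/*
 * The add_* helpers below serialize one control message each: they check
 * spaceleft, append the type byte plus fields to the skb and return the
 * number of bytes written (0 if the message does not fit). Messages that
 * must survive until acked are appended to cr->msgs for retransmission;
 * plain acks are freed immediately. KP_ACK is 7 bytes: 1 type byte plus a
 * 6-byte (48-bit) seqno.
 */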
650 static int add_ack(struct sk_buff *skb, struct control_retrans *cr,
651 struct control_msg_out *cm, __u32 spaceleft)
653 char *dst;
655 if (unlikely(spaceleft < 7))
656 return 0;
658 dst = skb_put(skb, 7);
659 BUG_ON(dst == 0);
661 dst[0] = KP_ACK;
662 put_u48(dst + 1, cm->msg.ack.seqno);
664 free_control_msg(cm);
666 return 7;
669 static int add_ack_conn(struct sk_buff *skb, struct control_retrans *cr,
670 struct control_msg_out *cm, __u32 spaceleft)
672 char *dst;
673 int offset = 0;
675 if (unlikely(spaceleft < cm->length))
676 return 0;
678 dst = skb_put(skb, cm->length);
679 BUG_ON(dst == 0);
681 dst[offset] = KP_ACK_CONN;
682 offset++;
683 put_u32(dst + offset, cm->msg.ack_conn.conn_id);
684 offset += 4;
685 dst[offset] = cm->msg.ack_conn.flags;
686 offset++;
688 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) != 0) {
689 put_u48(dst + offset, cm->msg.ack_conn.seqno);
690 offset += 6;
692 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_WINDOW) != 0) {
693 BUG_ON(cm->msg.ack_conn.src_in == 0);
694 dst[offset] = get_window(cm->msg.ack_conn.src_in,
695 cm->nb, cm->msg.ack_conn.conn_id);
696 offset++;
700 if (ooolen(cm->msg.ack_conn.flags) != 0) {
701 put_u48(dst + offset, cm->msg.ack_conn.seqno_ooo);
702 offset += 6;
703 if (ooolen(cm->msg.ack_conn.flags) == 1) {
704 BUG_ON(cm->msg.ack_conn.length > 255);
705 dst[offset] = cm->msg.ack_conn.length;
706 offset += 1;
707 } else if (ooolen(cm->msg.ack_conn.flags) == 2) {
708 BUG_ON(cm->msg.ack_conn.length <= 255);
709 BUG_ON(cm->msg.ack_conn.length > 65535);
710 put_u16(dst + offset, cm->msg.ack_conn.length);
711 offset += 2;
712 } else if (ooolen(cm->msg.ack_conn.flags) == 4) {
713 BUG_ON(cm->msg.ack_conn.length <= 65535);
714 put_u32(dst + offset, cm->msg.ack_conn.length);
715 offset += 4;
716 } else {
717 BUG();
721 if ((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
722 dst[offset] = cm->msg.ack_conn.priority_seqno;
723 offset++;
724 dst[offset] = cm->msg.ack_conn.priority;
725 offset++;
728 list_add_tail(&(cm->lh.list), &(cr->msgs));
730 BUG_ON(offset != cm->length);
731 return offset;
734 static int add_ping(struct sk_buff *skb, __u32 cookie, __u32 spaceleft)
736 char *dst;
738 if (unlikely(spaceleft < 5))
739 return 0;
741 dst = skb_put(skb, 5);
742 BUG_ON(dst == 0);
744 dst[0] = KP_PING;
745 put_u32(dst + 1, cookie);
747 return 5;
750 static int add_pong(struct sk_buff *skb, struct control_retrans *cr,
751 struct control_msg_out *cm, __u32 spaceleft,
752 ktime_t packetgen_start)
754 __s64 respdelay;
755 char *dst;
757 if (unlikely(spaceleft < 9))
758 return 0;
760 respdelay = div_u64(ktime_to_ns(packetgen_start) -
761 ktime_to_ns(cm->msg.pong.time_enqueued) + 500, 1000);
762 if (unlikely(respdelay > U32_MAX))
763 respdelay = U32_MAX;
764 if (unlikely(respdelay < 0))
765 respdelay = 0;
767 dst = skb_put(skb, 9);
768 BUG_ON(dst == 0);
770 dst[0] = KP_PONG;
771 put_u32(dst + 1, cm->msg.pong.cookie);
772 put_u32(dst + 5, (__u32) respdelay);
774 list_add_tail(&(cm->lh.list), &(cr->msgs));
776 return 9;
779 static int add_connect(struct sk_buff *skb, struct control_retrans *cr,
780 struct control_msg_out *cm, __u32 spaceleft)
782 char *dst;
783 struct conn *src_in = cm->msg.connect.src_in;
785 if (unlikely(spaceleft < 20))
786 return 0;
788 dst = skb_put(skb, 20);
789 BUG_ON(dst == 0);
791 dst[0] = KP_CONNECT;
792 put_u32(dst + 1, cm->msg.connect.conn_id);
793 put_u48(dst + 5, cm->msg.connect.seqno1);
794 put_u48(dst + 11, cm->msg.connect.seqno2);
795 BUG_ON(cm->msg.connect.src_in == 0);
796 dst[17] = get_window(cm->msg.connect.src_in, cm->nb,
797 cm->msg.connect.conn_id);
799 spin_lock_bh(&(src_in->reversedir->rcv_lock));
800 BUG_ON(src_in->reversedir->targettype != TARGET_OUT);
802 dst[18] = src_in->reversedir->target.out.priority_seqno;
803 dst[19] = src_in->reversedir->target.out.priority_last;
805 spin_unlock_bh(&(src_in->reversedir->rcv_lock));
807 list_add_tail(&(cm->lh.list), &(cr->msgs));
809 return 20;
812 static int add_connect_success(struct sk_buff *skb, struct control_retrans *cr,
813 struct control_msg_out *cm, __u32 spaceleft)
815 char *dst;
817 if (unlikely(spaceleft < 6))
818 return 0;
820 dst = skb_put(skb, 6);
821 BUG_ON(dst == 0);
823 dst[0] = KP_CONNECT_SUCCESS;
824 put_u32(dst + 1, cm->msg.connect_success.conn_id);
825 BUG_ON(cm->msg.connect_success.src_in == 0);
826 dst[5] = get_window(cm->msg.connect_success.src_in, cm->nb,
827 cm->msg.connect_success.conn_id);
829 list_add_tail(&(cm->lh.list), &(cr->msgs));
831 return 6;
834 static int add_reset_conn(struct sk_buff *skb, struct control_retrans *cr,
835 struct control_msg_out *cm, __u32 spaceleft)
837 char *dst;
839 if (unlikely(spaceleft < 5))
840 return 0;
842 dst = skb_put(skb, 5);
843 BUG_ON(dst == 0);
845 dst[0] = KP_RESET_CONN;
846 put_u32(dst + 1, cm->msg.reset_conn.conn_id);
848 list_add_tail(&(cm->lh.list), &(cr->msgs));
850 return 5;
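/*
 * add_conndata may send a payload partially: the 13-byte header (type byte,
 * 4-byte conn_id, 6-byte seqno, 2-byte length) plus as much data as fits is
 * written. If payload remains, the message is handed back through
 * *split_conndata with *sc_sendlen set to the bytes already sent, so the
 * caller can requeue the rest.
 */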
853 static int add_conndata(struct sk_buff *skb, struct control_retrans *cr,
854 struct control_msg_out *cm, __u32 spaceleft,
855 struct control_msg_out **split_conndata, __u32 *sc_sendlen)
857 char *dst;
859 __u32 totallen = cm->msg.conn_data.datalen + 13;
860 __u32 putlen = min(totallen, spaceleft);
861 __u32 dataputlen = putlen - 13;
863 BUG_ON(putlen > 1024*1024*1024);
865 BUG_ON(split_conndata == 0);
866 BUG_ON(*split_conndata != 0);
867 BUG_ON(sc_sendlen == 0);
868 BUG_ON(*sc_sendlen != 0);
870 if (dataputlen == 0 || dataputlen > cm->msg.conn_data.datalen ||
871 (spaceleft < 25 && spaceleft < totallen))
872 return 0;
874 dst = skb_put(skb, putlen);
875 BUG_ON(dst == 0);
877 if (cm->msg.conn_data.snd_delayed_lowbuf == 0)
878 dst[0] = KP_CONN_DATA;
879 else
880 dst[0] = KP_CONN_DATA_LOWBUFDELAYED;
881 put_u32(dst + 1, cm->msg.conn_data.conn_id);
882 put_u48(dst + 5, cm->msg.conn_data.seqno);
883 put_u16(dst + 11, dataputlen);
885 memcpy(dst + 13, cm->msg.conn_data.data, dataputlen);
887 if (cm->msg.conn_data.datalen == dataputlen) {
888 list_add_tail(&(cm->lh.list), &(cr->msgs));
889 } else {
890 *split_conndata = cm;
891 *sc_sendlen = dataputlen;
894 return putlen;
897 static int add_set_max_cmsg_dly(struct sk_buff *skb, struct control_retrans *cr,
898 struct control_msg_out *cm, __u32 spaceleft)
900 char *dst;
902 if (unlikely(spaceleft < 13))
903 return 0;
905 dst = skb_put(skb, 13);
906 BUG_ON(dst == 0);
908 dst[0] = KP_SET_MAX_CMSG_DELAY;
909 put_u32(dst + 1, cm->msg.set_max_cmsg_delay.ack_delay);
910 put_u32(dst + 5, cm->msg.set_max_cmsg_delay.ackconn_delay);
911 put_u32(dst + 9, cm->msg.set_max_cmsg_delay.other_delay);
913 list_add_tail(&(cm->lh.list), &(cr->msgs));
915 return 13;
918 static int add_message(struct sk_buff *skb, struct control_retrans *cr,
919 struct control_msg_out *cm, __u32 spaceleft,
920 ktime_t packetgen_start,
921 struct control_msg_out **split_conndata, __u32 *sc_sendlen)
923 BUG_ON(split_conndata != 0 && *split_conndata != 0);
924 BUG_ON(sc_sendlen != 0 && *sc_sendlen != 0);
926 switch (cm->type) {
927 case MSGTYPE_ACK:
928 return add_ack(skb, cr, cm, spaceleft);
929 case MSGTYPE_ACK_CONN:
930 return add_ack_conn(skb, cr, cm, spaceleft);
931 case MSGTYPE_PONG:
932 return add_pong(skb, cr, cm, spaceleft, packetgen_start);
933 case MSGTYPE_CONNECT:
934 return add_connect(skb, cr, cm, spaceleft);
935 case MSGTYPE_CONNECT_SUCCESS:
936 return add_connect_success(skb, cr, cm, spaceleft);
937 case MSGTYPE_RESET_CONN:
938 return add_reset_conn(skb, cr, cm, spaceleft);
939 case MSGTYPE_CONNDATA:
940 return add_conndata(skb, cr, cm, spaceleft, split_conndata,
941 sc_sendlen);
942 case MSGTYPE_SET_MAX_CMSG_DELAY:
943 return add_set_max_cmsg_dly(skb, cr, cm, spaceleft);
944 default:
945 BUG();
947 BUG();
948 return 0;
951 static void requeue_message(struct control_msg_out *cm)
953 if (cm->type == MSGTYPE_ACK_CONN) {
954 struct conn *cn_l = cm->msg.ack_conn.src_in;
956 spin_lock_bh(&(cn_l->rcv_lock));
957 if (unlikely(ackconn_prepare_requeue(cn_l, cm) == 0)) {
958 free_control_msg(cm);
959 } else {
960 spin_lock_bh(&(cm->nb->cmsg_lock));
962 list_add(&(cm->lh.list), &(cm->nb->cmsg_queue_ackconn));
963 cm->nb->cmsg_otherlength += cm->length;
965 list_add(&(cm->msg.ack_conn.conn_acks),
966 &(cn_l->source.in.acks_pending));
967 try_merge_ackconns(cn_l, cm);
969 spin_unlock_bh(&(cm->nb->cmsg_lock));
971 spin_unlock_bh(&(cn_l->rcv_lock));
972 return;
975 enqueue_control_msg(cm, ADDCMSG_SRC_READD);
978 #define CMSGQUEUE_PONG 1
979 #define CMSGQUEUE_ACK 2
980 #define CMSGQUEUE_ACK_CONN 3
981 #define CMSGQUEUE_CONNDATA 4
982 #define CMSGQUEUE_OTHER 5
984 static unsigned long get_cmsg_timeout(struct control_msg_out *cm, int queue)
986 BUG_ON(cm->type == MSGTYPE_CONNDATA);
987 if (cm->type == MSGTYPE_ACK) {
988 BUG_ON(queue != CMSGQUEUE_ACK);
989 return cm->timing.time_added +
990 msecs_to_jiffies(CMSG_MAXDELAY_ACK_MS) - 1;
991 } else if (cm->type == MSGTYPE_ACK_CONN) {
992 BUG_ON(queue != CMSGQUEUE_ACK_CONN);
993 return cm->timing.time_added +
994 msecs_to_jiffies(CMSG_MAXDELAY_ACKCONN_MS) - 1;
995 } else {
996 BUG_ON(cm->type == MSGTYPE_PONG && queue != CMSGQUEUE_PONG);
997 BUG_ON(cm->type != MSGTYPE_PONG && queue != CMSGQUEUE_OTHER);
999 return cm->timing.time_added +
1000 msecs_to_jiffies(CMSG_MAXDELAY_OTHER_MS) - 1;
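/*
 * _peek_message/peek_message select the next message to send: the five
 * queues (pong, ack, ackconn, the conndata heap and other) are scanned and
 * the message with the earliest deadline wins. Pongs are considered in
 * every neighbor state; the other queues only while the neighbor is ACTIVE.
 */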
1004 static void _peek_message(struct neighbor *nb, int queue,
1005 struct control_msg_out **currcm, unsigned long *currtimeout,
1006 __u32 **currlen)
1008 struct control_msg_out *cm;
1009 unsigned long cmtimeout;
1011 if (queue == CMSGQUEUE_CONNDATA) {
1012 if (nb->cmsg_queue_conndata.top == 0)
1013 return;
1015 cm = container_of(nb->cmsg_queue_conndata.top,
1016 struct control_msg_out, lh.hpel);
1017 BUG_ON(cm->type != MSGTYPE_CONNDATA);
1018 cmtimeout = cm->timing.timeout;
1019 } else {
1020 struct list_head *queuelh;
1021 if (queue == CMSGQUEUE_PONG) {
1022 queuelh = &(nb->cmsg_queue_pong);
1023 } else if (queue == CMSGQUEUE_ACK) {
1024 queuelh = &(nb->cmsg_queue_ack);
1025 } else if (queue == CMSGQUEUE_ACK_CONN) {
1026 queuelh = &(nb->cmsg_queue_ackconn);
1027 } else if (queue == CMSGQUEUE_OTHER) {
1028 queuelh = &(nb->cmsg_queue_other);
1029 } else {
1030 BUG();
1033 if (list_empty(queuelh))
1034 return;
1036 cm = container_of(queuelh->next, struct control_msg_out,
1037 lh.list);
1038 cmtimeout = get_cmsg_timeout(cm, queue);
1041 BUG_ON(cm->nb != nb);
1043 if (*currcm == 0 || (time_before(cmtimeout, *currtimeout) &&
1044 time_before(jiffies, *currtimeout))) {
1045 *currcm = cm;
1046 *currtimeout = cmtimeout;
1048 if (queue == CMSGQUEUE_PONG) {
1049 *currlen = &(nb->cmsg_pongslength);
1050 } else {
1051 *currlen = &(nb->cmsg_otherlength);
1056 static void peek_message(struct neighbor *nb, int nbstate,
1057 struct control_msg_out **cm, unsigned long *cmtimeout,
1058 __u32 **len)
1060 _peek_message(nb, CMSGQUEUE_PONG, cm, cmtimeout, len);
1061 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
1062 _peek_message(nb, CMSGQUEUE_ACK, cm, cmtimeout, len);
1063 _peek_message(nb, CMSGQUEUE_ACK_CONN, cm, cmtimeout, len);
1064 _peek_message(nb, CMSGQUEUE_CONNDATA, cm, cmtimeout, len);
1065 _peek_message(nb, CMSGQUEUE_OTHER, cm, cmtimeout, len);
1069 static struct control_msg_out *dequeue_message(struct neighbor *nb,
1070 int nbstate)
1072 struct control_msg_out *cm = 0;
1073 unsigned long cmtimeout;
1074 __u32 *len;
1076 peek_message(nb, nbstate, &cm, &cmtimeout, &len);
1078 if (unlikely(cm == 0))
1079 return 0;
1081 BUG_ON(len == 0);
1083 *len -= cm->length;
1084 if (cm->type == MSGTYPE_CONNDATA) {
1085 cor_heap_remove(&conndata_heapdef, &(nb->cmsg_queue_conndata),
1086 &(cm->lh.hpel));
1087 } else {
1088 list_del(&(cm->lh.list));
1090 if (cm->type == MSGTYPE_ACK_CONN)
1091 list_del(&(cm->msg.ack_conn.conn_acks));
1092 if (unlikely(cm->type == MSGTYPE_PONG))
1093 atomic_dec(&(cm->nb->cmsg_pongscnt));
1095 if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
1096 BUG_ON(cm->msg.reset_conn.in_pending_conn_resets == 0);
1097 rb_erase(&(cm->msg.reset_conn.rbn),
1098 &(cm->nb->pending_conn_resets_rb));
1099 cm->msg.reset_conn.in_pending_conn_resets = 0;
1101 kref_put(&(cm->ref), kreffree_bug);
1105 return cm;
1108 static __u32 __send_messages(struct neighbor *nb, struct sk_buff *skb,
1109 struct control_retrans *cr, __u32 spaceleft, int nbstate,
1110 ktime_t packetgen_start,
1111 struct control_msg_out **split_conndata, __u32 *sc_sendlen)
1113 __u32 length = 0;
1114 while (length < spaceleft) {
1115 int rc;
1116 struct control_msg_out *cm;
1118 spin_lock_bh(&(nb->cmsg_lock));
1119 cm = dequeue_message(nb, nbstate);
1120 spin_unlock_bh(&(nb->cmsg_lock));
1122 if (cm == 0)
1123 break;
1125 rc = add_message(skb, cr, cm, spaceleft - length,
1126 packetgen_start, split_conndata, sc_sendlen);
1127 if (rc == 0) {
1128 requeue_message(cm);
1129 break;
1132 length += rc;
1135 return length;
1138 static __u32 __send_messages_smcd(struct neighbor *nb, struct sk_buff *skb,
1139 struct control_retrans *cr, __u32 spaceleft,
1140 ktime_t packetgen_start)
1142 struct control_msg_out *cm;
1143 int rc;
1145 cm = alloc_control_msg(nb, ACM_PRIORITY_MED);
1147 if (unlikely(cm == 0))
1148 return 0;
1150 cm->type = MSGTYPE_SET_MAX_CMSG_DELAY;
1151 cm->msg.set_max_cmsg_delay.ack_delay =
1152 CMSG_MAXDELAY_ACK_MS * 1000;
1153 cm->msg.set_max_cmsg_delay.ackconn_delay =
1154 CMSG_MAXDELAY_ACKCONN_MS * 1000;
1155 cm->msg.set_max_cmsg_delay.other_delay =
1156 CMSG_MAXDELAY_OTHER_MS * 1000;
1157 cm->length = 13;
1159 rc = add_message(skb, cr, cm, spaceleft, packetgen_start, 0, 0);
1161 nb->max_cmsg_delay_sent = 1;
1163 return rc;
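/*
 * _send_messages assembles one kernel packet: optional init_session and
 * ping first, then a set_max_cmsg_delay announcement if none was sent yet,
 * then queued messages until the packet is full. A packet that would carry
 * nothing but an unforced ping is dropped and the kpacket seqno rolled
 * back. If the transmit fails, every message (including a partially sent
 * conndata split) is requeued and the cmsg timer rescheduled.
 */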
1166 static int _send_messages(struct neighbor *nb, struct sk_buff *skb, int ping,
1167 int initsession, struct control_retrans *cr, __u32 spaceleft,
1168 int nbstate)
1170 int rc;
1171 __u32 length = 0;
1172 __u32 pinglen = 0;
1173 __u32 pingcookie = 0;
1174 unsigned long last_ping_time;
1175 struct control_msg_out *split_conndata = 0;
1176 __u32 sc_sendlen = 0;
1178 ktime_t packetgen_start = ktime_get();
1180 spin_lock_bh(&(nb->cmsg_lock));
1182 if (ping != TIMETOSENDPING_NO) {
1183 int rc;
1185 if (unlikely(initsession != 0)) {
1186 rc = add_init_session(skb, nb->sessionid,
1187 spaceleft - length);
1188 BUG_ON(rc <= 0);
1189 pinglen = rc;
1190 length += rc;
1193 pingcookie = add_ping_req(nb, &last_ping_time, packetgen_start);
1194 rc = add_ping(skb, pingcookie, spaceleft - length);
1195 BUG_ON(rc <= 0);
1196 pinglen = rc;
1197 length += rc;
1200 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE) &&
1201 unlikely(nb->max_cmsg_delay_sent == 0))
1202 length += __send_messages_smcd(nb, skb, cr, spaceleft - length,
1203 packetgen_start);
1205 spin_unlock_bh(&(nb->cmsg_lock));
1207 length += __send_messages(nb, skb, cr, spaceleft - length, nbstate,
1208 packetgen_start, &split_conndata, &sc_sendlen);
1210 BUG_ON(length > spaceleft);
1212 if (likely(ping != TIMETOSENDPING_FORCE) &&
1213 pinglen != 0 && unlikely(length == pinglen)) {
1214 unadd_ping_req(nb, pingcookie, last_ping_time, 0);
1215 goto drop;
1218 if (unlikely(length == 0)) {
1219 drop:
1220 kfree_skb(skb);
1222 BUG_ON(list_empty(&(cr->msgs)) == 0);
1223 kref_put(&(cr->ref), free_control_retrans);
1225 atomic64_sub(1, &(nb->kpacket_seqno));
1226 return 0;
1229 padding(skb, spaceleft - length);
1231 rc = cor_dev_queue_xmit(skb, QOS_CALLER_KPACKET);
1233 if (rc != 0) {
1234 if (ping != 0)
1235 unadd_ping_req(nb, pingcookie, last_ping_time, 1);
1237 atomic_inc(&(nb->cmsg_bulk_readds));
1238 if (split_conndata != 0)
1239 requeue_message(split_conndata);
1241 while (list_empty(&(cr->msgs)) == 0) {
1242 struct control_msg_out *cm = container_of(cr->msgs.prev,
1243 struct control_msg_out, lh.list);
1244 list_del(&(cm->lh.list));
1245 requeue_message(cm);
1248 kref_put(&(cr->ref), free_control_retrans);
1250 atomic_dec(&(nb->cmsg_bulk_readds));
1252 spin_lock_bh(&(nb->cmsg_lock));
1253 schedule_controlmsg_timer(nb);
1254 spin_unlock_bh(&(nb->cmsg_lock));
1255 } else {
1256 struct list_head *curr = cr->msgs.next;
1258 if (pingcookie != 0)
1259 ping_sent(nb, pingcookie);
1261 		while (curr != &(cr->msgs)) {
1262 struct control_msg_out *cm = container_of(curr,
1263 struct control_msg_out, lh.list);
1265 curr = curr->next;
1267 if (unlikely(cm->type == MSGTYPE_PONG &&
1268 (nbstate != NEIGHBOR_STATE_ACTIVE))) {
1269 list_del(&(cm->lh.list));
1270 free_control_msg(cm);
1271 } else if (unlikely(cm->type == MSGTYPE_PONG &&
1272 atomic_inc_return(
1273 &(nb->cmsg_pongs_retrans_cnt)) >
1274 MAX_PONG_CMSGS_RETRANS_PER_NEIGH)) {
1275 atomic_dec(&(nb->cmsg_pongs_retrans_cnt));
1276 list_del(&(cm->lh.list));
1277 free_control_msg(cm);
1278 } else if (cm->type == MSGTYPE_CONNDATA) {
1279 				schedule_retransmit_conn(cm->msg.conn_data.cr,
1280 						0);
1281 kfree(cm->msg.conn_data.data_orig);
1282 list_del(&(cm->lh.list));
1283 free_control_msg(cm);
1287 if (split_conndata != 0) {
1288 BUG_ON(sc_sendlen == 0);
1289 BUG_ON(sc_sendlen >=
1290 split_conndata->msg.conn_data.datalen);
1292 split_conndata->msg.conn_data.seqno += sc_sendlen;
1293 split_conndata->msg.conn_data.data += sc_sendlen;
1294 split_conndata->msg.conn_data.datalen -= sc_sendlen;
1296 #warning todo requeue instead of send_conndata
1297 send_conndata(split_conndata,
1298 split_conndata->msg.conn_data.conn_id,
1299 split_conndata->msg.conn_data.seqno,
1300 split_conndata->msg.conn_data.data_orig,
1301 split_conndata->msg.conn_data.data,
1302 split_conndata->msg.conn_data.datalen,
1303 				split_conndata->msg.conn_data.snd_delayed_lowbuf,
1304 				split_conndata->msg.conn_data.max_delay_hz,
1305 				split_conndata->msg.conn_data.cr,
1306 				1);
1310 if (list_empty(&(cr->msgs)))
1311 kref_put(&(cr->ref), free_control_retrans);
1312 else
1313 schedule_retransmit(cr, nb);
1316 return rc;
1319 static __u32 get_total_messages_length(struct neighbor *nb, int ping,
1320 int initsession, int nbstate)
1322 __u32 length = nb->cmsg_pongslength;
1324 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE)) {
1325 length += nb->cmsg_otherlength;
1327 if (unlikely(nb->max_cmsg_delay_sent == 0))
1328 length += 13;
1330 if (ping == TIMETOSENDPING_FORCE ||
1331 (length > 0 && ping != TIMETOSENDPING_NO)) {
1332 length += 5;
1334 if (unlikely(initsession != 0))
1335 length += 5;
1338 return length;
1341 static void delete_all_cmsgs(struct neighbor *nb)
1343 while (1) {
1344 struct control_msg_out *cm;
1346 spin_lock_bh(&(nb->cmsg_lock));
1347 cm = dequeue_message(nb, NEIGHBOR_STATE_ACTIVE);
1348 spin_unlock_bh(&(nb->cmsg_lock));
1350 if (cm == 0)
1351 break;
1353 if (cm->type == MSGTYPE_CONNDATA) {
1354 schedule_retransmit_conn(cm->msg.conn_data.cr, 0);
1355 kfree(cm->msg.conn_data.data_orig);
1358 free_control_msg(cm);
1362 static int reset_timeouted_conn_needed(struct neighbor *nb,
1363 struct conn *src_in_l)
1365 if (unlikely(src_in_l->sourcetype != SOURCE_IN ||
1366 src_in_l->source.in.nb != nb ||
1367 src_in_l->isreset != 0))
1368 return 0;
1369 else if (likely(time_after(src_in_l->source.in.jiffies_last_act +
1370 CONN_ACTIVITY_UPDATEINTERVAL_SEC * HZ +
1371 CONN_INACTIVITY_TIMEOUT_SEC * HZ, jiffies)))
1372 return 0;
1374 return 1;
1377 static int reset_timeouted_conn(struct neighbor *nb, struct conn *src_in)
1379 int resetted = 0;
1381 if (src_in->is_client) {
1382 spin_lock_bh(&(src_in->rcv_lock));
1383 spin_lock_bh(&(src_in->reversedir->rcv_lock));
1384 } else {
1385 spin_lock_bh(&(src_in->reversedir->rcv_lock));
1386 spin_lock_bh(&(src_in->rcv_lock));
1389 resetted = reset_timeouted_conn_needed(nb, src_in);
1390 if (unlikely(resetted == 0))
1391 goto unlock;
1393 resetted = (send_reset_conn(nb, src_in->reversedir->target.out.conn_id,
1394 1) == 0);
1395 if (unlikely(resetted == 0))
1396 goto unlock;
1399 BUG_ON(src_in->reversedir->isreset != 0);
1400 src_in->reversedir->isreset = 1;
1402 unlock:
1403 if (src_in->is_client) {
1404 spin_unlock_bh(&(src_in->rcv_lock));
1405 spin_unlock_bh(&(src_in->reversedir->rcv_lock));
1406 } else {
1407 spin_unlock_bh(&(src_in->reversedir->rcv_lock));
1408 spin_unlock_bh(&(src_in->rcv_lock));
1411 if (resetted)
1412 reset_conn(src_in);
1414 return resetted;
1417 static void reset_timeouted_conns(struct neighbor *nb)
1419 int i;
1420 	for (i = 0; i < 10000; i++) {
1421 unsigned long iflags;
1422 struct list_head *lh;
1423 struct conn *src_in;
1425 int resetted = 1;
1427 spin_lock_irqsave(&(nb->conn_list_lock), iflags);
1429 if (list_empty(&(nb->rcv_conn_list))) {
1430 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
1431 break;
1434 lh = nb->rcv_conn_list.next;
1435 list_del(lh);
1436 list_add_tail(lh, &(nb->rcv_conn_list));
1438 src_in = container_of(lh, struct conn, source.in.nb_list);
1439 kref_get(&(src_in->ref));
1441 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
1444 spin_lock_bh(&(src_in->rcv_lock));
1445 BUG_ON(src_in->sourcetype != SOURCE_IN);
1446 BUG_ON(src_in->source.in.nb != nb);
1447 resetted = reset_timeouted_conn_needed(nb, src_in);
1448 spin_unlock_bh(&(src_in->rcv_lock));
1449 if (likely(resetted == 0))
1450 goto put;
1452 resetted = reset_timeouted_conn(nb, src_in);
1454 put:
1455 kref_put(&(src_in->ref), free_conn);
1457 if (likely(resetted == 0))
1458 break;
1462 int send_messages(struct neighbor *nb, int resume)
1464 int i;
1465 int sent = 0;
1466 int rc = 0;
1467 int ping;
1468 int initsession = 0;
1469 int targetmss = mss_cmsg(nb);
1471 int nbstate = get_neigh_state(nb);
1473 if (likely(nbstate == NEIGHBOR_STATE_ACTIVE))
1474 reset_timeouted_conns(nb);
1476 if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
1477 delete_all_cmsgs(nb);
1478 atomic_set(&(nb->cmsg_task_scheduled), 0);
1479 goto out;
1482 spin_lock_bh(&(nb->send_cmsg_lock));
1483 spin_lock_bh(&(nb->cmsg_lock));
1485 ping = time_to_send_ping(nb);
1487 if (unlikely(atomic_read(&(nb->sessionid_snd_needed)) != 0))
1488 initsession = 1;
1490 	for (i = 0; 1; i++) {
1491 __u32 length;
1493 __u64 seqno;
1494 struct sk_buff *skb;
1495 struct control_retrans *cr;
1497 int cmsgqueue_nonpong_empty = (
1498 list_empty(&(nb->cmsg_queue_ack)) &&
1499 list_empty(&(nb->cmsg_queue_ackconn)) &&
1500 (nb->cmsg_queue_conndata.top == 0) &&
1501 list_empty(&(nb->cmsg_queue_other)));
1503 BUG_ON(list_empty(&(nb->cmsg_queue_pong)) &&
1504 (nb->cmsg_pongslength != 0));
1505 BUG_ON((list_empty(&(nb->cmsg_queue_pong)) == 0) &&
1506 (nb->cmsg_pongslength == 0));
1507 BUG_ON(cmsgqueue_nonpong_empty &&
1508 (nb->cmsg_otherlength != 0));
1509 BUG_ON((cmsgqueue_nonpong_empty == 0) &&
1510 (nb->cmsg_otherlength == 0));
1512 length = get_total_messages_length(nb, ping, initsession,
1513 nbstate);
1515 if (length == 0)
1516 break;
1518 if (length < targetmss && i > 0)
1519 break;
1521 #warning todo why atomic?
1522 seqno = atomic64_add_return(1, &(nb->kpacket_seqno));
1524 if (length > targetmss)
1525 length = targetmss;
1527 spin_unlock_bh(&(nb->cmsg_lock));
1528 skb = create_packet_cmsg(nb, length, GFP_ATOMIC, seqno);
1529 if (unlikely(skb == 0)) {
1530 printk(KERN_ERR "cor: send_messages: cannot allocate "
1531 "skb (out of memory?)");
1532 goto oom;
1535 cr = kmem_cache_alloc(controlretrans_slab, GFP_ATOMIC);
1536 if (unlikely(cr == 0)) {
1537 kfree_skb(skb);
1538 printk(KERN_ERR "cor: send_messages: cannot allocate "
1539 "control_retrans (out of memory?)");
1540 goto oom;
1542 memset(cr, 0, sizeof(struct control_retrans));
1543 kref_init(&(cr->ref));
1544 cr->nb = nb;
1545 cr->seqno = seqno;
1546 INIT_LIST_HEAD(&(cr->msgs));
1548 rc = _send_messages(nb, skb, ping, initsession, cr, length,
1549 nbstate);
1550 ping = 0;
1552 spin_lock_bh(&(nb->cmsg_lock));
1554 if (rc != 0)
1555 break;
1557 sent = 1;
1560 if (0) {
1561 oom:
1562 spin_lock_bh(&(nb->cmsg_lock));
1565 if (rc != 0) {
1566 if (resume == 0)
1567 qos_enqueue(nb->queue, &(nb->rb_kp),
1568 QOS_CALLER_KPACKET);
1569 } else {
1570 atomic_set(&(nb->cmsg_task_scheduled), 0);
1571 barrier();
1572 schedule_controlmsg_timer(nb);
1575 spin_unlock_bh(&(nb->cmsg_lock));
1576 spin_unlock_bh(&(nb->send_cmsg_lock));
1578 out:
1579 if (rc == 0)
1580 kref_put(&(nb->ref), neighbor_free);
1582 if (rc != 0)
1583 return sent ? QOS_RESUME_CONG : QOS_RESUME_CONG_NOPROGRESS;
1584 return QOS_RESUME_DONE;
1587 void controlmsg_taskfunc(unsigned long nb)
1589 send_messages((struct neighbor *)nb, 0);
1592 static void schedule_cmsg_task(struct neighbor *nb)
1594 if (atomic_cmpxchg(&(nb->cmsg_task_scheduled), 0, 1) == 0) {
1595 barrier();
1596 kref_get(&(nb->ref));
1597 tasklet_schedule(&(nb->cmsg_task));
1601 void controlmsg_timerfunc(struct timer_list *cmsg_timer)
1603 struct neighbor *nb = container_of(cmsg_timer,
1604 struct neighbor, cmsg_timer);
1605 atomic_set(&(nb->cmsg_timer_running), 0);
1606 barrier();
1607 schedule_cmsg_task(nb);
1608 kref_put(&(nb->ref), neighbor_free);
1611 static unsigned long get_cmsg_timer_timeout(struct neighbor *nb, int nbstate)
1613 unsigned long pingtimeout = get_next_ping_time(nb);
1615 struct control_msg_out *cm = 0;
1616 unsigned long cmtimeout;
1617 __u32 *len;
1619 peek_message(nb, nbstate, &cm, &cmtimeout, &len);
1621 if (cm != 0) {
1622 unsigned long jiffies_tmp = jiffies;
1624 if (unlikely(nbstate != NEIGHBOR_STATE_ACTIVE))
1625 return pingtimeout;
1626 if (time_before(cmtimeout, jiffies_tmp))
1627 return jiffies_tmp;
1628 if (CMSG_DYNAMIC_DELAY != 0 && time_before_eq(cmtimeout -
1629 usecs_to_jiffies(nb->cmsg_interval/2),
1630 jiffies_tmp))
1631 return jiffies_tmp;
1632 if (time_before(cmtimeout, pingtimeout))
1633 return cmtimeout;
1636 return pingtimeout;
1639 static int cmsg_full_packet(struct neighbor *nb, int nbstate)
1641 int ping = time_to_send_ping(nb);
1642 int initsession = unlikely(atomic_read(&(nb->sessionid_snd_needed)) !=
1643 0) ? 1 : 0;
1644 __u32 len = get_total_messages_length(nb, ping, initsession, nbstate);
1646 if (len == 0)
1647 return 0;
1648 if (len < mss_cmsg(nb))
1649 return 0;
1651 return 1;
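/*
 * Send scheduling: a packet is built immediately when a full MSS worth of
 * messages is queued (cmsg_full_packet) or a deadline has passed; otherwise
 * the cmsg timer is armed for the earliest deadline. With CMSG_DYNAMIC_DELAY
 * a message is also sent early once its deadline lies within half the
 * average inter-packet interval, presumably to trade a little latency for
 * batching under load.
 */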
1654 void schedule_controlmsg_timer(struct neighbor *nb_cmsglocked)
1656 unsigned long timeout;
1657 int state = get_neigh_state(nb_cmsglocked);
1659 if (unlikely(state == NEIGHBOR_STATE_KILLED))
1660 goto now;
1662 if (atomic_read(&(nb_cmsglocked->cmsg_bulk_readds)) != 0)
1663 return;
1665 if (unlikely(atomic_read(&(nb_cmsglocked->cmsg_task_scheduled)) == 1))
1666 return;
1668 if (cmsg_full_packet(nb_cmsglocked, state))
1669 goto now;
1671 timeout = get_cmsg_timer_timeout(nb_cmsglocked, state);
1673 if (time_before_eq(timeout, jiffies)) {
1674 now:
1675 schedule_cmsg_task(nb_cmsglocked);
1676 } else {
1677 if (atomic_xchg(&(nb_cmsglocked->cmsg_timer_running), 1) == 0)
1678 kref_get(&(nb_cmsglocked->ref));
1679 barrier();
1680 mod_timer(&(nb_cmsglocked->cmsg_timer), timeout);
1684 static void update_cmsg_interval(struct neighbor *nb_cmsglocked)
1686 unsigned long jiffies_tmp = jiffies;
1688 __u64 newinterval = (((__u64) nb_cmsglocked->cmsg_interval) * 255 +
1689 jiffies_to_usecs(jiffies_tmp -
1690 nb_cmsglocked->jiffies_last_cmsg)) / 256;
1692 nb_cmsglocked->jiffies_last_cmsg = jiffies_tmp;
1694 if (unlikely(newinterval > CMSG_INTERVAL_MAX_US))
1695 newinterval = CMSG_INTERVAL_MAX_US;
1696 if (unlikely(newinterval > U32_MAX))
1697 newinterval = U32_MAX;
1699 nb_cmsglocked->cmsg_interval = newinterval;
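/*
 * update_cmsg_interval keeps an exponentially weighted moving average of
 * the time between generated control packets, in microseconds:
 *
 *	interval_new = (255 * interval_old + sample) / 256
 *
 * where sample is the time since the last cmsg. One sample contributes with
 * weight 1/256, giving a time constant of roughly 256 packets.
 */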
1702 static int insert_pending_conn_resets(struct control_msg_out *ins)
1704 struct neighbor *nb = ins->nb;
1705 __u32 conn_id = ins->msg.reset_conn.conn_id;
1707 struct rb_root *root;
1708 struct rb_node **p;
1709 struct rb_node *parent = 0;
1711 BUG_ON(nb == 0);
1712 BUG_ON(ins->msg.reset_conn.in_pending_conn_resets != 0);
1714 root = &(nb->pending_conn_resets_rb);
1715 p = &(root->rb_node);
1717 while ((*p) != 0) {
1718 struct control_msg_out *cm = container_of(*p,
1719 struct control_msg_out,
1720 msg.reset_conn.rbn);
1721 __u32 cm_connid = cm->msg.reset_conn.conn_id;
1723 BUG_ON(cm->nb != ins->nb);
1724 BUG_ON(cm->type != MSGTYPE_RESET_CONN);
1726 parent = *p;
1727 if (conn_id == cm_connid) {
1728 return 1;
1729 } else if (conn_id < cm_connid) {
1730 p = &(*p)->rb_left;
1731 } else if (conn_id > cm_connid) {
1732 p = &(*p)->rb_right;
1733 } else {
1734 BUG();
1738 kref_get(&(ins->ref));
1739 rb_link_node(&(ins->msg.reset_conn.rbn), parent, p);
1740 rb_insert_color(&(ins->msg.reset_conn.rbn), root);
1741 ins->msg.reset_conn.in_pending_conn_resets = 1;
1743 return 0;
1746 static void free_oldest_pong(struct neighbor *nb)
1748 struct control_msg_out *cm = container_of(nb->cmsg_queue_pong.next,
1749 struct control_msg_out, lh.list);
1751 BUG_ON(list_empty(&(nb->cmsg_queue_pong)));
1752 BUG_ON(unlikely(cm->type != MSGTYPE_PONG));
1754 list_del(&(cm->lh.list));
1755 nb->cmsg_pongslength -= cm->length;
1756 atomic_dec(&(nb->cmsg_pongscnt));
1757 free_control_msg(cm);
1760 static int _enqueue_control_msg(struct control_msg_out *cm, int src)
1762 if (unlikely(cm->type == MSGTYPE_PONG)) {
1763 long msgs;
1765 BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA);
1767 msgs = atomic_inc_return(&(cm->nb->cmsg_pongscnt));
1768 BUG_ON(msgs <= 0);
1770 if (src != ADDCMSG_SRC_NEW) {
1771 if (msgs > MAX_PONG_CMSGS_PER_NEIGH) {
1772 atomic_dec(&(cm->nb->cmsg_pongscnt));
1773 free_control_msg(cm);
1774 return 1;
1777 cm->nb->cmsg_pongslength += cm->length;
1778 list_add(&(cm->lh.list), &(cm->nb->cmsg_queue_pong));
1779 } else {
1780 if (msgs > MAX_PONG_CMSGS_PER_NEIGH)
1781 free_oldest_pong(cm->nb);
1783 cm->nb->cmsg_pongslength += cm->length;
1784 list_add_tail(&(cm->lh.list),
1785 &(cm->nb->cmsg_queue_pong));
1788 return 0;
1789 } else if (unlikely(cm->type == MSGTYPE_RESET_CONN)) {
1790 if (insert_pending_conn_resets(cm) != 0) {
1791 cm->type = 0;
1792 free_control_msg(cm);
1793 return 1;
1797 cm->nb->cmsg_otherlength += cm->length;
1798 if (src == ADDCMSG_SRC_NEW) {
1799 if (cm->type == MSGTYPE_ACK) {
1800 list_add_tail(&(cm->lh.list),
1801 &(cm->nb->cmsg_queue_ack));
1802 } else if (cm->type == MSGTYPE_ACK_CONN) {
1803 list_add_tail(&(cm->lh.list),
1804 &(cm->nb->cmsg_queue_ackconn));
1805 } else if (cm->type == MSGTYPE_CONNDATA) {
1806 cor_heap_insert(&conndata_heapdef,
1807 &(cm->nb->cmsg_queue_conndata),
1808 &(cm->lh.hpel));
1809 } else {
1810 list_add_tail(&(cm->lh.list),
1811 &(cm->nb->cmsg_queue_other));
1813 } else {
1814 BUG_ON(src == ADDCMSG_SRC_SPLITCONNDATA &&
1815 cm->type != MSGTYPE_CONNDATA);
1816 BUG_ON(src == ADDCMSG_SRC_READD &&
1817 cm->type == MSGTYPE_ACK_CONN);
1819 if (cm->type == MSGTYPE_ACK) {
1820 list_add(&(cm->lh.list), &(cm->nb->cmsg_queue_ack));
1821 } else if (cm->type == MSGTYPE_ACK_CONN) {
1822 list_add(&(cm->lh.list), &(cm->nb->cmsg_queue_ackconn));
1823 } else if (cm->type == MSGTYPE_CONNDATA) {
1824 cor_heap_insert(&conndata_heapdef,
1825 &(cm->nb->cmsg_queue_conndata),
1826 &(cm->lh.hpel));
1827 } else {
1828 list_add(&(cm->lh.list), &(cm->nb->cmsg_queue_other));
1832 return 0;
1835 static void enqueue_control_msg(struct control_msg_out *cm, int src)
1837 BUG_ON(cm == 0);
1838 BUG_ON(cm->nb == 0);
1840 if (src == ADDCMSG_SRC_NEW) {
1841 if (cm->type == MSGTYPE_CONNDATA)
1842 cm->timing.timeout = jiffies +
1843 cm->msg.conn_data.max_delay_hz;
1844 else
1845 cm->timing.time_added = jiffies;
1848 spin_lock_bh(&(cm->nb->cmsg_lock));
1850 if (_enqueue_control_msg(cm, src) != 0)
1851 goto out;
1853 if (src == ADDCMSG_SRC_NEW || src == ADDCMSG_SRC_RETRANS)
1854 update_cmsg_interval(cm->nb);
1856 if (src != ADDCMSG_SRC_READD)
1857 schedule_controlmsg_timer(cm->nb);
1859 out:
1860 spin_unlock_bh(&(cm->nb->cmsg_lock));
1864 void send_pong(struct neighbor *nb, __u32 cookie)
1866 struct control_msg_out *cm = _alloc_control_msg(nb);
1868 if (unlikely(cm == 0))
1869 return;
1871 cm->nb = nb;
1872 cm->type = MSGTYPE_PONG;
1873 cm->msg.pong.cookie = cookie;
1874 cm->msg.pong.type = MSGTYPE_PONG_TIMEENQUEUED;
1875 cm->msg.pong.time_enqueued = ktime_get();
1876 cm->length = 9;
1877 enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
1880 void send_ack(struct neighbor *nb, __u64 seqno)
1882 struct control_msg_out *cm = alloc_control_msg(nb, ACM_PRIORITY_HIGH);
1884 if (unlikely(cm == 0))
1885 return;
1887 cm->nb = nb;
1888 cm->type = MSGTYPE_ACK;
1889 cm->msg.ack.seqno = seqno;
1890 cm->length = 7;
1891 enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
1894 static void set_ooolen_flags(struct control_msg_out *cm)
1896 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags &
1897 (~KP_ACK_CONN_FLAGS_OOO));
1898 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
1899 ooolen_to_flags(cm->msg.ack_conn.length));
1902 /* cmsg_lock must be held */
1903 static void remove_pending_ackconn(struct control_msg_out *cm)
1905 cm->nb->cmsg_otherlength -= cm->length;
1906 list_del(&(cm->lh.list));
1908 list_del(&(cm->msg.ack_conn.conn_acks));
1909 kref_put(&(cm->msg.ack_conn.src_in->ref), free_conn);
1910 cm->msg.ack_conn.src_in = 0;
1912 cm->type = 0;
1913 free_control_msg(cm);
1916 /* cmsg_lock must be held */
1917 static void recalc_scheduled_ackconn_size(struct control_msg_out *cm)
1919 cm->nb->cmsg_otherlength -= cm->length;
1920 cm->length = 6 + ack_conn_len(cm->msg.ack_conn.flags);
1921 cm->nb->cmsg_otherlength += cm->length;
1924 /* cmsg_lock must be held */
1925 static int _try_merge_ackconn(struct conn *src_in_l,
1926 struct control_msg_out *fromcm, struct control_msg_out *tocm,
1927 int from_newack)
1929 if (ooolen(fromcm->msg.ack_conn.flags) != 0 &&
1930 ooolen(tocm->msg.ack_conn.flags) != 0) {
1931 __u64 tocmseqno = tocm->msg.ack_conn.seqno_ooo;
1932 __u64 tocmlength = tocm->msg.ack_conn.length;
1933 __u64 fromcmseqno = fromcm->msg.ack_conn.seqno_ooo;
1934 __u64 fromcmlength = fromcm->msg.ack_conn.length;
1936 if (seqno_eq(tocmseqno, fromcmseqno)) {
1937 if (fromcmlength > tocmlength)
1938 tocm->msg.ack_conn.length = fromcmlength;
1939 } else if (seqno_after(fromcmseqno, tocmseqno) &&
1940 seqno_before_eq(fromcmseqno, tocmseqno +
1941 tocmlength)) {
1942 __u64 len = seqno_clean(fromcmseqno + fromcmlength -
1943 tocmseqno);
1944 BUG_ON(len > U32_MAX);
1945 tocm->msg.ack_conn.length = (__u32) len;
1946 		} else if (seqno_before(fromcmseqno, tocmseqno) &&
1947 				seqno_after_eq(fromcmseqno + fromcmlength, tocmseqno)) {
1948 __u64 len = seqno_clean(tocmseqno + tocmlength -
1949 fromcmseqno);
1950 BUG_ON(len > U32_MAX);
1951 tocm->msg.ack_conn.seqno_ooo = fromcmseqno;
1952 tocm->msg.ack_conn.length = (__u32) len;
1953 } else {
1954 return 1;
1956 set_ooolen_flags(tocm);
1959 if ((fromcm->msg.ack_conn.flags &
1960 KP_ACK_CONN_FLAGS_SEQNO) != 0) {
1961 if ((tocm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_SEQNO) == 0)
1962 goto setseqno;
1964 BUG_ON(seqno_eq(fromcm->msg.ack_conn.ack_seqno,
1965 tocm->msg.ack_conn.ack_seqno));
1966 if (seqno_after_eq(tocm->msg.ack_conn.ack_seqno,
1967 fromcm->msg.ack_conn.ack_seqno)) {
1968 BUG_ON(seqno_after(fromcm->msg.ack_conn.seqno,
1969 tocm->msg.ack_conn.seqno));
1970 goto skipseqno;
1973 BUG_ON(seqno_before(fromcm->msg.ack_conn.seqno,
1974 tocm->msg.ack_conn.seqno));
1976 setseqno:
1977 tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
1978 KP_ACK_CONN_FLAGS_SEQNO);
1979 tocm->msg.ack_conn.seqno = fromcm->msg.ack_conn.seqno;
1980 tocm->msg.ack_conn.ack_seqno = fromcm->msg.ack_conn.ack_seqno;
1982 skipseqno:
1983 if ((fromcm->msg.ack_conn.flags &
1984 KP_ACK_CONN_FLAGS_WINDOW) != 0)
1985 tocm->msg.ack_conn.flags = (tocm->msg.ack_conn.flags |
1986 KP_ACK_CONN_FLAGS_WINDOW);
1990 if (ooolen(fromcm->msg.ack_conn.flags) != 0) {
1991 tocm->msg.ack_conn.seqno_ooo = fromcm->msg.ack_conn.seqno_ooo;
1992 tocm->msg.ack_conn.length = fromcm->msg.ack_conn.length;
1993 set_ooolen_flags(tocm);
1996 if ((fromcm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) != 0) {
1997 BUG_ON((tocm->msg.ack_conn.flags &
1998 KP_ACK_CONN_FLAGS_PRIORITY) != 0);
1999 tocm->msg.ack_conn.priority_seqno =
2000 fromcm->msg.ack_conn.priority_seqno;
2001 tocm->msg.ack_conn.priority = fromcm->msg.ack_conn.priority;
2004 recalc_scheduled_ackconn_size(tocm);
2005 if (from_newack)
2006 kref_put(&(fromcm->msg.ack_conn.src_in->ref), free_conn);
2007 else
2008 remove_pending_ackconn(fromcm);
2010 return 0;
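/*
 * Worked example for the out-of-order merge in _try_merge_ackconn: with
 * tocm covering seqnos [100, 110) and fromcm [105, 115), the second branch
 * applies and tocm becomes [100, 115), length 15 = 105 + 10 - 100. Ranges
 * that neither touch nor overlap cannot be merged and the function
 * returns 1.
 */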
2013 /* cmsg_lock must be held */
2014 static void try_merge_ackconns(struct conn *src_in_l,
2015 struct control_msg_out *cm)
2017 struct list_head *currlh = cm->msg.ack_conn.conn_acks.next;
2019 while (currlh != &(src_in_l->source.in.acks_pending)) {
2020 struct control_msg_out *currcm = container_of(currlh,
2021 struct control_msg_out,
2022 msg.ack_conn.conn_acks);
2023 currlh = currlh->next;
2024 remove_connack_oooflag_ifold(src_in_l, currcm);
2025 _try_merge_ackconn(src_in_l, currcm, cm, 0);
2029 static void merge_or_enqueue_ackconn(struct conn *src_in_l,
2030 struct control_msg_out *cm, int src)
2032 struct list_head *currlh;
2034 BUG_ON(src_in_l->sourcetype != SOURCE_IN);
2036 spin_lock_bh(&(cm->nb->cmsg_lock));
2038 currlh = src_in_l->source.in.acks_pending.next;
2040 while (currlh != &(src_in_l->source.in.acks_pending)) {
2041 struct control_msg_out *currcm = container_of(currlh,
2042 struct control_msg_out,
2043 msg.ack_conn.conn_acks);
2045 BUG_ON(currcm->nb != cm->nb);
2046 BUG_ON(currcm->type != MSGTYPE_ACK_CONN);
2047 BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
2048 BUG_ON(currcm->msg.ack_conn.conn_id !=
2049 cm->msg.ack_conn.conn_id);
2051 if (_try_merge_ackconn(src_in_l, cm, currcm, 1) == 0) {
2052 try_merge_ackconns(src_in_l, currcm);
2053 update_cmsg_interval(currcm->nb);
2054 schedule_controlmsg_timer(currcm->nb);
2055 spin_unlock_bh(&(currcm->nb->cmsg_lock));
2056 return;
2059 currlh = currlh->next;
2062 list_add_tail(&(cm->msg.ack_conn.conn_acks),
2063 &(src_in_l->source.in.acks_pending));
2065 spin_unlock_bh(&(cm->nb->cmsg_lock));
2067 enqueue_control_msg(cm, src);
2070 static int try_update_ackconn_seqno(struct conn *src_in_l)
2072 int rc = 1;
2074 spin_lock_bh(&(src_in_l->source.in.nb->cmsg_lock));
2076 if (list_empty(&(src_in_l->source.in.acks_pending)) == 0) {
2077 struct control_msg_out *cm = container_of(
2078 src_in_l->source.in.acks_pending.next,
2079 struct control_msg_out,
2080 msg.ack_conn.conn_acks);
2081 BUG_ON(cm->nb != src_in_l->source.in.nb);
2082 BUG_ON(cm->type != MSGTYPE_ACK_CONN);
2083 BUG_ON(cm->msg.ack_conn.src_in != src_in_l);
2084 BUG_ON(cm->msg.ack_conn.conn_id !=
2085 src_in_l->reversedir->target.out.conn_id);
2087 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
2088 KP_ACK_CONN_FLAGS_SEQNO |
2089 KP_ACK_CONN_FLAGS_WINDOW);
2090 cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;
2092 src_in_l->source.in.ack_seqno++;
2093 cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;
2095 remove_connack_oooflag_ifold(src_in_l, cm);
2096 recalc_scheduled_ackconn_size(cm);
2098 try_merge_ackconns(src_in_l, cm);
2100 rc = 0;
2103 spin_unlock_bh(&(src_in_l->source.in.nb->cmsg_lock));
2105 return rc;
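/*
 * try_update_ackconn_seqno piggybacks an in-order ack onto the oldest
 * ACK_CONN already pending for this conn instead of allocating a new
 * message: it refreshes seqno and window, bumps ack_seqno, drops an
 * obsolete ooo block and re-runs the merge. send_ack_conn_ifneeded falls
 * back to allocating a new message only when no pending one exists.
 */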
2108 void send_ack_conn_ifneeded(struct conn *src_in_l, __u64 seqno_ooo,
2109 __u32 ooo_length)
2111 struct control_msg_out *cm;
2113 BUG_ON(src_in_l->sourcetype != SOURCE_IN);
2115 BUG_ON(ooo_length > 0 && seqno_before_eq(seqno_ooo,
2116 src_in_l->source.in.next_seqno));
2118 update_windowlimit(src_in_l);
2120 if (ooo_length != 0) {
2121 cm = alloc_control_msg(src_in_l->source.in.nb,
2122 ACM_PRIORITY_LOW);
2123 if (cm != 0)
2124 goto add;
2127 if (src_in_l->source.in.inorder_ack_needed == 0 &&
2128 seqno_clean(src_in_l->source.in.window_seqnolimit -
2129 src_in_l->source.in.next_seqno)/2 <
2130 seqno_clean(
2131 src_in_l->source.in.window_seqnolimit_remote -
2132 src_in_l->source.in.next_seqno))
2133 return;
2135 if (try_update_ackconn_seqno(src_in_l) == 0)
2136 goto out;
2138 cm = alloc_control_msg(src_in_l->source.in.nb, ACM_PRIORITY_MED);
2139 if (cm == 0) {
2140 printk(KERN_ERR "error allocating inorder ack");
2141 return;
2144 add:
2145 cm->type = MSGTYPE_ACK_CONN;
2146 src_in_l->source.in.ack_seqno++;
2147 cm->msg.ack_conn.ack_seqno = src_in_l->source.in.ack_seqno;
2148 kref_get(&(src_in_l->ref));
2149 cm->msg.ack_conn.src_in = src_in_l;
2150 cm->msg.ack_conn.conn_id = src_in_l->reversedir->target.out.conn_id;
2151 cm->msg.ack_conn.seqno = src_in_l->source.in.next_seqno;
2152 cm->msg.ack_conn.seqno_ooo = seqno_ooo;
2153 cm->msg.ack_conn.length = ooo_length;
2154 cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_SEQNO |
2155 KP_ACK_CONN_FLAGS_WINDOW;
2156 set_ooolen_flags(cm);
2157 cm->length = 6 + ack_conn_len(cm->msg.ack_conn.flags);
2159 merge_or_enqueue_ackconn(src_in_l, cm, ADDCMSG_SRC_NEW);
2161 out:
2162 src_in_l->source.in.inorder_ack_needed = 0;
2163 src_in_l->source.in.window_seqnolimit_remote =
2164 src_in_l->source.in.window_seqnolimit;
2167 static int try_add_priority(struct conn *trgt_out_l, __u8 priority)
2169 int rc = 1;
2170 struct conn *src_in = trgt_out_l->reversedir;
2172 spin_lock_bh(&(trgt_out_l->target.out.nb->cmsg_lock));
2174 if (list_empty(&(src_in->source.in.acks_pending)) == 0) {
2175 struct control_msg_out *cm = container_of(
2176 src_in->source.in.acks_pending.next,
2177 struct control_msg_out,
2178 msg.ack_conn.conn_acks);
2179 BUG_ON(cm->nb != trgt_out_l->target.out.nb);
2180 BUG_ON(cm->type != MSGTYPE_ACK_CONN);
2181 BUG_ON(cm->msg.ack_conn.src_in != trgt_out_l->reversedir);
2182 BUG_ON(cm->msg.ack_conn.conn_id !=
2183 trgt_out_l->target.out.conn_id);
2185 		BUG_ON((cm->msg.ack_conn.flags & KP_ACK_CONN_FLAGS_PRIORITY) !=
2186 				0);
2187 cm->msg.ack_conn.flags = (cm->msg.ack_conn.flags |
2188 KP_ACK_CONN_FLAGS_PRIORITY);
2189 cm->msg.ack_conn.priority_seqno =
2190 trgt_out_l->target.out.priority_seqno;
2191 cm->msg.ack_conn.priority = priority;
2192 recalc_scheduled_ackconn_size(cm);
2194 rc = 0;
2197 spin_unlock_bh(&(trgt_out_l->target.out.nb->cmsg_lock));
2199 return rc;
2202 void send_priority(struct conn *trgt_out_ll, int force, __u8 priority)
2204 struct control_msg_out *cm;
2206 if (try_add_priority(trgt_out_ll, priority) == 0)
2207 goto out;
2209 if (force == 0)
2210 return;
2212 cm = alloc_control_msg(trgt_out_ll->target.out.nb, ACM_PRIORITY_LOW);
2214 if (cm == 0)
2215 return;
2217 cm->type = MSGTYPE_ACK_CONN;
2218 cm->msg.ack_conn.flags = KP_ACK_CONN_FLAGS_PRIORITY;
2219 kref_get(&(trgt_out_ll->reversedir->ref));
2220 BUG_ON(trgt_out_ll->targettype != TARGET_OUT);
2221 cm->msg.ack_conn.src_in = trgt_out_ll->reversedir;
2222 cm->msg.ack_conn.conn_id = trgt_out_ll->target.out.conn_id;
2223 cm->msg.ack_conn.priority_seqno =
2224 trgt_out_ll->target.out.priority_seqno;
2225 cm->msg.ack_conn.priority = priority;
2227 cm->length = 6 + ack_conn_len(cm->msg.ack_conn.flags);
2228 merge_or_enqueue_ackconn(trgt_out_ll->reversedir, cm, ADDCMSG_SRC_NEW);
2230 out:
2231 trgt_out_ll->target.out.priority_last = priority;
2232 trgt_out_ll->target.out.priority_seqno++;
2233 trgt_out_ll->target.out.priority_send_allowed = 0;
2236 void free_ack_conns(struct conn *src_in_l)
2238 int changed = 0;
2239 spin_lock_bh(&(src_in_l->source.in.nb->cmsg_lock));
2240 while (list_empty(&(src_in_l->source.in.acks_pending)) == 0) {
2241 struct list_head *currlh =
2242 src_in_l->source.in.acks_pending.next;
2243 struct control_msg_out *currcm = container_of(currlh,
2244 struct control_msg_out,
2245 msg.ack_conn.conn_acks);
2247 remove_pending_ackconn(currcm);
2248 changed = 1;
2250 if (changed)
2251 schedule_controlmsg_timer(src_in_l->source.in.nb);
2252 spin_unlock_bh(&(src_in_l->source.in.nb->cmsg_lock));
2255 void send_connect_success(struct control_msg_out *cm, __u32 conn_id,
2256 struct conn *src_in)
2258 cm->type = MSGTYPE_CONNECT_SUCCESS;
2259 cm->msg.connect_success.conn_id = conn_id;
2260 kref_get(&(src_in->ref));
2261 cm->msg.connect_success.src_in = src_in;
2262 cm->length = 6;
2263 enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2266 void send_connect_nb(struct control_msg_out *cm, __u32 conn_id, __u64 seqno1,
2267 __u64 seqno2, struct conn *src_in)
2269 cm->type = MSGTYPE_CONNECT;
2270 cm->msg.connect.conn_id = conn_id;
2271 cm->msg.connect.seqno1 = seqno1;
2272 cm->msg.connect.seqno2 = seqno2;
2273 kref_get(&(src_in->ref));
2274 BUG_ON(src_in->sourcetype != SOURCE_IN);
2275 cm->msg.connect.src_in = src_in;
2276 cm->length = 20;
2277 enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2280 void send_conndata(struct control_msg_out *cm, __u32 conn_id, __u64 seqno,
2281 char *data_orig, char *data, __u32 datalen,
2282 __u8 snd_delayed_lowbuf, __u16 max_delay_hz,
2283 struct conn_retrans *cr, int fromsplit)
2285 #warning todo list queue instead of tree
2286 cm->type = MSGTYPE_CONNDATA;
2287 cm->msg.conn_data.conn_id = conn_id;
2288 cm->msg.conn_data.seqno = seqno;
2289 cm->msg.conn_data.data_orig = data_orig;
2290 cm->msg.conn_data.data = data;
2291 cm->msg.conn_data.datalen = datalen;
2292 cm->msg.conn_data.snd_delayed_lowbuf = snd_delayed_lowbuf;
2293 cm->msg.conn_data.cr = cr;
2294 cm->msg.conn_data.max_delay_hz = max_delay_hz;
2295 cm->length = 13 + datalen;
2296 enqueue_control_msg(cm, (fromsplit ? ADDCMSG_SRC_SPLITCONNDATA :
2297 ADDCMSG_SRC_NEW));
2300 int send_reset_conn(struct neighbor *nb, __u32 conn_id, int lowprio)
2302 struct control_msg_out *cm;
2304 if (unlikely(get_neigh_state(nb) == NEIGHBOR_STATE_KILLED))
2305 return 0;
2307 cm = alloc_control_msg(nb, lowprio ?
2308 ACM_PRIORITY_LOW : ACM_PRIORITY_MED);
2310 if (unlikely(cm == 0))
2311 return 1;
2313 cm->type = MSGTYPE_RESET_CONN;
2314 cm->msg.reset_conn.conn_id = conn_id;
2315 cm->length = 5;
2317 enqueue_control_msg(cm, ADDCMSG_SRC_NEW);
2319 return 0;
2322 int __init cor_kgen_init(void)
2324 controlmsg_slab = kmem_cache_create("cor_controlmsg",
2325 sizeof(struct control_msg_out), 8, 0, 0);
2326 if (unlikely(controlmsg_slab == 0))
2327 return -ENOMEM;
2329 controlretrans_slab = kmem_cache_create("cor_controlretransmsg",
2330 sizeof(struct control_retrans), 8, 0, 0);
2331 if (unlikely(controlretrans_slab == 0))
2332 return -ENOMEM;
2334 return 0;
2337 MODULE_LICENSE("GPL");