/*
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
21 #include <asm/byteorder.h>
25 /* not sent over the network - internal meaning only */
26 #define MSGTYPE_PONG 1
28 #define MSGTYPE_ACK_CONN 3
29 #define MSGTYPE_CONNECT 4
30 #define MSGTYPE_CONNECT_SUCCESS 5
31 #define MSGTYPE_RESET_CONN 6
32 #define MSGTYPE_CONNDATA 7
33 #define MSGTYPE_SET_MAX_CMSG_DELAY 8
35 #define MSGTYPE_PONG_TIMEENQUEUED 1
36 #define MSGTYPE_PONG_RESPDELAY 2
38 struct cor_control_msg_out
{
42 struct cor_neighbor
*nb
;
44 /* either queue or control_retrans_packet */
47 unsigned long time_added
;
55 ktime_t time_enqueued
;
63 struct cor_conn
*src_in
;
64 struct list_head conn_acks
;
82 struct cor_conn
*src_in
;
87 struct cor_conn
*src_in
;
92 __u8 in_pending_conn_resets
;
100 __u8 snd_delayed_lowbuf
;
105 struct cor_conn_retrans
*cr
;
116 struct cor_control_retrans
{
119 struct cor_neighbor
*nb
;
122 unsigned long timeout
;
124 struct list_head msgs
;
127 struct list_head timeout_list
;
131 static struct kmem_cache
*cor_controlmsg_slab
;
132 static struct kmem_cache
*cor_controlretrans_slab
;
134 static atomic_t cor_cmsg_othercnt
= ATOMIC_INIT(0);
136 #define ADDCMSG_SRC_NEW 1
137 #define ADDCMSG_SRC_SPLITCONNDATA 2
138 #define ADDCMSG_SRC_READD 3
139 #define ADDCMSG_SRC_RETRANS 4
141 static void cor_enqueue_control_msg(struct cor_control_msg_out
*msg
, int src
);
143 static void cor_try_merge_ackconns(struct cor_conn
*src_in_l
,
144 struct cor_control_msg_out
*cm
);
146 static void cor_merge_or_enqueue_ackconn(struct cor_conn
*src_in_l
,
147 struct cor_control_msg_out
*cm
, int src
);
149 static struct cor_control_msg_out
*_cor_alloc_control_msg(
150 struct cor_neighbor
*nb
)
152 struct cor_control_msg_out
*cm
;
156 cm
= kmem_cache_alloc(cor_controlmsg_slab
, GFP_ATOMIC
);
157 if (unlikely(cm
== 0))
159 memset(cm
, 0, sizeof(struct cor_control_msg_out
));
160 kref_init(&(cm
->ref
));
165 static int cor_calc_limit(int limit
, int priority
)
167 if (priority
== ACM_PRIORITY_LOW
)
169 else if (priority
== ACM_PRIORITY_MED
)
170 return (limit
* 3 + 1)/4;
171 else if (priority
== ACM_PRIORITY_HIGH
)
177 struct cor_control_msg_out
*cor_alloc_control_msg(struct cor_neighbor
*nb
,
180 struct cor_control_msg_out
*cm
= 0;
187 packets1
= atomic_inc_return(&(nb
->cmsg_othercnt
));
188 packets2
= atomic_inc_return(&(cor_cmsg_othercnt
));
190 BUG_ON(packets1
<= 0);
191 BUG_ON(packets2
<= 0);
193 if (packets1
<= cor_calc_limit(GUARANTEED_CMSGS_PER_NEIGH
, priority
))
196 if (unlikely(unlikely(packets1
> cor_calc_limit(MAX_CMSGS_PER_NEIGH
,
198 unlikely(packets2
> cor_calc_limit(MAX_CMSGS
,
203 cm
= _cor_alloc_control_msg(nb
);
204 if (unlikely(cm
== 0)) {
207 /* printk(KERN_ERR "cor_alloc_control_msg failed %ld %ld", packets1, packets2); */
208 atomic_dec(&(nb
->cmsg_othercnt
));
209 atomic_dec(&(cor_cmsg_othercnt
));
214 static void cor_cmsg_kref_free(struct kref
*ref
)
216 struct cor_control_msg_out
*cm
= container_of(ref
,
217 struct cor_control_msg_out
, ref
);
218 kmem_cache_free(cor_controlmsg_slab
, cm
);
221 void cor_free_control_msg(struct cor_control_msg_out
*cm
)
223 if (likely(cm
->type
!= MSGTYPE_PONG
)) {
224 atomic_dec(&(cm
->nb
->cmsg_othercnt
));
225 atomic_dec(&(cor_cmsg_othercnt
));
228 if (cm
->type
== MSGTYPE_ACK_CONN
) {
229 struct cor_conn
*trgt_out
= cm
->msg
.ack_conn
.src_in
->reversedir
;
230 BUG_ON(cm
->msg
.ack_conn
.src_in
== 0);
231 if ((cm
->msg
.ack_conn
.flags
& KP_ACK_CONN_FLAGS_PRIORITY
) != 0){
232 spin_lock_bh(&(trgt_out
->rcv_lock
));
233 BUG_ON(trgt_out
->targettype
!= TARGET_OUT
);
234 if (trgt_out
->target
.out
.priority_send_allowed
!= 0) {
235 trgt_out
->target
.out
.priority_send_allowed
= 1;
236 spin_unlock_bh(&(trgt_out
->rcv_lock
));
237 cor_refresh_conn_priority(trgt_out
, 0);
239 spin_unlock_bh(&(trgt_out
->rcv_lock
));
242 kref_put(&(cm
->msg
.ack_conn
.src_in
->ref
), cor_free_conn
);
243 cm
->msg
.ack_conn
.src_in
= 0;
244 } else if (cm
->type
== MSGTYPE_CONNECT
) {
245 BUG_ON(cm
->msg
.connect
.src_in
== 0);
246 kref_put(&(cm
->msg
.connect
.src_in
->ref
), cor_free_conn
);
247 cm
->msg
.connect
.src_in
= 0;
248 } else if (cm
->type
== MSGTYPE_CONNECT_SUCCESS
) {
249 BUG_ON(cm
->msg
.connect_success
.src_in
== 0);
250 kref_put(&(cm
->msg
.connect_success
.src_in
->ref
), cor_free_conn
);
251 cm
->msg
.connect_success
.src_in
= 0;
252 } else if (cm
->type
== MSGTYPE_RESET_CONN
) {
253 spin_lock_bh(&(cm
->nb
->cmsg_lock
));
254 if (cm
->msg
.reset_conn
.in_pending_conn_resets
!= 0) {
255 rb_erase(&(cm
->msg
.reset_conn
.rbn
),
256 &(cm
->nb
->pending_conn_resets_rb
));
257 cm
->msg
.reset_conn
.in_pending_conn_resets
= 0;
259 kref_put(&(cm
->ref
), cor_kreffree_bug
);
261 spin_unlock_bh(&(cm
->nb
->cmsg_lock
));
264 kref_put(&(cm
->ref
), cor_cmsg_kref_free
);
267 static void cor_free_control_retrans(struct kref
*ref
)
269 struct cor_control_retrans
*cr
= container_of(ref
,
270 struct cor_control_retrans
, ref
);
272 while (list_empty(&(cr
->msgs
)) == 0) {
273 struct cor_control_msg_out
*cm
= container_of(cr
->msgs
.next
,
274 struct cor_control_msg_out
, lh
);
276 if (cm
->type
== MSGTYPE_PONG
)
277 atomic_dec(&(cm
->nb
->cmsg_pongs_retrans_cnt
));
280 cor_free_control_msg(cm
);
283 kmem_cache_free(cor_controlretrans_slab
, cr
);
286 struct cor_control_retrans
*cor_get_control_retrans(
287 struct cor_neighbor
*nb_retranslocked
, __u64 seqno
)
289 struct rb_node
*n
= 0;
290 struct cor_control_retrans
*ret
= 0;
292 n
= nb_retranslocked
->kp_retransmits_rb
.rb_node
;
294 while (likely(n
!= 0) && ret
== 0) {
295 struct cor_control_retrans
*cr
= container_of(n
,
296 struct cor_control_retrans
, rbn
);
298 BUG_ON(cr
->nb
!= nb_retranslocked
);
300 if (cor_seqno_before(seqno
, cr
->seqno
))
302 else if (cor_seqno_after(seqno
, cr
->seqno
))
309 kref_get(&(ret
->ref
));
314 /* nb->retrans_lock must be held */
315 void cor_insert_control_retrans(struct cor_control_retrans
*ins
)
317 struct cor_neighbor
*nb
= ins
->nb
;
318 __u64 seqno
= ins
->seqno
;
320 struct rb_root
*root
;
322 struct rb_node
*parent
= 0;
326 root
= &(nb
->kp_retransmits_rb
);
327 p
= &(root
->rb_node
);
330 struct cor_control_retrans
*cr
= container_of(*p
,
331 struct cor_control_retrans
, rbn
);
333 BUG_ON(cr
->nb
!= nb
);
336 if (unlikely(cor_seqno_eq(seqno
, cr
->seqno
))) {
338 } else if (cor_seqno_before(seqno
, cr
->seqno
)) {
340 } else if (cor_seqno_after(seqno
, cr
->seqno
)) {
347 kref_get(&(ins
->ref
));
348 rb_link_node(&(ins
->rbn
), parent
, p
);
349 rb_insert_color(&(ins
->rbn
), root
);
352 static void cor_remove_connack_oooflag_ifold(struct cor_conn
*src_in_l
,
353 struct cor_control_msg_out
*cm
)
355 if (cor_ooolen(cm
->msg
.ack_conn
.flags
) != 0 && cor_seqno_before_eq(
356 cm
->msg
.ack_conn
.seqno_ooo
+
357 cm
->msg
.ack_conn
.length
,
358 src_in_l
->source
.in
.next_seqno
)) {
359 cm
->msg
.ack_conn
.length
= 0;
360 cm
->msg
.ack_conn
.flags
= (cm
->msg
.ack_conn
.flags
&
361 (~KP_ACK_CONN_FLAGS_OOO
));
365 static int cor_ackconn_prepare_requeue(struct cor_conn
*cn_l
,
366 struct cor_control_msg_out
*cm
)
368 if (unlikely(unlikely(cn_l
->sourcetype
!= SOURCE_IN
) ||
369 unlikely(cn_l
->source
.in
.nb
!= cm
->nb
) ||
370 unlikely(cn_l
->reversedir
->target
.out
.conn_id
!=
371 cm
->msg
.ack_conn
.conn_id
) ||
372 unlikely(cn_l
->isreset
!= 0)))
375 cor_remove_connack_oooflag_ifold(cn_l
, cm
);
377 if (!cor_seqno_eq(cm
->msg
.ack_conn
.ack_seqno
,
378 cn_l
->source
.in
.ack_seqno
))
379 cm
->msg
.ack_conn
.flags
= (cm
->msg
.ack_conn
.flags
&
380 (~KP_ACK_CONN_FLAGS_SEQNO
) &
381 (~KP_ACK_CONN_FLAGS_WINDOW
));
383 if (cm
->msg
.ack_conn
.flags
== 0)
386 cm
->length
= 6 + cor_ack_conn_len(cm
->msg
.ack_conn
.flags
);
391 static void cor_requeue_control_retrans(struct cor_control_retrans
*cr
)
393 atomic_inc(&(cr
->nb
->cmsg_bulk_readds
));
395 while (list_empty(&(cr
->msgs
)) == 0) {
396 struct cor_control_msg_out
*cm
= container_of(cr
->msgs
.prev
,
397 struct cor_control_msg_out
, lh
);
400 BUG_ON(cm
->nb
!= cr
->nb
);
402 if (cm
->type
== MSGTYPE_ACK_CONN
) {
403 struct cor_conn
*cn_l
= cm
->msg
.ack_conn
.src_in
;
404 spin_lock_bh(&(cn_l
->rcv_lock
));
405 if (unlikely(cor_ackconn_prepare_requeue(cn_l
,
407 cor_free_control_msg(cm
);
409 cor_merge_or_enqueue_ackconn(cn_l
, cm
,
410 ADDCMSG_SRC_RETRANS
);
413 spin_unlock_bh(&(cn_l
->rcv_lock
));
415 if (cm
->type
== MSGTYPE_PONG
)
416 atomic_dec(&(cm
->nb
->cmsg_pongs_retrans_cnt
));
417 cor_enqueue_control_msg(cm
, ADDCMSG_SRC_RETRANS
);
421 atomic_dec(&(cr
->nb
->cmsg_bulk_readds
));
423 spin_lock_bh(&(cr
->nb
->cmsg_lock
));
424 cor_schedule_controlmsg_timer(cr
->nb
);
425 spin_unlock_bh(&(cr
->nb
->cmsg_lock
));
428 static void cor_empty_retrans_queue(struct cor_neighbor
*nb_retranslocked
)
430 while (!list_empty(&(nb_retranslocked
->retrans_list
))) {
431 struct cor_control_retrans
*cr
= container_of(
432 nb_retranslocked
->retrans_list
.next
,
433 struct cor_control_retrans
, timeout_list
);
435 BUG_ON(cr
->nb
!= nb_retranslocked
);
437 list_del(&(cr
->timeout_list
));
438 rb_erase(&(cr
->rbn
), &(nb_retranslocked
->kp_retransmits_rb
));
440 kref_put(&(cr
->ref
), cor_kreffree_bug
); /* rb */
441 kref_put(&(cr
->ref
), cor_free_control_retrans
); /* list */
445 void cor_retransmit_timerfunc(struct timer_list
*retrans_timer
)
447 struct cor_neighbor
*nb
= container_of(retrans_timer
,
448 struct cor_neighbor
, retrans_timer
);
449 int nbstate
= cor_get_neigh_state(nb
);
450 struct cor_control_retrans
*cr
= 0;
452 spin_lock_bh(&(nb
->retrans_lock
));
454 if (list_empty(&(nb
->retrans_list
))) {
455 spin_unlock_bh(&(nb
->retrans_lock
));
456 kref_put(&(nb
->ref
), cor_neighbor_free
);
460 if (unlikely(nbstate
== NEIGHBOR_STATE_KILLED
)) {
461 cor_empty_retrans_queue(nb
);
462 spin_unlock_bh(&(nb
->retrans_lock
));
463 kref_put(&(nb
->ref
), cor_neighbor_free
);
467 cr
= container_of(nb
->retrans_list
.next
, struct cor_control_retrans
,
470 BUG_ON(cr
->nb
!= nb
);
472 if (time_after(cr
->timeout
, jiffies
)) {
473 int rc
= mod_timer(&(nb
->retrans_timer
), cr
->timeout
);
474 spin_unlock_bh(&(nb
->retrans_lock
));
476 kref_put(&(nb
->ref
), cor_neighbor_free
);
480 spin_unlock_bh(&(nb
->retrans_lock
));
482 spin_lock_bh(&(nb
->cmsg_lock
));
483 nb
->add_retrans_needed
= 1;
484 cor_schedule_controlmsg_timer(nb
);
485 spin_unlock_bh(&(nb
->cmsg_lock
));
487 kref_put(&(nb
->ref
), cor_neighbor_free
);
490 static void cor_schedule_retransmit(struct cor_control_retrans
*cr
,
491 struct cor_neighbor
*nb
)
495 cr
->timeout
= cor_calc_timeout(atomic_read(&(nb
->latency_retrans_us
)),
496 atomic_read(&(nb
->latency_stddev_retrans_us
)),
497 atomic_read(&(nb
->max_remote_ack_delay_us
)));
499 spin_lock_bh(&(nb
->retrans_lock
));
500 cor_insert_control_retrans(cr
);
501 first
= list_empty(&(nb
->retrans_list
));
502 list_add_tail(&(cr
->timeout_list
), &(nb
->retrans_list
));
505 if (mod_timer(&(nb
->retrans_timer
), cr
->timeout
) == 0) {
506 kref_get(&(nb
->ref
));
510 spin_unlock_bh(&(nb
->retrans_lock
));
513 void cor_kern_ack_rcvd(struct cor_neighbor
*nb
, __u64 seqno
)
515 struct cor_control_retrans
*cr
= 0;
517 spin_lock_bh(&(nb
->retrans_lock
));
519 cr
= cor_get_control_retrans(nb
, seqno
);
522 /* char *seqno_p = (char *) &seqno;
523 seqno = cpu_to_be32(seqno);
524 printk(KERN_ERR "bogus/duplicate ack received %d %d %d %d",
525 seqno_p[0], seqno_p[1], seqno_p[2], seqno_p[3]);
530 rb_erase(&(cr
->rbn
), &(nb
->kp_retransmits_rb
));
532 BUG_ON(cr
->nb
!= nb
);
534 list_del(&(cr
->timeout_list
));
537 spin_unlock_bh(&(nb
->retrans_lock
));
540 /* cor_get_control_retrans */
541 kref_put(&(cr
->ref
), cor_kreffree_bug
);
543 kref_put(&(cr
->ref
), cor_kreffree_bug
); /* rb_erase */
544 kref_put(&(cr
->ref
), cor_free_control_retrans
); /* list */
548 static __u8
cor_get_window(struct cor_conn
*cn
,
549 struct cor_neighbor
*expectedsender
, __u32 expected_connid
)
553 spin_lock_bh(&(cn
->rcv_lock
));
555 if (unlikely(unlikely(cn
->sourcetype
!= SOURCE_IN
) ||
556 unlikely(expectedsender
!= 0 && (cn
->source
.in
.nb
!=
557 expectedsender
|| cn
->reversedir
->target
.out
.conn_id
!=
561 window
= cor_enc_log_64_7(cor_seqno_clean(
562 cn
->source
.in
.window_seqnolimit
-
563 cn
->source
.in
.next_seqno
));
565 cn
->source
.in
.window_seqnolimit_remote
= cn
->source
.in
.next_seqno
+
566 cor_dec_log_64_7(window
);
569 spin_unlock_bh(&(cn
->rcv_lock
));
574 /* static void padding(struct sk_buff *skb, __u32 length)
579 dst = skb_put(skb, length);
581 memset(dst, KP_PADDING, length);
585 static __u32
cor_add_init_session(struct sk_buff
*skb
, __be32 sessionid
,
590 BUG_ON(KP_INIT_SESSION_CMDLEN
!= 5);
592 if (unlikely(spaceleft
< 5))
595 dst
= skb_put(skb
, 5);
598 dst
[0] = KP_INIT_SESSION
;
599 cor_put_be32(dst
+ 1, sessionid
);
604 static __u32
cor_add_ack(struct sk_buff
*skb
, struct cor_control_retrans
*cr
,
605 struct cor_control_msg_out
*cm
, __u32 spaceleft
)
609 BUG_ON(cm
->length
!= 7);
611 if (unlikely(spaceleft
< 7))
614 dst
= skb_put(skb
, 7);
618 cor_put_u48(dst
+ 1, cm
->msg
.ack
.seqno
);
620 cor_free_control_msg(cm
);
625 static __u32
cor_add_ack_conn(struct sk_buff
*skb
,
626 struct cor_control_retrans
*cr
, struct cor_control_msg_out
*cm
,
632 if (unlikely(spaceleft
< cm
->length
))
635 dst
= skb_put(skb
, cm
->length
);
638 dst
[offset
] = KP_ACK_CONN
;
640 cor_put_u32(dst
+ offset
, cm
->msg
.ack_conn
.conn_id
);
642 dst
[offset
] = cm
->msg
.ack_conn
.flags
;
645 if ((cm
->msg
.ack_conn
.flags
& KP_ACK_CONN_FLAGS_SEQNO
) != 0) {
646 cor_put_u48(dst
+ offset
, cm
->msg
.ack_conn
.seqno
);
649 if ((cm
->msg
.ack_conn
.flags
& KP_ACK_CONN_FLAGS_WINDOW
) != 0) {
650 BUG_ON(cm
->msg
.ack_conn
.src_in
== 0);
651 dst
[offset
] = cor_get_window(cm
->msg
.ack_conn
.src_in
,
652 cm
->nb
, cm
->msg
.ack_conn
.conn_id
);
657 if (cor_ooolen(cm
->msg
.ack_conn
.flags
) != 0) {
658 cor_put_u48(dst
+ offset
, cm
->msg
.ack_conn
.seqno_ooo
);
660 if (cor_ooolen(cm
->msg
.ack_conn
.flags
) == 1) {
661 BUG_ON(cm
->msg
.ack_conn
.length
> 255);
662 dst
[offset
] = cm
->msg
.ack_conn
.length
;
664 } else if (cor_ooolen(cm
->msg
.ack_conn
.flags
) == 2) {
665 BUG_ON(cm
->msg
.ack_conn
.length
<= 255);
666 BUG_ON(cm
->msg
.ack_conn
.length
> 65535);
667 cor_put_u16(dst
+ offset
, cm
->msg
.ack_conn
.length
);
669 } else if (cor_ooolen(cm
->msg
.ack_conn
.flags
) == 4) {
670 BUG_ON(cm
->msg
.ack_conn
.length
<= 65535);
671 cor_put_u32(dst
+ offset
, cm
->msg
.ack_conn
.length
);
678 if ((cm
->msg
.ack_conn
.flags
& KP_ACK_CONN_FLAGS_PRIORITY
) != 0) {
679 dst
[offset
] = cm
->msg
.ack_conn
.priority_seqno
;
681 dst
[offset
] = cm
->msg
.ack_conn
.priority
;
685 list_add_tail(&(cm
->lh
), &(cr
->msgs
));
687 BUG_ON(offset
!= cm
->length
);
691 static __u32
cor_add_ping(struct sk_buff
*skb
, __u32 cookie
, __u32 spaceleft
)
695 BUG_ON(KP_PING_CMDLEN
!= 5);
697 if (unlikely(spaceleft
< 5))
700 dst
= skb_put(skb
, 5);
704 cor_put_u32(dst
+ 1, cookie
);
709 static __u32
cor_calc_respdelay(ktime_t time_pong_enqueued
, ktime_t time_end
)
711 if (unlikely(ktime_before(time_end
, time_pong_enqueued
))) {
714 __s64 respdelay
= div_u64(ktime_to_ns(time_end
) -
715 ktime_to_ns(time_pong_enqueued
) + 500,
718 if (unlikely(respdelay
> U32_MAX
))
720 else if (unlikely(respdelay
< 0))
723 return (__u32
) respdelay
;
727 static __u32
cor_add_pong(struct sk_buff
*skb
, struct cor_control_retrans
*cr
,
728 struct cor_control_msg_out
*cm
, __u32 spaceleft
,
729 ktime_t cmsg_send_start
)
731 __u32 respdelay_full
;
732 __u32 respdelay_netonly
;
735 BUG_ON(cm
->length
!= 13);
737 if (unlikely(spaceleft
< 13))
740 respdelay_full
= cor_calc_respdelay(cm
->msg
.pong
.time_enqueued
,
742 respdelay_netonly
= cor_calc_respdelay(cm
->msg
.pong
.ping_rcvtime
,
745 dst
= skb_put(skb
, 13);
749 cor_put_u32(dst
+ 1, cm
->msg
.pong
.cookie
);
750 cor_put_u32(dst
+ 5, (__u32
) respdelay_full
);
751 cor_put_u32(dst
+ 9, (__u32
) respdelay_netonly
);
753 list_add_tail(&(cm
->lh
), &(cr
->msgs
));
758 static __u32
cor_add_connect(struct sk_buff
*skb
,
759 struct cor_control_retrans
*cr
, struct cor_control_msg_out
*cm
,
763 struct cor_conn
*src_in
= cm
->msg
.connect
.src_in
;
765 BUG_ON(cm
->length
!= 21);
767 if (unlikely(spaceleft
< 21))
770 dst
= skb_put(skb
, 21);
774 cor_put_u32(dst
+ 1, cm
->msg
.connect
.conn_id
);
775 cor_put_u48(dst
+ 5, cm
->msg
.connect
.seqno1
);
776 cor_put_u48(dst
+ 11, cm
->msg
.connect
.seqno2
);
777 BUG_ON(cm
->msg
.connect
.src_in
== 0);
778 dst
[17] = cor_get_window(cm
->msg
.connect
.src_in
, cm
->nb
,
779 cm
->msg
.connect
.conn_id
);
781 spin_lock_bh(&(src_in
->reversedir
->rcv_lock
));
782 BUG_ON(src_in
->reversedir
->targettype
!= TARGET_OUT
);
784 dst
[18] = src_in
->reversedir
->target
.out
.priority_seqno
;
785 dst
[19] = src_in
->reversedir
->target
.out
.priority_last
;
786 if (src_in
->is_highlatency
== 0)
791 spin_unlock_bh(&(src_in
->reversedir
->rcv_lock
));
793 list_add_tail(&(cm
->lh
), &(cr
->msgs
));
798 static __u32
cor_add_connect_success(struct sk_buff
*skb
,
799 struct cor_control_retrans
*cr
, struct cor_control_msg_out
*cm
,
804 BUG_ON(cm
->length
!= 6);
806 if (unlikely(spaceleft
< 6))
809 dst
= skb_put(skb
, 6);
812 dst
[0] = KP_CONNECT_SUCCESS
;
813 cor_put_u32(dst
+ 1, cm
->msg
.connect_success
.conn_id
);
814 BUG_ON(cm
->msg
.connect_success
.src_in
== 0);
815 dst
[5] = cor_get_window(cm
->msg
.connect_success
.src_in
, cm
->nb
,
816 cm
->msg
.connect_success
.conn_id
);
818 list_add_tail(&(cm
->lh
), &(cr
->msgs
));
823 static __u32
cor_add_reset_conn(struct sk_buff
*skb
,
824 struct cor_control_retrans
*cr
, struct cor_control_msg_out
*cm
,
829 BUG_ON(cm
->length
!= 5);
831 if (unlikely(spaceleft
< 5))
834 dst
= skb_put(skb
, 5);
837 dst
[0] = KP_RESET_CONN
;
838 cor_put_u32(dst
+ 1, cm
->msg
.reset_conn
.conn_id
);
840 list_add_tail(&(cm
->lh
), &(cr
->msgs
));
845 static __u32
cor_add_conndata(struct sk_buff
*skb
,
846 struct cor_control_retrans
*cr
, struct cor_control_msg_out
*cm
,
847 __u32 spaceleft
, struct cor_control_msg_out
**split_conndata
,
852 __u32 totallen
= cm
->msg
.conn_data
.datalen
+ KP_CONN_DATA_CMDLEN
;
853 __u32 putlen
= min(totallen
, spaceleft
);
854 __u32 dataputlen
= putlen
- KP_CONN_DATA_CMDLEN
;
856 BUG_ON(KP_CONN_DATA_CMDLEN
!= 13);
857 BUG_ON(cm
->length
!= totallen
);
859 BUG_ON(putlen
> 1024*1024*1024);
861 BUG_ON(split_conndata
== 0);
862 BUG_ON(*split_conndata
!= 0);
863 BUG_ON(sc_sendlen
== 0);
864 BUG_ON(*sc_sendlen
!= 0);
866 if (putlen
< KP_CONN_DATA_CMDLEN
+ 1)
869 dst
= skb_put(skb
, putlen
);
872 if (cm
->msg
.conn_data
.flush
!= 0) {
873 if (cm
->msg
.conn_data
.snd_delayed_lowbuf
== 0) {
874 dst
[0] = KP_CONN_DATA_FLUSH
;
876 dst
[0] = KP_CONN_DATA_LOWBUFDELAYED_FLUSH
;
879 if (cm
->msg
.conn_data
.snd_delayed_lowbuf
== 0) {
880 dst
[0] = KP_CONN_DATA
;
882 dst
[0] = KP_CONN_DATA_LOWBUFDELAYED
;
885 cor_put_u32(dst
+ 1, cm
->msg
.conn_data
.conn_id
);
886 cor_put_u48(dst
+ 5, cm
->msg
.conn_data
.seqno
);
887 cor_put_u16(dst
+ 11, dataputlen
);
889 memcpy(dst
+ 13, cm
->msg
.conn_data
.data
, dataputlen
);
891 if (cm
->msg
.conn_data
.datalen
== dataputlen
) {
892 BUG_ON(cm
->length
!= putlen
);
893 list_add_tail(&(cm
->lh
), &(cr
->msgs
));
895 *split_conndata
= cm
;
896 *sc_sendlen
= dataputlen
;
902 static __u32
cor_add_set_max_cmsg_dly(struct sk_buff
*skb
,
903 struct cor_control_retrans
*cr
, struct cor_control_msg_out
*cm
,
908 BUG_ON(KP_SET_MAX_CMSG_DELAY_CMDLEN
!= 13);
909 BUG_ON(cm
->length
!= KP_SET_MAX_CMSG_DELAY_CMDLEN
);
911 if (unlikely(spaceleft
< 13))
914 dst
= skb_put(skb
, 13);
917 dst
[0] = KP_SET_MAX_CMSG_DELAY
;
918 cor_put_u32(dst
+ 1, cm
->msg
.set_max_cmsg_delay
.ack_delay
);
919 cor_put_u32(dst
+ 5, cm
->msg
.set_max_cmsg_delay
.ackconn_delay
);
920 cor_put_u32(dst
+ 9, cm
->msg
.set_max_cmsg_delay
.other_delay
);
922 list_add_tail(&(cm
->lh
), &(cr
->msgs
));
927 static __u32
cor_add_message(struct sk_buff
*skb
,
928 struct cor_control_retrans
*cr
, struct cor_control_msg_out
*cm
,
929 __u32 spaceleft
, ktime_t cmsg_send_start
,
930 struct cor_control_msg_out
**split_conndata
, __u32
*sc_sendlen
)
932 BUG_ON(split_conndata
!= 0 && *split_conndata
!= 0);
933 BUG_ON(sc_sendlen
!= 0 && *sc_sendlen
!= 0);
937 return cor_add_ack(skb
, cr
, cm
, spaceleft
);
938 case MSGTYPE_ACK_CONN
:
939 return cor_add_ack_conn(skb
, cr
, cm
, spaceleft
);
941 return cor_add_pong(skb
, cr
, cm
, spaceleft
, cmsg_send_start
);
942 case MSGTYPE_CONNECT
:
943 return cor_add_connect(skb
, cr
, cm
, spaceleft
);
944 case MSGTYPE_CONNECT_SUCCESS
:
945 return cor_add_connect_success(skb
, cr
, cm
, spaceleft
);
946 case MSGTYPE_RESET_CONN
:
947 return cor_add_reset_conn(skb
, cr
, cm
, spaceleft
);
948 case MSGTYPE_CONNDATA
:
949 return cor_add_conndata(skb
, cr
, cm
, spaceleft
, split_conndata
,
951 case MSGTYPE_SET_MAX_CMSG_DELAY
:
952 return cor_add_set_max_cmsg_dly(skb
, cr
, cm
, spaceleft
);
960 static __u32
__cor_send_messages(struct cor_neighbor
*nb
, struct sk_buff
*skb
,
961 struct cor_control_retrans
*cr
, struct list_head
*cmsgs
,
962 __u32 spaceleft
, int nbstate
, ktime_t cmsg_send_start
,
963 struct cor_control_msg_out
**split_conndata
, __u32
*sc_sendlen
)
966 while (!list_empty(cmsgs
)) {
968 struct cor_control_msg_out
*cm
= container_of(cmsgs
->next
,
969 struct cor_control_msg_out
, lh
);
973 rc
= cor_add_message(skb
, cr
, cm
, spaceleft
- length
,
974 cmsg_send_start
, split_conndata
, sc_sendlen
);
977 list_add(&(cm
->lh
), cmsgs
);
980 BUG_ON(rc
!= cm
->length
&& cm
->type
!= MSGTYPE_CONNDATA
);
988 static __u32
__cor_send_messages_smcd(struct cor_neighbor
*nb
,
989 struct sk_buff
*skb
, struct cor_control_retrans
*cr
,
990 __u32 spaceleft
, ktime_t cmsg_send_start
)
992 struct cor_control_msg_out
*cm
;
995 cm
= cor_alloc_control_msg(nb
, ACM_PRIORITY_MED
);
997 if (unlikely(cm
== 0))
1000 cm
->type
= MSGTYPE_SET_MAX_CMSG_DELAY
;
1001 cm
->msg
.set_max_cmsg_delay
.ack_delay
=
1002 CMSG_MAXDELAY_ACK_MS
* 1000;
1003 cm
->msg
.set_max_cmsg_delay
.ackconn_delay
=
1004 CMSG_MAXDELAY_ACKCONN_MS
* 1000;
1005 cm
->msg
.set_max_cmsg_delay
.other_delay
=
1006 CMSG_MAXDELAY_OTHER_MS
* 1000;
1007 cm
->length
= KP_SET_MAX_CMSG_DELAY_CMDLEN
;
1009 rc
= cor_add_message(skb
, cr
, cm
, spaceleft
, cmsg_send_start
, 0, 0);
1011 nb
->max_cmsg_delay_sent
= 1;
1016 static void cor_requeue_message(struct cor_control_msg_out
*cm
)
1018 if (cm
->type
== MSGTYPE_ACK_CONN
) {
1019 struct cor_conn
*cn_l
= cm
->msg
.ack_conn
.src_in
;
1021 spin_lock_bh(&(cn_l
->rcv_lock
));
1022 if (unlikely(cor_ackconn_prepare_requeue(cn_l
, cm
) == 0)) {
1023 cor_free_control_msg(cm
);
1025 spin_lock_bh(&(cm
->nb
->cmsg_lock
));
1027 list_add(&(cm
->lh
), &(cm
->nb
->cmsg_queue_ackconn
));
1028 cm
->nb
->cmsg_otherlength
+= cm
->length
;
1030 list_add(&(cm
->msg
.ack_conn
.conn_acks
),
1031 &(cn_l
->source
.in
.acks_pending
));
1032 cor_try_merge_ackconns(cn_l
, cm
);
1034 spin_unlock_bh(&(cm
->nb
->cmsg_lock
));
1036 spin_unlock_bh(&(cn_l
->rcv_lock
));
1040 cor_enqueue_control_msg(cm
, ADDCMSG_SRC_READD
);
1043 static void cor_requeue_messages(struct list_head
*lh
)
1045 while (list_empty(lh
) == 0) {
1046 struct cor_control_msg_out
*cm
= container_of(lh
->prev
,
1047 struct cor_control_msg_out
, lh
);
1048 list_del(&(cm
->lh
));
1049 cor_requeue_message(cm
);
1053 static int _cor_send_messages_send2(struct cor_neighbor
*nb
,
1054 struct sk_buff
*skb
, int ping
, int initsession
,
1055 struct cor_control_retrans
*cr
, struct list_head
*cmsgs
,
1056 __u32 spaceleft
, int nbstate
, ktime_t cmsg_send_start
,
1062 __u32 pingcookie
= 0;
1063 unsigned long last_ping_time
;
1064 struct cor_control_msg_out
*split_conndata
= 0;
1065 __u32 sc_sendlen
= 0;
1067 if (ping
!= TIMETOSENDPING_NO
) {
1070 if (unlikely(initsession
)) {
1071 rc
= cor_add_init_session(skb
, nb
->sessionid
,
1072 spaceleft
- length
);
1078 pingcookie
= cor_add_ping_req(nb
, &last_ping_time
);
1079 rc
= cor_add_ping(skb
, pingcookie
, spaceleft
- length
);
1085 if (likely(nbstate
== NEIGHBOR_STATE_ACTIVE
) &&
1086 unlikely(nb
->max_cmsg_delay_sent
== 0))
1087 length
+= __cor_send_messages_smcd(nb
, skb
, cr
,
1088 spaceleft
- length
, cmsg_send_start
);
1090 length
+= __cor_send_messages(nb
, skb
, cr
, cmsgs
, spaceleft
- length
,
1091 nbstate
, cmsg_send_start
, &split_conndata
, &sc_sendlen
);
1093 BUG_ON(length
> spaceleft
);
1095 if (likely(ping
!= TIMETOSENDPING_FORCE
) &&
1096 pinglen
!= 0 && unlikely(length
== pinglen
)) {
1097 cor_unadd_ping_req(nb
, pingcookie
, last_ping_time
, 0);
1101 if (unlikely(length
== 0)) {
1105 BUG_ON(list_empty(&(cr
->msgs
)) == 0);
1106 kref_put(&(cr
->ref
), cor_free_control_retrans
);
1108 nb
->kpacket_seqno
--;
1109 return QOS_RESUME_DONE
;
1112 //padding(skb, spaceleft - length);
1113 BUG_ON(spaceleft
- length
!= 0);
1115 rc
= cor_dev_queue_xmit(skb
, nb
->queue
, QOS_CALLER_KPACKET
);
1116 if (rc
== NET_XMIT_SUCCESS
)
1119 if (rc
== NET_XMIT_DROP
) {
1121 cor_unadd_ping_req(nb
, pingcookie
, last_ping_time
, 1);
1123 atomic_inc(&(nb
->cmsg_bulk_readds
));
1124 if (split_conndata
!= 0)
1125 cor_requeue_message(split_conndata
);
1127 cor_requeue_messages(&(cr
->msgs
));
1129 kref_put(&(cr
->ref
), cor_free_control_retrans
);
1131 atomic_dec(&(nb
->cmsg_bulk_readds
));
1133 spin_lock_bh(&(nb
->cmsg_lock
));
1134 cor_schedule_controlmsg_timer(nb
);
1135 spin_unlock_bh(&(nb
->cmsg_lock
));
1137 struct list_head
*curr
= cr
->msgs
.next
;
1139 if (pingcookie
!= 0)
1140 cor_ping_sent(nb
, pingcookie
);
1142 while (curr
!= &(cr
->msgs
)) {
1143 struct cor_control_msg_out
*cm
= container_of(curr
,
1144 struct cor_control_msg_out
, lh
);
1148 if (unlikely(cm
->type
== MSGTYPE_PONG
&&
1149 (nbstate
!= NEIGHBOR_STATE_ACTIVE
))) {
1150 list_del(&(cm
->lh
));
1151 cor_free_control_msg(cm
);
1152 } else if (unlikely(cm
->type
== MSGTYPE_PONG
&&
1154 &(nb
->cmsg_pongs_retrans_cnt
)) >
1155 MAX_PONG_CMSGS_RETRANS_PER_NEIGH
)) {
1156 atomic_dec(&(nb
->cmsg_pongs_retrans_cnt
));
1157 list_del(&(cm
->lh
));
1158 cor_free_control_msg(cm
);
1159 } else if (cm
->type
== MSGTYPE_CONNDATA
) {
1160 cor_schedule_retransmit_conn(
1161 cm
->msg
.conn_data
.cr
, 0, 0);
1162 kfree(cm
->msg
.conn_data
.data_orig
);
1163 list_del(&(cm
->lh
));
1164 cor_free_control_msg(cm
);
1168 if (split_conndata
!= 0) {
1169 BUG_ON(sc_sendlen
== 0);
1170 BUG_ON(sc_sendlen
>=
1171 split_conndata
->msg
.conn_data
.datalen
);
1173 split_conndata
->msg
.conn_data
.seqno
+= sc_sendlen
;
1174 split_conndata
->msg
.conn_data
.data
+= sc_sendlen
;
1175 split_conndata
->msg
.conn_data
.datalen
-= sc_sendlen
;
1176 split_conndata
->length
= KP_CONN_DATA_CMDLEN
+
1177 split_conndata
->msg
.conn_data
.datalen
;
1178 cor_enqueue_control_msg(split_conndata
,
1179 ADDCMSG_SRC_SPLITCONNDATA
);
1183 if (list_empty(&(cr
->msgs
)))
1184 kref_put(&(cr
->ref
), cor_free_control_retrans
);
1186 cor_schedule_retransmit(cr
, nb
);
1189 return (rc
== NET_XMIT_SUCCESS
) ? QOS_RESUME_DONE
: QOS_RESUME_CONG
;
1192 static int _cor_send_messages_send(struct cor_neighbor
*nb
, int ping
,
1193 int initsession
, struct list_head
*cmsgs
, int nbstate
,
1194 __u32 length
, __u64 seqno
, ktime_t cmsg_send_start
, int *sent
)
1196 struct sk_buff
*skb
;
1197 struct cor_control_retrans
*cr
;
1200 skb
= cor_create_packet_cmsg(nb
, length
, GFP_ATOMIC
, seqno
);
1201 if (unlikely(skb
== 0)) {
1202 printk(KERN_ERR
"cor_send_messages(): cannot allocate skb (out of memory?)");
1204 cor_requeue_messages(cmsgs
);
1205 return QOS_RESUME_CONG
;
1208 cr
= kmem_cache_alloc(cor_controlretrans_slab
, GFP_ATOMIC
);
1209 if (unlikely(cr
== 0)) {
1210 printk(KERN_ERR
"cor_send_messages(): cannot allocate cor_control_retrans (out of memory?)");
1213 cor_requeue_messages(cmsgs
);
1214 return QOS_RESUME_CONG
;
1217 memset(cr
, 0, sizeof(struct cor_control_retrans
));
1218 kref_init(&(cr
->ref
));
1221 INIT_LIST_HEAD(&(cr
->msgs
));
1223 rc
= _cor_send_messages_send2(nb
, skb
, ping
, initsession
, cr
, cmsgs
,
1224 length
, nbstate
, cmsg_send_start
, sent
);
1226 BUG_ON(!list_empty(cmsgs
));
1231 #define CMSGQUEUE_PONG 1
1232 #define CMSGQUEUE_ACK 2
1233 #define CMSGQUEUE_ACK_CONN 3
1234 #define CMSGQUEUE_CONNDATA_LOWLAT 4
1235 #define CMSGQUEUE_CONNDATA_HIGHLAT 5
1236 #define CMSGQUEUE_OTHER 6
1238 static unsigned long cor_get_cmsg_timeout(struct cor_control_msg_out
*cm
,
1241 if (cm
->type
== MSGTYPE_ACK
) {
1242 BUG_ON(queue
!= CMSGQUEUE_ACK
);
1243 return cm
->time_added
+
1244 msecs_to_jiffies(CMSG_MAXDELAY_ACK_MS
) - 1;
1245 } else if (cm
->type
== MSGTYPE_ACK_CONN
) {
1246 BUG_ON(queue
!= CMSGQUEUE_ACK_CONN
);
1247 return cm
->time_added
+
1248 msecs_to_jiffies(CMSG_MAXDELAY_ACKCONN_MS
) - 1;
1249 } else if (cm
->type
== MSGTYPE_CONNDATA
) {
1250 if (cm
->msg
.conn_data
.highlatency
!= 0) {
1251 BUG_ON(queue
!= CMSGQUEUE_CONNDATA_HIGHLAT
);
1252 return cm
->time_added
+
1254 CMSG_MAXDELAY_CONNDATA_MS
) - 1;
1256 BUG_ON(queue
!= CMSGQUEUE_CONNDATA_LOWLAT
);
1257 return cm
->time_added
;
1260 BUG_ON(cm
->type
== MSGTYPE_PONG
&& queue
!= CMSGQUEUE_PONG
);
1261 BUG_ON(cm
->type
!= MSGTYPE_PONG
&& queue
!= CMSGQUEUE_OTHER
);
1263 return cm
->time_added
+
1264 msecs_to_jiffies(CMSG_MAXDELAY_OTHER_MS
) - 1;
1268 static void _cor_peek_message(struct cor_neighbor
*nb_cmsglocked
, int queue
,
1269 struct cor_control_msg_out
**currcm
, unsigned long *currtimeout
,
1272 struct cor_control_msg_out
*cm
;
1273 unsigned long cmtimeout
;
1275 struct list_head
*queuelh
;
1276 if (queue
== CMSGQUEUE_PONG
) {
1277 queuelh
= &(nb_cmsglocked
->cmsg_queue_pong
);
1278 } else if (queue
== CMSGQUEUE_ACK
) {
1279 queuelh
= &(nb_cmsglocked
->cmsg_queue_ack
);
1280 } else if (queue
== CMSGQUEUE_ACK_CONN
) {
1281 queuelh
= &(nb_cmsglocked
->cmsg_queue_ackconn
);
1282 } else if (queue
== CMSGQUEUE_CONNDATA_LOWLAT
) {
1283 queuelh
= &(nb_cmsglocked
->cmsg_queue_conndata_lowlat
);
1284 } else if (queue
== CMSGQUEUE_CONNDATA_HIGHLAT
) {
1285 queuelh
= &(nb_cmsglocked
->cmsg_queue_conndata_highlat
);
1286 } else if (queue
== CMSGQUEUE_OTHER
) {
1287 queuelh
= &(nb_cmsglocked
->cmsg_queue_other
);
1292 if (list_empty(queuelh
))
1295 cm
= container_of(queuelh
->next
, struct cor_control_msg_out
, lh
);
1296 cmtimeout
= cor_get_cmsg_timeout(cm
, queue
);
1298 BUG_ON(cm
->nb
!= nb_cmsglocked
);
1300 if (*currcm
== 0 || (time_before(cmtimeout
, *currtimeout
) &&
1301 time_before(jiffies
, *currtimeout
))) {
1303 *currtimeout
= cmtimeout
;
1305 if (queue
== CMSGQUEUE_PONG
) {
1306 *currlen
= &(nb_cmsglocked
->cmsg_pongslength
);
1308 *currlen
= &(nb_cmsglocked
->cmsg_otherlength
);
1313 static void cor_peek_message(struct cor_neighbor
*nb_cmsglocked
, int nbstate
,
1314 struct cor_control_msg_out
**cm
, unsigned long *cmtimeout
,
1315 __u32
**len
, int for_timeout
)
1317 _cor_peek_message(nb_cmsglocked
, CMSGQUEUE_PONG
, cm
, cmtimeout
, len
);
1318 if (likely(nbstate
== NEIGHBOR_STATE_ACTIVE
)) {
1319 _cor_peek_message(nb_cmsglocked
, CMSGQUEUE_ACK
, cm
, cmtimeout
,
1321 _cor_peek_message(nb_cmsglocked
, CMSGQUEUE_ACK_CONN
, cm
,
1324 if (!for_timeout
|| atomic_read(
1325 &(nb_cmsglocked
->cmsg_delay_conndata
)) == 0) {
1326 _cor_peek_message(nb_cmsglocked
,
1327 CMSGQUEUE_CONNDATA_LOWLAT
,
1328 cm
, cmtimeout
, len
);
1329 _cor_peek_message(nb_cmsglocked
,
1330 CMSGQUEUE_CONNDATA_HIGHLAT
,
1331 cm
, cmtimeout
, len
);
1333 _cor_peek_message(nb_cmsglocked
, CMSGQUEUE_OTHER
, cm
, cmtimeout
,
1338 static unsigned long cor_get_cmsg_timer_timeout(
1339 struct cor_neighbor
*nb_cmsglocked
, int nbstate
)
1341 unsigned long pingtimeout
= cor_get_next_ping_time(nb_cmsglocked
);
1343 struct cor_control_msg_out
*cm
= 0;
1344 unsigned long cmtimeout
;
1347 cor_peek_message(nb_cmsglocked
, nbstate
, &cm
, &cmtimeout
, &len
, 1);
1350 unsigned long jiffies_tmp
= jiffies
;
1352 if (time_before(cmtimeout
, jiffies_tmp
))
1354 if (time_before(cmtimeout
, pingtimeout
))
1361 static void _cor_dequeue_messages(struct cor_neighbor
*nb_cmsglocked
,
1362 int nbstate
, __u32 targetmss
, __u32
*length
,
1363 struct list_head
*cmsgs
)
1366 __u32 spaceleft
= targetmss
- *length
;
1367 struct cor_control_msg_out
*cm
= 0;
1368 unsigned long cmtimeout
;
1371 cor_peek_message(nb_cmsglocked
, nbstate
, &cm
, &cmtimeout
, &len
,
1374 if (unlikely(cm
== 0))
1379 if (cm
->length
> spaceleft
) {
1380 if (cm
->type
== MSGTYPE_CONNDATA
) {
1381 BUG_ON(*length
== 0 && spaceleft
<
1382 KP_CONN_DATA_CMDLEN
+ 1);
1384 if (spaceleft
< KP_CONN_DATA_CMDLEN
+ 1 ||
1385 *length
> (targetmss
/4)*3)
1388 BUG_ON(*length
== 0);
1393 list_del(&(cm
->lh
));
1396 if (cm
->type
== MSGTYPE_ACK_CONN
)
1397 list_del(&(cm
->msg
.ack_conn
.conn_acks
));
1398 if (unlikely(cm
->type
== MSGTYPE_PONG
)) {
1399 BUG_ON(cm
->nb
->cmsg_pongscnt
== 0);
1400 cm
->nb
->cmsg_pongscnt
--;
1403 if (unlikely(cm
->type
== MSGTYPE_RESET_CONN
)) {
1404 BUG_ON(cm
->msg
.reset_conn
.in_pending_conn_resets
== 0);
1405 rb_erase(&(cm
->msg
.reset_conn
.rbn
),
1406 &(cm
->nb
->pending_conn_resets_rb
));
1407 cm
->msg
.reset_conn
.in_pending_conn_resets
= 0;
1408 kref_put(&(cm
->ref
), cor_kreffree_bug
);
1411 BUG_ON(*length
+ cm
->length
< *length
);
1412 if (cm
->length
> targetmss
- *length
) {
1413 BUG_ON(*length
>= targetmss
);
1414 BUG_ON(cm
->type
!= MSGTYPE_CONNDATA
);
1415 *length
= targetmss
;
1417 *length
+= cm
->length
;
1420 list_add_tail(&(cm
->lh
), cmsgs
);
1424 static __u32
cor_get_total_messages_length(struct cor_neighbor
*nb
, int ping
,
1425 int initsession
, int nbstate
, int *extralength
)
1427 __u32 length
= nb
->cmsg_pongslength
;
1429 if (likely(nbstate
== NEIGHBOR_STATE_ACTIVE
)) {
1430 length
+= nb
->cmsg_otherlength
;
1432 if (unlikely(nb
->max_cmsg_delay_sent
== 0)) {
1433 length
+= KP_SET_MAX_CMSG_DELAY_CMDLEN
;
1434 *extralength
+= KP_SET_MAX_CMSG_DELAY_CMDLEN
;
1437 if (ping
== TIMETOSENDPING_FORCE
||
1438 (length
> 0 && ping
!= TIMETOSENDPING_NO
)) {
1439 length
+= KP_PING_CMDLEN
;
1440 *extralength
+= KP_PING_CMDLEN
;
1442 if (unlikely(initsession
)) {
1443 length
+= KP_INIT_SESSION_CMDLEN
;
1444 *extralength
+= KP_INIT_SESSION_CMDLEN
;
1451 static int cor_dequeue_messages(struct cor_neighbor
*nb_cmsglocked
, int ping
,
1452 int initsession
, int nbstate
, __u32 targetmss
,
1453 __u32
*length
, struct list_head
*cmsgs
)
1455 __u32 extralength
= 0;
1458 int cmsgqueue_nonpong_empty
= (
1459 list_empty(&(nb_cmsglocked
->cmsg_queue_ack
)) &&
1460 list_empty(&(nb_cmsglocked
->cmsg_queue_ackconn
)) &&
1461 list_empty(&(nb_cmsglocked
->cmsg_queue_conndata_lowlat
)) &&
1462 list_empty(&(nb_cmsglocked
->cmsg_queue_conndata_highlat
)) &&
1463 list_empty(&(nb_cmsglocked
->cmsg_queue_other
)));
1465 BUG_ON(list_empty(&(nb_cmsglocked
->cmsg_queue_pong
)) &&
1466 nb_cmsglocked
->cmsg_pongslength
!= 0);
1467 BUG_ON(!list_empty(&(nb_cmsglocked
->cmsg_queue_pong
)) &&
1468 nb_cmsglocked
->cmsg_pongslength
== 0);
1469 BUG_ON(cmsgqueue_nonpong_empty
&&
1470 nb_cmsglocked
->cmsg_otherlength
!= 0);
1471 BUG_ON(!cmsgqueue_nonpong_empty
&&
1472 nb_cmsglocked
->cmsg_otherlength
== 0);
1474 totallength
= cor_get_total_messages_length(nb_cmsglocked
, ping
,
1475 initsession
, nbstate
, &extralength
);
1477 if (totallength
== 0)
1480 if (totallength
< targetmss
&& ping
!= TIMETOSENDPING_FORCE
&&
1481 time_after(cor_get_cmsg_timer_timeout(nb_cmsglocked
,
1485 *length
= extralength
;
1487 _cor_dequeue_messages(nb_cmsglocked
, nbstate
, targetmss
, length
, cmsgs
);
1489 BUG_ON(*length
== 0);
1490 BUG_ON(*length
> targetmss
);
1495 static void cor_add_timeouted_retrans(struct cor_neighbor
*nb
)
1497 spin_lock_bh(&(nb
->retrans_lock
));
1499 while (!list_empty(&(nb
->retrans_list
))) {
1500 struct cor_control_retrans
*cr
= container_of(
1501 nb
->retrans_list
.next
,
1502 struct cor_control_retrans
, timeout_list
);
1504 BUG_ON(cr
->nb
!= nb
);
1506 if (time_after(cr
->timeout
, jiffies
)) {
1507 if (mod_timer(&(nb
->retrans_timer
), cr
->timeout
) == 0) {
1508 kref_get(&(nb
->ref
));
1513 list_del(&(cr
->timeout_list
));
1514 rb_erase(&(cr
->rbn
), &(nb
->kp_retransmits_rb
));
1516 kref_put(&(cr
->ref
), cor_kreffree_bug
); /* rb */
1518 cor_requeue_control_retrans(cr
);
1521 spin_unlock_bh(&(nb
->retrans_lock
));
1524 static void _cor_delete_all_cmsgs(struct list_head
*cmsgs
)
1526 while (!list_empty(cmsgs
)) {
1527 struct cor_control_msg_out
*cm
= container_of(cmsgs
->next
,
1528 struct cor_control_msg_out
, lh
);
1530 list_del(&(cm
->lh
));
1532 if (cm
->type
== MSGTYPE_CONNDATA
) {
1533 cor_schedule_retransmit_conn(cm
->msg
.conn_data
.cr
, 0,
1535 kfree(cm
->msg
.conn_data
.data_orig
);
1538 cor_free_control_msg(cm
);
1542 static void cor_delete_all_cmsgs(struct cor_neighbor
*nb
)
1545 struct list_head cmsgs
;
1548 INIT_LIST_HEAD(&cmsgs
);
1550 spin_lock_bh(&(nb
->cmsg_lock
));
1551 _cor_dequeue_messages(nb
, NEIGHBOR_STATE_ACTIVE
, 65536, &length
,
1553 spin_unlock_bh(&(nb
->cmsg_lock
));
1555 if (list_empty(&cmsgs
))
1558 _cor_delete_all_cmsgs(&cmsgs
);
1562 static int cor_reset_timeouted_conn_needed(struct cor_neighbor
*nb
,
1563 struct cor_conn
*src_in_l
)
1565 if (unlikely(src_in_l
->sourcetype
!= SOURCE_IN
||
1566 src_in_l
->source
.in
.nb
!= nb
||
1567 src_in_l
->isreset
!= 0))
1569 else if (likely(time_after(src_in_l
->source
.in
.jiffies_last_act
+
1570 CONN_ACTIVITY_UPDATEINTERVAL_SEC
* HZ
+
1571 CONN_INACTIVITY_TIMEOUT_SEC
* HZ
, jiffies
)))
1577 static int cor_reset_timeouted_conn(struct cor_neighbor
*nb
,
1578 struct cor_conn
*src_in
)
1582 if (src_in
->is_client
) {
1583 spin_lock_bh(&(src_in
->rcv_lock
));
1584 spin_lock_bh(&(src_in
->reversedir
->rcv_lock
));
1586 spin_lock_bh(&(src_in
->reversedir
->rcv_lock
));
1587 spin_lock_bh(&(src_in
->rcv_lock
));
1590 resetted
= cor_reset_timeouted_conn_needed(nb
, src_in
);
1591 if (unlikely(resetted
== 0))
1594 resetted
= (cor_send_reset_conn(nb
,
1595 src_in
->reversedir
->target
.out
.conn_id
, 1) == 0);
1596 if (unlikely(resetted
== 0))
1600 BUG_ON(src_in
->reversedir
->isreset
!= 0);
1601 src_in
->reversedir
->isreset
= 1;
1604 if (src_in
->is_client
) {
1605 spin_unlock_bh(&(src_in
->rcv_lock
));
1606 spin_unlock_bh(&(src_in
->reversedir
->rcv_lock
));
1608 spin_unlock_bh(&(src_in
->reversedir
->rcv_lock
));
1609 spin_unlock_bh(&(src_in
->rcv_lock
));
1613 cor_reset_conn(src_in
);
1618 static void cor_reset_timeouted_conns(struct cor_neighbor
*nb
)
1621 for (i
=0;i
<10000;i
++) {
1622 unsigned long iflags
;
1623 struct list_head
*lh
;
1624 struct cor_conn
*src_in
;
1628 spin_lock_irqsave(&(nb
->conn_list_lock
), iflags
);
1630 if (list_empty(&(nb
->rcv_conn_list
))) {
1631 spin_unlock_irqrestore(&(nb
->conn_list_lock
), iflags
);
1635 lh
= nb
->rcv_conn_list
.next
;
1637 list_add_tail(lh
, &(nb
->rcv_conn_list
));
1639 src_in
= container_of(lh
, struct cor_conn
, source
.in
.nb_list
);
1640 kref_get(&(src_in
->ref
));
1642 spin_unlock_irqrestore(&(nb
->conn_list_lock
), iflags
);
1645 spin_lock_bh(&(src_in
->rcv_lock
));
1646 BUG_ON(src_in
->sourcetype
!= SOURCE_IN
);
1647 BUG_ON(src_in
->source
.in
.nb
!= nb
);
1648 resetted
= cor_reset_timeouted_conn_needed(nb
, src_in
);
1649 spin_unlock_bh(&(src_in
->rcv_lock
));
1650 if (likely(resetted
== 0))
1653 resetted
= cor_reset_timeouted_conn(nb
, src_in
);
1656 kref_put(&(src_in
->ref
), cor_free_conn
);
1658 if (likely(resetted
== 0))
1664 * may not be called by more than one thread at the same time, because
1665 * 1) readding cor_control_msg_out may reorder them
1666 * 2) multiple pings may be sent
1668 int cor_send_messages(struct cor_neighbor
*nb
, ktime_t cmsg_send_start
,
1671 int rc
= QOS_RESUME_DONE
;
1674 __u32 targetmss
= cor_mss_cmsg(nb
);
1676 int nbstate
= cor_get_neigh_state(nb
);
1678 if (likely(nbstate
== NEIGHBOR_STATE_ACTIVE
))
1679 cor_reset_timeouted_conns(nb
);
1681 if (unlikely(nbstate
== NEIGHBOR_STATE_KILLED
)) {
1682 spin_lock_bh(&(nb
->retrans_lock
));
1683 cor_empty_retrans_queue(nb
);
1684 spin_unlock_bh(&(nb
->retrans_lock
));
1686 cor_delete_all_cmsgs(nb
);
1687 return QOS_RESUME_DONE
;
1690 ping
= cor_time_to_send_ping(nb
);
1692 spin_lock_bh(&(nb
->cmsg_lock
));
1694 if (nb
->add_retrans_needed
!= 0) {
1695 nb
->add_retrans_needed
= 0;
1696 spin_unlock_bh(&(nb
->cmsg_lock
));
1697 cor_add_timeouted_retrans(nb
);
1698 spin_lock_bh(&(nb
->cmsg_lock
));
1701 initsession
= unlikely(atomic_read(&(nb
->sessionid_snd_needed
)) != 0);
1704 struct list_head cmsgs
;
1708 INIT_LIST_HEAD(&cmsgs
);
1710 if (cor_dequeue_messages(nb
, ping
, initsession
, nbstate
,
1711 targetmss
, &length
, &cmsgs
) != 0) {
1712 cor_schedule_controlmsg_timer(nb
);
1713 spin_unlock_bh(&(nb
->cmsg_lock
));
1714 return QOS_RESUME_DONE
;
1717 nb
->kpacket_seqno
++;
1718 seqno
= nb
->kpacket_seqno
;
1720 spin_unlock_bh(&(nb
->cmsg_lock
));
1722 rc
= _cor_send_messages_send(nb
, ping
, initsession
, &cmsgs
,
1723 nbstate
, length
, seqno
, cmsg_send_start
, sent
);
1725 if (rc
!= QOS_RESUME_DONE
)
1731 spin_lock_bh(&(nb
->cmsg_lock
));
1735 static ktime_t
cor_calc_cmsg_send_start(unsigned long cmsg_timer_timeout
)
1737 ktime_t now
= ktime_get();
1738 unsigned long jiffies_tmp
= jiffies
;
1740 unsigned long jiffies_delayed
;
1741 if (unlikely(time_before(cmsg_timer_timeout
, jiffies_tmp
))) {
1742 jiffies_delayed
= 0;
1744 jiffies_delayed
= jiffies_tmp
- cmsg_timer_timeout
;
1745 if (unlikely(jiffies_delayed
> HZ
/10)) {
1746 jiffies_delayed
= HZ
/10;
1750 return ns_to_ktime(ktime_to_ns(now
) -
1751 1000LL * jiffies_to_usecs(jiffies_delayed
));
1755 void cor_controlmsg_timerfunc(struct timer_list
*cmsg_timer
)
1757 struct cor_neighbor
*nb
= container_of(cmsg_timer
,
1758 struct cor_neighbor
, cmsg_timer
);
1759 unsigned long cmsg_timer_timeout
= (unsigned long)
1760 atomic64_read(&(nb
->cmsg_timer_timeout
));
1761 ktime_t cmsg_send_start
= cor_calc_cmsg_send_start(cmsg_timer_timeout
);
1762 cor_qos_enqueue(nb
->queue
, &(nb
->rb_kp
), cmsg_send_start
,
1763 QOS_CALLER_KPACKET
);
1764 kref_put(&(nb
->ref
), cor_neighbor_free
);
1767 static int cor_cmsg_full_packet(struct cor_neighbor
*nb
, int nbstate
)
1769 __u32 extralength
= 0;
1770 int ping
= cor_time_to_send_ping(nb
);
1771 int initsession
= unlikely(atomic_read(&(nb
->sessionid_snd_needed
)) !=
1773 __u32 len
= cor_get_total_messages_length(nb
, ping
, initsession
,
1774 nbstate
, &extralength
);
1778 if (len
< cor_mss_cmsg(nb
))
1784 void cor_schedule_controlmsg_timer(struct cor_neighbor
*nb_cmsglocked
)
1786 unsigned long timeout
;
1787 int nbstate
= cor_get_neigh_state(nb_cmsglocked
);
1789 if (unlikely(nbstate
== NEIGHBOR_STATE_KILLED
))
1792 if (atomic_read(&(nb_cmsglocked
->cmsg_bulk_readds
)) != 0)
1795 if (cor_cmsg_full_packet(nb_cmsglocked
, nbstate
))
1798 if (nb_cmsglocked
->add_retrans_needed
!= 0)
1801 timeout
= cor_get_cmsg_timer_timeout(nb_cmsglocked
, nbstate
);
1805 cor_qos_enqueue(nb_cmsglocked
->queue
, &(nb_cmsglocked
->rb_kp
),
1806 ktime_get(), QOS_CALLER_KPACKET
);
1807 } else if (time_before_eq(timeout
, jiffies
)) {
1808 ktime_t cmsg_send_start
= cor_calc_cmsg_send_start(timeout
);
1809 cor_qos_enqueue(nb_cmsglocked
->queue
, &(nb_cmsglocked
->rb_kp
),
1810 cmsg_send_start
, QOS_CALLER_KPACKET
);
1812 atomic64_set(&(nb_cmsglocked
->cmsg_timer_timeout
), timeout
);
1814 if (mod_timer(&(nb_cmsglocked
->cmsg_timer
), timeout
) == 0) {
1815 kref_get(&(nb_cmsglocked
->ref
));
1820 static int cor_insert_pending_conn_resets(struct cor_control_msg_out
*ins
)
1822 struct cor_neighbor
*nb
= ins
->nb
;
1823 __u32 conn_id
= ins
->msg
.reset_conn
.conn_id
;
1825 struct rb_root
*root
;
1827 struct rb_node
*parent
= 0;
1830 BUG_ON(ins
->msg
.reset_conn
.in_pending_conn_resets
!= 0);
1832 root
= &(nb
->pending_conn_resets_rb
);
1833 p
= &(root
->rb_node
);
1836 struct cor_control_msg_out
*cm
= container_of(*p
,
1837 struct cor_control_msg_out
,
1838 msg
.reset_conn
.rbn
);
1839 __u32 cm_connid
= cm
->msg
.reset_conn
.conn_id
;
1841 BUG_ON(cm
->nb
!= ins
->nb
);
1842 BUG_ON(cm
->type
!= MSGTYPE_RESET_CONN
);
1845 if (conn_id
== cm_connid
) {
1847 } else if (conn_id
< cm_connid
) {
1849 } else if (conn_id
> cm_connid
) {
1850 p
= &(*p
)->rb_right
;
1856 kref_get(&(ins
->ref
));
1857 rb_link_node(&(ins
->msg
.reset_conn
.rbn
), parent
, p
);
1858 rb_insert_color(&(ins
->msg
.reset_conn
.rbn
), root
);
1859 ins
->msg
.reset_conn
.in_pending_conn_resets
= 1;
1864 static void cor_free_oldest_pong(struct cor_neighbor
*nb
)
1866 struct cor_control_msg_out
*cm
= container_of(nb
->cmsg_queue_pong
.next
,
1867 struct cor_control_msg_out
, lh
);
1869 BUG_ON(list_empty(&(nb
->cmsg_queue_pong
)));
1870 BUG_ON(unlikely(cm
->type
!= MSGTYPE_PONG
));
1872 list_del(&(cm
->lh
));
1873 nb
->cmsg_pongslength
-= cm
->length
;
1874 BUG_ON(nb
->cmsg_pongscnt
== 0);
1875 cm
->nb
->cmsg_pongscnt
--;
1876 cor_free_control_msg(cm
);
1879 static int _cor_enqueue_control_msg(struct cor_control_msg_out
*cm
, int src
)
1881 if (unlikely(cm
->type
== MSGTYPE_PONG
)) {
1882 BUG_ON(src
== ADDCMSG_SRC_SPLITCONNDATA
);
1884 if (cm
->nb
->cmsg_pongscnt
>= MAX_PONG_CMSGS_PER_NEIGH
) {
1885 if (src
!= ADDCMSG_SRC_NEW
) {
1886 BUG_ON(cm
->nb
->cmsg_pongscnt
== 0);
1887 cm
->nb
->cmsg_pongscnt
--;
1888 cor_free_control_msg(cm
);
1891 cor_free_oldest_pong(cm
->nb
);
1895 cm
->nb
->cmsg_pongscnt
++;
1896 cm
->nb
->cmsg_pongslength
+= cm
->length
;
1898 if (src
!= ADDCMSG_SRC_NEW
) {
1899 list_add(&(cm
->lh
), &(cm
->nb
->cmsg_queue_pong
));
1901 list_add_tail(&(cm
->lh
), &(cm
->nb
->cmsg_queue_pong
));
1905 } else if (unlikely(cm
->type
== MSGTYPE_RESET_CONN
)) {
1906 if (cor_insert_pending_conn_resets(cm
) != 0) {
1908 cor_free_control_msg(cm
);
1913 cm
->nb
->cmsg_otherlength
+= cm
->length
;
1914 if (src
== ADDCMSG_SRC_NEW
) {
1915 if (cm
->type
== MSGTYPE_ACK
) {
1916 list_add_tail(&(cm
->lh
), &(cm
->nb
->cmsg_queue_ack
));
1917 } else if (cm
->type
== MSGTYPE_ACK_CONN
) {
1918 list_add_tail(&(cm
->lh
), &(cm
->nb
->cmsg_queue_ackconn
));
1919 } else if (cm
->type
== MSGTYPE_CONNDATA
&&
1920 cm
->msg
.conn_data
.highlatency
!= 0) {
1921 list_add_tail(&(cm
->lh
),
1922 &(cm
->nb
->cmsg_queue_conndata_highlat
));
1923 } else if (cm
->type
== MSGTYPE_CONNDATA
) {
1924 list_add_tail(&(cm
->lh
),
1925 &(cm
->nb
->cmsg_queue_conndata_lowlat
));
1927 list_add_tail(&(cm
->lh
), &(cm
->nb
->cmsg_queue_other
));
1930 BUG_ON(src
== ADDCMSG_SRC_SPLITCONNDATA
&&
1931 cm
->type
!= MSGTYPE_CONNDATA
);
1932 BUG_ON(src
== ADDCMSG_SRC_READD
&&
1933 cm
->type
== MSGTYPE_ACK_CONN
);
1935 if (cm
->type
== MSGTYPE_ACK
) {
1936 list_add(&(cm
->lh
), &(cm
->nb
->cmsg_queue_ack
));
1937 } else if (cm
->type
== MSGTYPE_ACK_CONN
) {
1938 list_add(&(cm
->lh
), &(cm
->nb
->cmsg_queue_ackconn
));
1939 } else if (cm
->type
== MSGTYPE_CONNDATA
&&
1940 cm
->msg
.conn_data
.highlatency
!= 0) {
1942 &(cm
->nb
->cmsg_queue_conndata_highlat
));
1943 } else if (cm
->type
== MSGTYPE_CONNDATA
) {
1945 &(cm
->nb
->cmsg_queue_conndata_lowlat
));
1947 list_add(&(cm
->lh
), &(cm
->nb
->cmsg_queue_other
));
1954 static void cor_enqueue_control_msg(struct cor_control_msg_out
*cm
, int src
)
1957 BUG_ON(cm
->nb
== 0);
1959 if (src
== ADDCMSG_SRC_NEW
)
1960 cm
->time_added
= jiffies
;
1962 spin_lock_bh(&(cm
->nb
->cmsg_lock
));
1964 if (_cor_enqueue_control_msg(cm
, src
) != 0)
1967 if (src
!= ADDCMSG_SRC_READD
&& src
!= ADDCMSG_SRC_RETRANS
)
1968 cor_schedule_controlmsg_timer(cm
->nb
);
1971 spin_unlock_bh(&(cm
->nb
->cmsg_lock
));
1975 void cor_send_pong(struct cor_neighbor
*nb
, __u32 cookie
, ktime_t ping_rcvtime
)
1977 struct cor_control_msg_out
*cm
= _cor_alloc_control_msg(nb
);
1979 if (unlikely(cm
== 0))
1983 cm
->type
= MSGTYPE_PONG
;
1984 cm
->msg
.pong
.cookie
= cookie
;
1985 cm
->msg
.pong
.type
= MSGTYPE_PONG_TIMEENQUEUED
;
1986 cm
->msg
.pong
.ping_rcvtime
= ping_rcvtime
;
1987 cm
->msg
.pong
.time_enqueued
= ktime_get();
1989 cor_enqueue_control_msg(cm
, ADDCMSG_SRC_NEW
);
1992 void cor_send_ack(struct cor_neighbor
*nb
, __u64 seqno
)
1994 struct cor_control_msg_out
*cm
= cor_alloc_control_msg(nb
,
1997 if (unlikely(cm
== 0))
2001 cm
->type
= MSGTYPE_ACK
;
2002 cm
->msg
.ack
.seqno
= seqno
;
2004 cor_enqueue_control_msg(cm
, ADDCMSG_SRC_NEW
);
2007 static void cor_set_ooolen_flags(struct cor_control_msg_out
*cm
)
2009 cm
->msg
.ack_conn
.flags
= (cm
->msg
.ack_conn
.flags
&
2010 (~KP_ACK_CONN_FLAGS_OOO
));
2011 cm
->msg
.ack_conn
.flags
= (cm
->msg
.ack_conn
.flags
|
2012 cor_ooolen_to_flags(cm
->msg
.ack_conn
.length
));
2015 /* cmsg_lock must be held */
2016 static void cor_remove_pending_ackconn(struct cor_control_msg_out
*cm
)
2018 cm
->nb
->cmsg_otherlength
-= cm
->length
;
2019 list_del(&(cm
->lh
));
2021 list_del(&(cm
->msg
.ack_conn
.conn_acks
));
2022 kref_put(&(cm
->msg
.ack_conn
.src_in
->ref
), cor_free_conn
);
2023 cm
->msg
.ack_conn
.src_in
= 0;
2026 cor_free_control_msg(cm
);
2029 /* cmsg_lock must be held */
2030 static void cor_recalc_scheduled_ackconn_size(struct cor_control_msg_out
*cm
)
2032 cm
->nb
->cmsg_otherlength
-= cm
->length
;
2033 cm
->length
= 6 + cor_ack_conn_len(cm
->msg
.ack_conn
.flags
);
2034 cm
->nb
->cmsg_otherlength
+= cm
->length
;
2037 /* cmsg_lock must be held */
2038 static int _cor_try_merge_ackconn(struct cor_conn
*src_in_l
,
2039 struct cor_control_msg_out
*fromcm
,
2040 struct cor_control_msg_out
*tocm
, int from_newack
)
2042 if (cor_ooolen(fromcm
->msg
.ack_conn
.flags
) != 0 &&
2043 cor_ooolen(tocm
->msg
.ack_conn
.flags
) != 0) {
2044 __u64 tocmseqno
= tocm
->msg
.ack_conn
.seqno_ooo
;
2045 __u64 tocmlength
= tocm
->msg
.ack_conn
.length
;
2046 __u64 fromcmseqno
= fromcm
->msg
.ack_conn
.seqno_ooo
;
2047 __u64 fromcmlength
= fromcm
->msg
.ack_conn
.length
;
2049 if (cor_seqno_eq(tocmseqno
, fromcmseqno
)) {
2050 if (fromcmlength
> tocmlength
)
2051 tocm
->msg
.ack_conn
.length
= fromcmlength
;
2052 } else if (cor_seqno_after(fromcmseqno
, tocmseqno
) &&
2053 cor_seqno_before_eq(fromcmseqno
, tocmseqno
+
2055 __u64 len
= cor_seqno_clean(fromcmseqno
+ fromcmlength
-
2057 BUG_ON(len
> U32_MAX
);
2058 tocm
->msg
.ack_conn
.length
= (__u32
) len
;
2059 } else if (cor_seqno_before(fromcmseqno
, tocmseqno
) &&
2060 cor_seqno_after_eq(fromcmseqno
, tocmseqno
)) {
2061 __u64 len
= cor_seqno_clean(tocmseqno
+ tocmlength
-
2063 BUG_ON(len
> U32_MAX
);
2064 tocm
->msg
.ack_conn
.seqno_ooo
= fromcmseqno
;
2065 tocm
->msg
.ack_conn
.length
= (__u32
) len
;
2069 cor_set_ooolen_flags(tocm
);
2072 if ((fromcm
->msg
.ack_conn
.flags
&
2073 KP_ACK_CONN_FLAGS_SEQNO
) != 0) {
2074 if ((tocm
->msg
.ack_conn
.flags
& KP_ACK_CONN_FLAGS_SEQNO
) == 0)
2077 BUG_ON(cor_seqno_eq(fromcm
->msg
.ack_conn
.ack_seqno
,
2078 tocm
->msg
.ack_conn
.ack_seqno
));
2079 if (cor_seqno_after_eq(tocm
->msg
.ack_conn
.ack_seqno
,
2080 fromcm
->msg
.ack_conn
.ack_seqno
)) {
2081 BUG_ON(cor_seqno_after(fromcm
->msg
.ack_conn
.seqno
,
2082 tocm
->msg
.ack_conn
.seqno
));
2086 BUG_ON(cor_seqno_before(fromcm
->msg
.ack_conn
.seqno
,
2087 tocm
->msg
.ack_conn
.seqno
));
2090 tocm
->msg
.ack_conn
.flags
= (tocm
->msg
.ack_conn
.flags
|
2091 KP_ACK_CONN_FLAGS_SEQNO
);
2092 tocm
->msg
.ack_conn
.seqno
= fromcm
->msg
.ack_conn
.seqno
;
2093 tocm
->msg
.ack_conn
.ack_seqno
= fromcm
->msg
.ack_conn
.ack_seqno
;
2096 if ((fromcm
->msg
.ack_conn
.flags
&
2097 KP_ACK_CONN_FLAGS_WINDOW
) != 0)
2098 tocm
->msg
.ack_conn
.flags
= (tocm
->msg
.ack_conn
.flags
|
2099 KP_ACK_CONN_FLAGS_WINDOW
);
2103 if (cor_ooolen(fromcm
->msg
.ack_conn
.flags
) != 0) {
2104 tocm
->msg
.ack_conn
.seqno_ooo
= fromcm
->msg
.ack_conn
.seqno_ooo
;
2105 tocm
->msg
.ack_conn
.length
= fromcm
->msg
.ack_conn
.length
;
2106 cor_set_ooolen_flags(tocm
);
2109 if ((fromcm
->msg
.ack_conn
.flags
& KP_ACK_CONN_FLAGS_PRIORITY
) != 0) {
2110 BUG_ON((tocm
->msg
.ack_conn
.flags
&
2111 KP_ACK_CONN_FLAGS_PRIORITY
) != 0);
2112 tocm
->msg
.ack_conn
.priority_seqno
=
2113 fromcm
->msg
.ack_conn
.priority_seqno
;
2114 tocm
->msg
.ack_conn
.priority
= fromcm
->msg
.ack_conn
.priority
;
2117 cor_recalc_scheduled_ackconn_size(tocm
);
2118 if (from_newack
== 0)
2119 cor_remove_pending_ackconn(fromcm
);
2124 /* cmsg_lock must be held */
2125 static void cor_try_merge_ackconns(struct cor_conn
*src_in_l
,
2126 struct cor_control_msg_out
*cm
)
2128 struct list_head
*currlh
= cm
->msg
.ack_conn
.conn_acks
.next
;
2130 while (currlh
!= &(src_in_l
->source
.in
.acks_pending
)) {
2131 struct cor_control_msg_out
*currcm
= container_of(currlh
,
2132 struct cor_control_msg_out
,
2133 msg
.ack_conn
.conn_acks
);
2134 currlh
= currlh
->next
;
2135 cor_remove_connack_oooflag_ifold(src_in_l
, currcm
);
2136 _cor_try_merge_ackconn(src_in_l
, currcm
, cm
, 0);
2140 static void cor_merge_or_enqueue_ackconn(struct cor_conn
*src_in_l
,
2141 struct cor_control_msg_out
*cm
, int src
)
2143 struct list_head
*currlh
;
2145 BUG_ON(src_in_l
->sourcetype
!= SOURCE_IN
);
2147 spin_lock_bh(&(cm
->nb
->cmsg_lock
));
2149 currlh
= src_in_l
->source
.in
.acks_pending
.next
;
2150 while (currlh
!= &(src_in_l
->source
.in
.acks_pending
)) {
2151 struct cor_control_msg_out
*currcm
= container_of(currlh
,
2152 struct cor_control_msg_out
,
2153 msg
.ack_conn
.conn_acks
);
2155 BUG_ON(currcm
->nb
!= cm
->nb
);
2156 BUG_ON(currcm
->type
!= MSGTYPE_ACK_CONN
);
2157 BUG_ON(cm
->msg
.ack_conn
.src_in
!= src_in_l
);
2158 BUG_ON(currcm
->msg
.ack_conn
.conn_id
!=
2159 cm
->msg
.ack_conn
.conn_id
);
2161 if (_cor_try_merge_ackconn(src_in_l
, cm
, currcm
, 1) == 0) {
2162 cor_try_merge_ackconns(src_in_l
, currcm
);
2163 cor_schedule_controlmsg_timer(currcm
->nb
);
2164 spin_unlock_bh(&(currcm
->nb
->cmsg_lock
));
2167 * when calling cor_free_control_msg here conn may
2168 * already be locked and priority_send_allowed and
2169 * priority_send_allowed should not be reset
2171 cm
->msg
.ack_conn
.flags
= 0;
2172 cor_free_control_msg(cm
);
2176 currlh
= currlh
->next
;
2179 list_add_tail(&(cm
->msg
.ack_conn
.conn_acks
),
2180 &(src_in_l
->source
.in
.acks_pending
));
2182 spin_unlock_bh(&(cm
->nb
->cmsg_lock
));
2184 cor_enqueue_control_msg(cm
, src
);
2187 static int cor_try_update_ackconn_seqno(struct cor_conn
*src_in_l
)
2191 spin_lock_bh(&(src_in_l
->source
.in
.nb
->cmsg_lock
));
2193 if (list_empty(&(src_in_l
->source
.in
.acks_pending
)) == 0) {
2194 struct cor_control_msg_out
*cm
= container_of(
2195 src_in_l
->source
.in
.acks_pending
.next
,
2196 struct cor_control_msg_out
,
2197 msg
.ack_conn
.conn_acks
);
2198 BUG_ON(cm
->nb
!= src_in_l
->source
.in
.nb
);
2199 BUG_ON(cm
->type
!= MSGTYPE_ACK_CONN
);
2200 BUG_ON(cm
->msg
.ack_conn
.src_in
!= src_in_l
);
2201 BUG_ON(cm
->msg
.ack_conn
.conn_id
!=
2202 src_in_l
->reversedir
->target
.out
.conn_id
);
2204 cm
->msg
.ack_conn
.flags
= (cm
->msg
.ack_conn
.flags
|
2205 KP_ACK_CONN_FLAGS_SEQNO
|
2206 KP_ACK_CONN_FLAGS_WINDOW
);
2207 cm
->msg
.ack_conn
.seqno
= src_in_l
->source
.in
.next_seqno
;
2209 src_in_l
->source
.in
.ack_seqno
++;
2210 cm
->msg
.ack_conn
.ack_seqno
= src_in_l
->source
.in
.ack_seqno
;
2212 cor_remove_connack_oooflag_ifold(src_in_l
, cm
);
2213 cor_recalc_scheduled_ackconn_size(cm
);
2215 cor_try_merge_ackconns(src_in_l
, cm
);
2220 spin_unlock_bh(&(src_in_l
->source
.in
.nb
->cmsg_lock
));
2225 void cor_send_ack_conn_ifneeded(struct cor_conn
*src_in_l
, __u64 seqno_ooo
,
2228 struct cor_control_msg_out
*cm
;
2230 BUG_ON(src_in_l
->sourcetype
!= SOURCE_IN
);
2232 BUG_ON(ooo_length
> 0 && cor_seqno_before_eq(seqno_ooo
,
2233 src_in_l
->source
.in
.next_seqno
));
2235 cor_update_windowlimit(src_in_l
);
2237 if (ooo_length
!= 0) {
2238 cm
= cor_alloc_control_msg(src_in_l
->source
.in
.nb
,
2244 if (src_in_l
->source
.in
.inorder_ack_needed
!= 0)
2247 if (cor_seqno_clean(src_in_l
->source
.in
.window_seqnolimit
-
2248 src_in_l
->source
.in
.next_seqno
) < WINDOW_ENCODE_MIN
)
2251 if (cor_seqno_clean(src_in_l
->source
.in
.window_seqnolimit_remote
-
2252 src_in_l
->source
.in
.next_seqno
) >= WINDOW_ENCODE_MIN
&&
2253 cor_seqno_clean(src_in_l
->source
.in
.window_seqnolimit
-
2254 src_in_l
->source
.in
.next_seqno
) * 7 <
2256 src_in_l
->source
.in
.window_seqnolimit_remote
-
2257 src_in_l
->source
.in
.next_seqno
) * 8)
2261 if (cor_try_update_ackconn_seqno(src_in_l
) == 0)
2264 cm
= cor_alloc_control_msg(src_in_l
->source
.in
.nb
, ACM_PRIORITY_MED
);
2266 printk(KERN_ERR
"error allocating inorder ack");
2271 cm
->type
= MSGTYPE_ACK_CONN
;
2272 src_in_l
->source
.in
.ack_seqno
++;
2273 cm
->msg
.ack_conn
.ack_seqno
= src_in_l
->source
.in
.ack_seqno
;
2274 kref_get(&(src_in_l
->ref
));
2275 cm
->msg
.ack_conn
.src_in
= src_in_l
;
2276 cm
->msg
.ack_conn
.conn_id
= src_in_l
->reversedir
->target
.out
.conn_id
;
2277 cm
->msg
.ack_conn
.seqno
= src_in_l
->source
.in
.next_seqno
;
2278 cm
->msg
.ack_conn
.seqno_ooo
= seqno_ooo
;
2279 cm
->msg
.ack_conn
.length
= ooo_length
;
2280 cm
->msg
.ack_conn
.flags
= KP_ACK_CONN_FLAGS_SEQNO
|
2281 KP_ACK_CONN_FLAGS_WINDOW
;
2282 cor_set_ooolen_flags(cm
);
2283 cm
->length
= 6 + cor_ack_conn_len(cm
->msg
.ack_conn
.flags
);
2285 cor_merge_or_enqueue_ackconn(src_in_l
, cm
, ADDCMSG_SRC_NEW
);
2288 src_in_l
->source
.in
.inorder_ack_needed
= 0;
2289 src_in_l
->source
.in
.window_seqnolimit_remote
=
2290 src_in_l
->source
.in
.window_seqnolimit
;
2293 static int cor_try_add_priority(struct cor_conn
*trgt_out_l
, __u8 priority
)
2296 struct cor_conn
*src_in
= trgt_out_l
->reversedir
;
2298 spin_lock_bh(&(trgt_out_l
->target
.out
.nb
->cmsg_lock
));
2300 if (list_empty(&(src_in
->source
.in
.acks_pending
)) == 0) {
2301 struct cor_control_msg_out
*cm
= container_of(
2302 src_in
->source
.in
.acks_pending
.next
,
2303 struct cor_control_msg_out
,
2304 msg
.ack_conn
.conn_acks
);
2305 BUG_ON(cm
->nb
!= trgt_out_l
->target
.out
.nb
);
2306 BUG_ON(cm
->type
!= MSGTYPE_ACK_CONN
);
2307 BUG_ON(cm
->msg
.ack_conn
.src_in
!= trgt_out_l
->reversedir
);
2308 BUG_ON(cm
->msg
.ack_conn
.conn_id
!=
2309 trgt_out_l
->target
.out
.conn_id
);
2311 BUG_ON((cm
->msg
.ack_conn
.flags
& KP_ACK_CONN_FLAGS_PRIORITY
) !=
2313 cm
->msg
.ack_conn
.flags
= (cm
->msg
.ack_conn
.flags
|
2314 KP_ACK_CONN_FLAGS_PRIORITY
);
2315 cm
->msg
.ack_conn
.priority_seqno
=
2316 trgt_out_l
->target
.out
.priority_seqno
;
2317 cm
->msg
.ack_conn
.priority
= priority
;
2318 cor_recalc_scheduled_ackconn_size(cm
);
2323 spin_unlock_bh(&(trgt_out_l
->target
.out
.nb
->cmsg_lock
));
2328 void cor_send_priority(struct cor_conn
*trgt_out_ll
, int force
, __u8 priority
)
2330 struct cor_control_msg_out
*cm
;
2332 if (cor_try_add_priority(trgt_out_ll
, priority
) == 0)
2338 cm
= cor_alloc_control_msg(trgt_out_ll
->target
.out
.nb
,
2344 cm
->type
= MSGTYPE_ACK_CONN
;
2345 cm
->msg
.ack_conn
.flags
= KP_ACK_CONN_FLAGS_PRIORITY
;
2346 kref_get(&(trgt_out_ll
->reversedir
->ref
));
2347 BUG_ON(trgt_out_ll
->targettype
!= TARGET_OUT
);
2348 cm
->msg
.ack_conn
.src_in
= trgt_out_ll
->reversedir
;
2349 cm
->msg
.ack_conn
.conn_id
= trgt_out_ll
->target
.out
.conn_id
;
2350 cm
->msg
.ack_conn
.priority_seqno
=
2351 trgt_out_ll
->target
.out
.priority_seqno
;
2352 cm
->msg
.ack_conn
.priority
= priority
;
2354 cm
->length
= 6 + cor_ack_conn_len(cm
->msg
.ack_conn
.flags
);
2355 cor_merge_or_enqueue_ackconn(trgt_out_ll
->reversedir
, cm
,
2359 trgt_out_ll
->target
.out
.priority_last
= priority
;
2360 trgt_out_ll
->target
.out
.priority_seqno
++;
2361 trgt_out_ll
->target
.out
.priority_send_allowed
= 0;
2364 void cor_free_ack_conns(struct cor_conn
*src_in_lx
)
2367 spin_lock_bh(&(src_in_lx
->source
.in
.nb
->cmsg_lock
));
2368 while (list_empty(&(src_in_lx
->source
.in
.acks_pending
)) == 0) {
2369 struct list_head
*currlh
=
2370 src_in_lx
->source
.in
.acks_pending
.next
;
2371 struct cor_control_msg_out
*currcm
= container_of(currlh
,
2372 struct cor_control_msg_out
,
2373 msg
.ack_conn
.conn_acks
);
2375 cor_remove_pending_ackconn(currcm
);
2379 cor_schedule_controlmsg_timer(src_in_lx
->source
.in
.nb
);
2380 spin_unlock_bh(&(src_in_lx
->source
.in
.nb
->cmsg_lock
));
2383 void cor_send_connect_success(struct cor_control_msg_out
*cm
, __u32 conn_id
,
2384 struct cor_conn
*src_in
)
2386 cm
->type
= MSGTYPE_CONNECT_SUCCESS
;
2387 cm
->msg
.connect_success
.conn_id
= conn_id
;
2388 kref_get(&(src_in
->ref
));
2389 cm
->msg
.connect_success
.src_in
= src_in
;
2391 cor_enqueue_control_msg(cm
, ADDCMSG_SRC_NEW
);
2394 void cor_send_connect_nb(struct cor_control_msg_out
*cm
, __u32 conn_id
,
2395 __u64 seqno1
, __u64 seqno2
, struct cor_conn
*src_in_ll
)
2397 cm
->type
= MSGTYPE_CONNECT
;
2398 cm
->msg
.connect
.conn_id
= conn_id
;
2399 cm
->msg
.connect
.seqno1
= seqno1
;
2400 cm
->msg
.connect
.seqno2
= seqno2
;
2401 kref_get(&(src_in_ll
->ref
));
2402 BUG_ON(src_in_ll
->sourcetype
!= SOURCE_IN
);
2403 cm
->msg
.connect
.src_in
= src_in_ll
;
2405 cor_enqueue_control_msg(cm
, ADDCMSG_SRC_NEW
);
2408 void cor_send_conndata(struct cor_control_msg_out
*cm
, __u32 conn_id
,
2409 __u64 seqno
, char *data_orig
, char *data
, __u32 datalen
,
2410 __u8 snd_delayed_lowbuf
, __u8 flush
, __u8 highlatency
,
2411 struct cor_conn_retrans
*cr
)
2413 cm
->type
= MSGTYPE_CONNDATA
;
2414 cm
->msg
.conn_data
.conn_id
= conn_id
;
2415 cm
->msg
.conn_data
.seqno
= seqno
;
2416 cm
->msg
.conn_data
.data_orig
= data_orig
;
2417 cm
->msg
.conn_data
.data
= data
;
2418 cm
->msg
.conn_data
.datalen
= datalen
;
2419 cm
->msg
.conn_data
.snd_delayed_lowbuf
= snd_delayed_lowbuf
;
2420 cm
->msg
.conn_data
.flush
= flush
;
2421 cm
->msg
.conn_data
.highlatency
= highlatency
;
2422 cm
->msg
.conn_data
.cr
= cr
;
2423 cm
->length
= KP_CONN_DATA_CMDLEN
+ datalen
;
2424 cor_enqueue_control_msg(cm
, ADDCMSG_SRC_NEW
);
2427 int cor_send_reset_conn(struct cor_neighbor
*nb
, __u32 conn_id
, int lowprio
)
2429 struct cor_control_msg_out
*cm
;
2431 if (unlikely(cor_get_neigh_state(nb
) == NEIGHBOR_STATE_KILLED
))
2434 cm
= cor_alloc_control_msg(nb
, lowprio
?
2435 ACM_PRIORITY_LOW
: ACM_PRIORITY_MED
);
2437 if (unlikely(cm
== 0))
2440 cm
->type
= MSGTYPE_RESET_CONN
;
2441 cm
->msg
.reset_conn
.conn_id
= conn_id
;
2444 cor_enqueue_control_msg(cm
, ADDCMSG_SRC_NEW
);
2449 int __init
cor_kgen_init(void)
2451 cor_controlmsg_slab
= kmem_cache_create("cor_controlmsg",
2452 sizeof(struct cor_control_msg_out
), 8, 0, 0);
2453 if (unlikely(cor_controlmsg_slab
== 0))
2456 cor_controlretrans_slab
= kmem_cache_create("cor_controlretransmsg",
2457 sizeof(struct cor_control_retrans
), 8, 0, 0);
2458 if (unlikely(cor_controlretrans_slab
== 0))
2464 void __exit
cor_kgen_exit2(void)
2466 kmem_cache_destroy(cor_controlretrans_slab
);
2467 cor_controlretrans_slab
= 0;
2469 kmem_cache_destroy(cor_controlmsg_slab
);
2470 cor_controlmsg_slab
= 0;
2473 MODULE_LICENSE("GPL");