/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/gfp.h>
#include <linux/jiffies.h>
#include <linux/slab.h>

#include "cor.h"

static struct kmem_cache *cor_connretrans_slab;
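
/*
 * kref release function for struct cor_conn_retrans. Only entries in state
 * CONN_RETRANS_ACKED may be freed; the conn reference taken in
 * cor_prepare_conn_retrans ("conn_retrans") is dropped here.
 */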
void cor_free_connretrans(struct kref *ref)
{
	struct cor_conn_retrans *cr = container_of(ref, struct cor_conn_retrans,
			ref);
	struct cor_conn *cn = cr->trgt_out_o;

	BUG_ON(cr->state != CONN_RETRANS_ACKED);

	kmem_cache_free(cor_connretrans_slab, cr);
	cor_conn_kref_put(cn, "conn_retrans");
}
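
/*
 * Return the conn_retrans whose timeout expires first, i.e. the earlier of
 * the heads of the low- and high-latency timeout lists. A low-latency entry
 * that is already overdue is preferred. The caller must hold
 * nb->retrans_conn_lock, as the _retransconnlocked suffix indicates.
 */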
static struct cor_conn_retrans *cor_peek_next_conn_retrans(
		struct cor_neighbor *nb_retransconnlocked)
{
	struct cor_conn_retrans *cr1 = 0;
	struct cor_conn_retrans *cr2 = 0;

	if (list_empty(&(nb_retransconnlocked->retrans_conn_lowlatency_list)) == 0)
		cr1 = container_of(
				nb_retransconnlocked->retrans_conn_lowlatency_list.next,
				struct cor_conn_retrans, timeout_list);

	if (list_empty(&(nb_retransconnlocked->retrans_conn_highlatency_list)) == 0)
		cr2 = container_of(
				nb_retransconnlocked->retrans_conn_highlatency_list.next,
				struct cor_conn_retrans, timeout_list);

	if (cr1 == 0)
		return cr2;
	if (cr2 == 0)
		return cr1;

	if (time_before_eq(cr1->timeout, jiffies))
		return cr1;

	if (time_before_eq(cr1->timeout, cr2->timeout))
		return cr1;

	return cr2;
}
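
/*
 * Make sure the next pending retransmit will be processed: if it is already
 * overdue, enqueue the neighbor for qos processing immediately; otherwise
 * (re)arm retrans_conn_timer. mod_timer returns 0 if the timer was not
 * already pending, in which case a neighbor reference is taken for the
 * timer; it is dropped in cor_retransmit_conn_timerfunc.
 */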
void cor_reschedule_conn_retrans_timer(
		struct cor_neighbor *nb_retransconnlocked)
{
	struct cor_conn_retrans *cr =
			cor_peek_next_conn_retrans(nb_retransconnlocked);

	if (cr == 0)
		return;

	if (time_before_eq(cr->timeout, jiffies)) {
		cor_qos_enqueue(nb_retransconnlocked->queue,
				&(nb_retransconnlocked->rb_cr), 0,
				ns_to_ktime(0), QOS_CALLER_CONN_RETRANS, 0);
	} else {
		if (mod_timer(&(nb_retransconnlocked->retrans_conn_timer),
				cr->timeout) == 0) {
			kref_get(&(nb_retransconnlocked->ref));
		}
	}
}

/**
 * caller must also call kref_get/put, see cor_reschedule_conn_retrans_timer
 */
static void cor_cancel_conn_retrans(struct cor_neighbor *nb_retransconnlocked,
		struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr,
		__u64 *bytes_acked)
{
	if (unlikely(cr->state == CONN_RETRANS_ACKED))
		return;

	if (cr->state == CONN_RETRANS_SCHEDULED) {
		list_del(&(cr->timeout_list));
		kref_put(&(cr->ref), cor_kreffree_bug);
	} else if (cr->state == CONN_RETRANS_LOWWINDOW) {
		BUG_ON(trgt_out_lx->target.out.retrans_lowwindow == 0);
		if (likely(trgt_out_lx->target.out.retrans_lowwindow != 65535))
			trgt_out_lx->target.out.retrans_lowwindow--;
	}

	if (cr->state != CONN_RETRANS_INITIAL)
		*bytes_acked += cr->length;

	list_del(&(cr->conn_list));
	cr->state = CONN_RETRANS_ACKED;

	kref_put(&(cr->ref), cor_free_connretrans); /* conn_list */
}

/**
 * nb->retrans_conn_lock must be held when calling this
 * (see cor_schedule_retransmit_conn())
 */
static void cor_cancel_acked_conn_retrans(struct cor_conn *trgt_out_l,
		__u64 *bytes_acked)
{
	__u64 seqno_acked = trgt_out_l->target.out.seqno_acked;

	while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) {
		struct cor_conn_retrans *cr = container_of(
				trgt_out_l->target.out.retrans_list.next,
				struct cor_conn_retrans, conn_list);

		if (cor_seqno_after(cr->seqno + cr->length, seqno_acked)) {
			if (cor_seqno_before(cr->seqno, seqno_acked)) {
				*bytes_acked += cor_seqno_clean(seqno_acked -
						cr->seqno);
				cr->length -= cor_seqno_clean(seqno_acked -
						cr->seqno);
				cr->seqno = seqno_acked;
			}
			break;
		}

		cor_cancel_conn_retrans(trgt_out_l->target.out.nb, trgt_out_l,
				cr, bytes_acked);
	}

	cor_reschedule_conn_retrans_timer(trgt_out_l->target.out.nb);
}
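
/*
 * Cancel every pending retransmit of a single conn (called e.g. via
 * cor_reset_all_conns, see cor_send_retrans below). The cancelled bytes are
 * reported to the congestion window accounting as acked so they no longer
 * count as in-flight.
 */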
void cor_cancel_all_conn_retrans(struct cor_conn *trgt_out_lx)
{
	struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
	__u64 bytes_acked = 0;

	spin_lock_bh(&(nb->retrans_conn_lock));

	while (list_empty(&(trgt_out_lx->target.out.retrans_list)) == 0) {
		struct cor_conn_retrans *cr = container_of(
				trgt_out_lx->target.out.retrans_list.next,
				struct cor_conn_retrans, conn_list);
		BUG_ON(cr->trgt_out_o != trgt_out_lx);

		cor_cancel_conn_retrans(nb, trgt_out_lx, cr, &bytes_acked);
	}

	cor_reschedule_conn_retrans_timer(nb);

	spin_unlock_bh(&(nb->retrans_conn_lock));

	if (bytes_acked > 0)
		cor_nbcongwin_data_acked(nb, bytes_acked);
}
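
/*
 * Empty one of the per-neighbor timeout lists. Because the conn rcv_lock
 * must be taken before retrans_conn_lock, the head entry is only peeked and
 * referenced under retrans_conn_lock, then both locks are taken in the
 * correct order and the entry is cancelled only if it is still the head of
 * the list.
 */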
static void _cor_cancel_all_conn_retrans_nb(struct cor_neighbor *nb,
		struct list_head *retrans_list, __u64 *bytes_acked)
{
	while (1) {
		struct cor_conn_retrans *cr;

		spin_lock_bh(&(nb->retrans_conn_lock));

		if (list_empty(retrans_list)) {
			spin_unlock_bh(&(nb->retrans_conn_lock));
			break;
		}

		cr = container_of(retrans_list->next, struct cor_conn_retrans,
				timeout_list);

		kref_get(&(cr->ref));

		spin_unlock_bh(&(nb->retrans_conn_lock));

		spin_lock_bh(&(cr->trgt_out_o->rcv_lock));
		spin_lock_bh(&(nb->retrans_conn_lock));

		if (likely(cr == container_of(retrans_list->next,
				struct cor_conn_retrans, timeout_list)))
			cor_cancel_conn_retrans(nb, cr->trgt_out_o, cr,
					bytes_acked);

		spin_unlock_bh(&(nb->retrans_conn_lock));
		spin_unlock_bh(&(cr->trgt_out_o->rcv_lock));

		kref_put(&(cr->ref), cor_free_connretrans);
	}
}

static void cor_cancel_all_conn_retrans_nb(struct cor_neighbor *nb)
{
	__u64 bytes_acked = 0;

	_cor_cancel_all_conn_retrans_nb(nb,
			&(nb->retrans_conn_lowlatency_list), &bytes_acked);
	_cor_cancel_all_conn_retrans_nb(nb,
			&(nb->retrans_conn_highlatency_list), &bytes_acked);

	if (bytes_acked > 0)
		cor_nbcongwin_data_acked(nb, bytes_acked);
}
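
/*
 * Allocate a conn_retrans covering [seqno, seqno + len) and link it into
 * the conn's retrans_list: directly behind cr_splitted if the new entry is
 * the tail of a split, at the end of the list otherwise. Returns 0 on
 * allocation failure.
 */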
static struct cor_conn_retrans *cor_prepare_conn_retrans(
		struct cor_conn *trgt_out_l, __u64 seqno, __u32 len,
		__u8 windowused, struct cor_conn_retrans *cr_splitted,
		int retransconnlocked)
{
	struct cor_neighbor *nb = trgt_out_l->target.out.nb;

	struct cor_conn_retrans *cr = kmem_cache_alloc(cor_connretrans_slab,
			GFP_ATOMIC);

	if (unlikely(cr == 0))
		return 0;

	BUG_ON(trgt_out_l->isreset != 0);

	memset(cr, 0, sizeof(struct cor_conn_retrans));
	cr->trgt_out_o = trgt_out_l;
	cor_conn_kref_get(trgt_out_l, "conn_retrans");
	cr->seqno = seqno;
	cr->length = len;
	cr->windowused = windowused;
	kref_init(&(cr->ref));

	if (retransconnlocked == 0)
		spin_lock_bh(&(nb->retrans_conn_lock));

	if (cr_splitted != 0)
		list_add(&(cr->conn_list), &(cr_splitted->conn_list));
	else
		list_add_tail(&(cr->conn_list),
				&(cr->trgt_out_o->target.out.retrans_list));
	kref_get(&(cr->ref)); /* conn_list */

	if (retransconnlocked == 0)
		spin_unlock_bh(&(nb->retrans_conn_lock));

	return cr;
}

#define RC_SENDRETRANS_OK 0
#define RC_SENDRETRANS_OOM 1
#define RC_SENDRETRANS_QUEUEFULL 2
#define RC_SENDRETRANS_QUEUEFULLDROPPED 3
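
/*
 * Transmit one retransmit. Depending on its size the data is sent either as
 * its own skb or piggybacked on a low-priority control message (see
 * cor_send_conndata_as_skb). The payload is re-read from the conn's
 * data_buf at its old sequence number via cor_databuf_pullold.
 */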
static int __cor_send_retrans(struct cor_neighbor *nb,
		struct cor_conn *trgt_out_l, struct cor_conn_retrans *cr,
		__u64 *bytes_sent)
{
	__u8 flush = 0;

	BUG_ON(cr->length == 0);

	if (trgt_out_l->flush != 0 && cor_seqno_eq(cr->seqno + cr->length,
			trgt_out_l->target.out.seqno_nextsend) &&
			trgt_out_l->data_buf.read_remaining == 0)
		flush = 1;

	if (cor_send_conndata_as_skb(nb, cr->length)) {
		struct sk_buff *skb;
		char *dst;
		int rc;

		skb = cor_create_packet_conndata(nb, cr->length, GFP_ATOMIC,
				trgt_out_l->target.out.conn_id, cr->seqno,
				cr->windowused, flush);
		if (unlikely(skb == 0))
			return RC_SENDRETRANS_OOM;

		dst = skb_put(skb, cr->length);

		cor_databuf_pullold(trgt_out_l, cr->seqno, dst, cr->length);

		rc = cor_dev_queue_xmit(skb, nb->queue,
				QOS_CALLER_CONN_RETRANS);
		if (rc == NET_XMIT_DROP)
			return RC_SENDRETRANS_QUEUEFULLDROPPED;
		cor_schedule_retransmit_conn(cr, 1, 0);
		if (rc != NET_XMIT_SUCCESS)
			return RC_SENDRETRANS_QUEUEFULL;
	} else {
		struct cor_control_msg_out *cm;
		char *buf;

		buf = kmalloc(cr->length, GFP_ATOMIC);
		if (unlikely(buf == 0))
			return RC_SENDRETRANS_OOM;

		cm = cor_alloc_control_msg(nb, ACM_PRIORITY_LOW);
		if (unlikely(cm == 0)) {
			kfree(buf);
			return RC_SENDRETRANS_OOM;
		}

		cor_databuf_pullold(trgt_out_l, cr->seqno, buf, cr->length);

		cor_send_conndata(cm, trgt_out_l->target.out.conn_id,
				cr->seqno, buf, buf, cr->length, cr->windowused,
				flush, trgt_out_l->is_highlatency, cr);
	}

	*bytes_sent += cr->length;

	return RC_SENDRETRANS_OK;
}
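
/*
 * If cr is larger than the current MSS or than the remaining send window
 * allows, split it: cr keeps the first maxsize bytes, a newly allocated cr2
 * takes the rest and is scheduled with the same timeout.
 */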
static int _cor_send_retrans_splitcr_ifneeded(
		struct cor_neighbor *nb_retransconnlocked,
		struct cor_conn *trgt_out_l, struct cor_conn_retrans *cr)
{
	__u32 targetmss = cor_mss_conndata(nb_retransconnlocked,
			trgt_out_l->is_highlatency != 0);
	__u64 windowlimit = cor_seqno_clean(
			trgt_out_l->target.out.seqno_windowlimit -
			cr->seqno);
	__u32 maxsize = targetmss;
	if (windowlimit < maxsize)
		maxsize = windowlimit;

	if (unlikely(cr->length > maxsize)) {
		struct cor_conn_retrans *cr2 = cor_prepare_conn_retrans(
				trgt_out_l, cr->seqno + maxsize,
				cr->length - maxsize, cr->windowused, cr, 1);
		if (unlikely(cr2 == 0))
			return RC_SENDRETRANS_OOM;

		cr2->timeout = cr->timeout;

		if (trgt_out_l->is_highlatency)
			list_add(&(cr2->timeout_list),
					&(nb_retransconnlocked->retrans_conn_highlatency_list));
		else
			list_add(&(cr2->timeout_list),
					&(nb_retransconnlocked->retrans_conn_lowlatency_list));
		/* cr2: kref from alloc goes to kref for timeout_list */
		cr2->state = CONN_RETRANS_SCHEDULED;

		cr->length = maxsize;
	}

	return RC_SENDRETRANS_OK;
}

static int _cor_send_retrans(struct cor_neighbor *nb,
		struct cor_conn_retrans *cr, __u64 *bytes_sent)
{
	struct cor_conn *trgt_out_o = cr->trgt_out_o;
	int rc;

	spin_lock_bh(&(trgt_out_o->rcv_lock));

	BUG_ON(trgt_out_o->targettype != TARGET_OUT);
	BUG_ON(trgt_out_o->target.out.nb != nb);

	spin_lock_bh(&(nb->retrans_conn_lock));
	if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
		spin_unlock_bh(&(nb->retrans_conn_lock));
		spin_unlock_bh(&(trgt_out_o->rcv_lock));
		return 0;
	}

	BUG_ON(trgt_out_o->isreset != 0);

	BUG_ON(cor_seqno_before(cr->seqno, trgt_out_o->target.out.seqno_acked));

	if (cor_seqno_after_eq(cr->seqno,
			trgt_out_o->target.out.seqno_windowlimit)) {
		BUG_ON(cr->state != CONN_RETRANS_SENDING);
		cr->state = CONN_RETRANS_LOWWINDOW;
		if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
			trgt_out_o->target.out.retrans_lowwindow++;

		spin_unlock_bh(&(nb->retrans_conn_lock));
		spin_unlock_bh(&(trgt_out_o->rcv_lock));
		return 0;
	}

	rc = _cor_send_retrans_splitcr_ifneeded(nb, trgt_out_o, cr);

	spin_unlock_bh(&(nb->retrans_conn_lock));

	cor_conn_kref_get(trgt_out_o, "stack");

	if (rc == RC_SENDRETRANS_OK)
		rc = __cor_send_retrans(nb, trgt_out_o, cr, bytes_sent);

	if (rc == RC_SENDRETRANS_OOM || rc == RC_SENDRETRANS_QUEUEFULLDROPPED) {
		spin_lock_bh(&(nb->retrans_conn_lock));
		if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
		} else if (likely(cr->state == CONN_RETRANS_SENDING)) {
			if (rc == RC_SENDRETRANS_OOM)
				cr->timeout = jiffies + 1;
			if (trgt_out_o->is_highlatency)
				list_add(&(cr->timeout_list),
						&(nb->retrans_conn_highlatency_list));
			else
				list_add(&(cr->timeout_list),
						&(nb->retrans_conn_lowlatency_list));
			kref_get(&(cr->ref));
			cr->state = CONN_RETRANS_SCHEDULED;
		} else {
			BUG();
		}
		spin_unlock_bh(&(nb->retrans_conn_lock));
	}

	spin_unlock_bh(&(trgt_out_o->rcv_lock));

	cor_conn_kref_put(trgt_out_o, "stack");

	return (rc == RC_SENDRETRANS_OOM ||
			rc == RC_SENDRETRANS_QUEUEFULL ||
			rc == RC_SENDRETRANS_QUEUEFULLDROPPED);
}
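
/*
 * Send all conn retransmits of this neighbor that are due; called from the
 * qos queue (QOS_CALLER_CONN_RETRANS). Stops early and returns
 * QOS_RESUME_CONG if the device queue fills up; an entry that could not be
 * sent is requeued by _cor_send_retrans.
 */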
int cor_send_retrans(struct cor_neighbor *nb, int *sent)
{
	int queuefull = 0;
	int nbstate = cor_get_neigh_state(nb);
	__u64 bytes_sent = 0;

	if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
		return QOS_RESUME_DONE;
	} else if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
		/*
		 * cor_cancel_all_conn_retrans_nb should not be needed, because
		 * cor_reset_all_conns calls cor_cancel_all_conn_retrans
		 */
		cor_cancel_all_conn_retrans_nb(nb);
		return QOS_RESUME_DONE;
	}

	while (1) {
		struct cor_conn_retrans *cr = 0;

		spin_lock_bh(&(nb->retrans_conn_lock));

		cr = cor_peek_next_conn_retrans(nb);
		if (cr == 0) {
			spin_unlock_bh(&(nb->retrans_conn_lock));
			break;
		}

		BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

		if (time_after(cr->timeout, jiffies)) {
			cor_reschedule_conn_retrans_timer(nb);
			spin_unlock_bh(&(nb->retrans_conn_lock));
			break;
		}

		list_del(&(cr->timeout_list));
		cr->state = CONN_RETRANS_SENDING;

		spin_unlock_bh(&(nb->retrans_conn_lock));

		queuefull = _cor_send_retrans(nb, cr, &bytes_sent);
		kref_put(&(cr->ref), cor_free_connretrans); /* timeout_list */
		if (queuefull) {
			break;
		} else {
			*sent = 1;
		}
	}

	if (bytes_sent > 0)
		cor_nbcongwin_data_retransmitted(nb, bytes_sent);

	return queuefull ? QOS_RESUME_CONG : QOS_RESUME_DONE;
}

void cor_retransmit_conn_timerfunc(struct timer_list *retrans_conn_timer)
{
	struct cor_neighbor *nb = container_of(retrans_conn_timer,
			struct cor_neighbor, retrans_conn_timer);
	cor_qos_enqueue(nb->queue, &(nb->rb_cr), 0, ns_to_ktime(0),
			QOS_CALLER_CONN_RETRANS, 0);
	kref_put(&(nb->ref), cor_neighbor_free);
}
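
/*
 * An out-of-order ack covered the middle of cr: shrink cr to the unacked
 * head and create cr2 for the unacked tail, inheriting timeout and state.
 * Only entries that are merely queued (SCHEDULED or LOWWINDOW) are split;
 * anything else is left alone.
 */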
static void cor_conn_ack_ooo_rcvd_splitcr(struct cor_conn *trgt_out_l,
		struct cor_conn_retrans *cr, __u64 seqno_ooo, __u32 length,
		__u64 *bytes_acked)
{
	struct cor_conn_retrans *cr2;
	__u64 seqno_cr2start;
	__u32 oldcrlength = cr->length;

	if (cr->state != CONN_RETRANS_SCHEDULED &&
			cr->state != CONN_RETRANS_LOWWINDOW)
		return;

	seqno_cr2start = seqno_ooo + length;
	cr2 = cor_prepare_conn_retrans(trgt_out_l, seqno_cr2start,
			cor_seqno_clean(cr->seqno + cr->length -
					seqno_cr2start),
			cr->windowused, cr, 1);

	if (unlikely(cr2 == 0))
		return;

	BUG_ON(cr2->length > cr->length);

	cr2->timeout = cr->timeout;
	cr2->state = cr->state;

	if (cr->state == CONN_RETRANS_SCHEDULED) {
		list_add(&(cr2->timeout_list), &(cr->timeout_list));
		kref_get(&(cr2->ref));
	}

	BUG_ON(cor_seqno_clean(seqno_ooo - cr->seqno) > cr->length);

	/* cr keeps only the unacked head in front of the acked range */
	cr->length = cor_seqno_clean(seqno_ooo - cr->seqno);
	BUG_ON(cr->length + length + cr2->length != oldcrlength);
	kref_put(&(cr2->ref), cor_kreffree_bug); /* alloc */

	*bytes_acked += length;
}
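
/*
 * Process a selective ack for [seqno_ooo, seqno_ooo + length): walk the
 * conn's retrans_list and cancel, trim or split every overlapping entry,
 * then advance seqno_acked as far as the remaining entries allow.
 */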
void cor_conn_ack_ooo_rcvd(struct cor_neighbor *nb, __u32 conn_id,
		struct cor_conn *trgt_out, __u64 seqno_ooo, __u32 length,
		__u64 *bytes_acked)
{
	struct list_head *curr;

	if (unlikely(length == 0))
		return;

	spin_lock_bh(&(trgt_out->rcv_lock));

	if (unlikely(trgt_out->targettype != TARGET_OUT))
		goto out;
	if (unlikely(trgt_out->target.out.nb != nb))
		goto out;
	if (unlikely(trgt_out->target.out.conn_id != conn_id))
		goto out;

	kref_get(&(nb->ref));
	spin_lock_bh(&(nb->retrans_conn_lock));

	curr = trgt_out->target.out.retrans_list.next;
	while (curr != &(trgt_out->target.out.retrans_list)) {
		struct cor_conn_retrans *cr = container_of(curr,
				struct cor_conn_retrans, conn_list);

		int ack_covers_start = cor_seqno_after_eq(cr->seqno, seqno_ooo);
		int ack_covers_end = cor_seqno_before_eq(cr->seqno + cr->length,
				seqno_ooo + length);

		curr = curr->next;

		if (cor_seqno_before(cr->seqno + cr->length, seqno_ooo))
			continue;

		if (cor_seqno_after(cr->seqno, seqno_ooo + length))
			break;

		if (likely(ack_covers_start && ack_covers_end)) {
			cor_cancel_conn_retrans(nb, trgt_out, cr, bytes_acked);
			cor_reschedule_conn_retrans_timer(nb);
		} else if (ack_covers_start) {
			/* the acked range covers the head of cr */
			__u32 diff = cor_seqno_clean(seqno_ooo + length -
					cr->seqno);
			BUG_ON(diff >= cr->length);
			cr->seqno += diff;
			cr->length -= diff;
			*bytes_acked += diff;
		} else if (ack_covers_end) {
			/* the acked range covers the tail of cr */
			__u32 diff = cor_seqno_clean(cr->seqno + cr->length -
					seqno_ooo);
			BUG_ON(diff >= cr->length);
			cr->length -= diff;
			*bytes_acked += diff;
		} else {
			cor_conn_ack_ooo_rcvd_splitcr(trgt_out, cr, seqno_ooo,
					length, bytes_acked);
			break;
		}
	}

	if (unlikely(list_empty(&(trgt_out->target.out.retrans_list)))) {
		trgt_out->target.out.seqno_acked =
				trgt_out->target.out.seqno_nextsend;
	} else {
		struct cor_conn_retrans *cr = container_of(
				trgt_out->target.out.retrans_list.next,
				struct cor_conn_retrans, conn_list);
		if (cor_seqno_after(cr->seqno,
				trgt_out->target.out.seqno_acked))
			trgt_out->target.out.seqno_acked = cr->seqno;
	}

	spin_unlock_bh(&(nb->retrans_conn_lock));
	kref_put(&(nb->ref), cor_neighbor_free);

out:
	spin_unlock_bh(&(trgt_out->rcv_lock));
}

static void _cor_conn_ack_rcvd_nosendwin(struct cor_conn *trgt_out_l)
{
	if (trgt_out_l->bufsize.state == BUFSIZE_INCR ||
			trgt_out_l->bufsize.state == BUFSIZE_INCR_FAST)
		trgt_out_l->bufsize.state = BUFSIZE_NOACTION;

	if (trgt_out_l->bufsize.state == BUFSIZE_NOACTION)
		trgt_out_l->bufsize.act.noact.bytesleft = max(
				trgt_out_l->bufsize.act.noact.bytesleft,
				(__u32) BUF_OUT_WIN_NOK_NOINCR);

	trgt_out_l->bufsize.ignore_rcv_lowbuf = max(
			trgt_out_l->bufsize.ignore_rcv_lowbuf,
			(__u32) BUF_OUT_WIN_NOK_NOINCR);
}

/**
 * nb->retrans_conn_lock must be held when calling this
 * (see cor_schedule_retransmit_conn())
 */
static void cor_reschedule_lowwindow_retrans(struct cor_conn *trgt_out_l)
{
	struct list_head *lh = trgt_out_l->target.out.retrans_list.next;
	int cnt = 0;

	while (trgt_out_l->target.out.retrans_lowwindow > 0 && cnt < 100) {
		struct cor_conn_retrans *cr;

		if (unlikely(lh == &(trgt_out_l->target.out.retrans_list))) {
			BUG_ON(trgt_out_l->target.out.retrans_lowwindow !=
					65535);
			trgt_out_l->target.out.retrans_lowwindow = 0;
			break;
		}

		cr = container_of(lh, struct cor_conn_retrans, conn_list);

		if (cor_seqno_after_eq(cr->seqno,
				trgt_out_l->target.out.seqno_windowlimit)) {
			break;
		}

		if (cr->state == CONN_RETRANS_LOWWINDOW)
			cor_schedule_retransmit_conn(cr, 1, 1);

		lh = lh->next;
		cnt++;
	}
}
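
/*
 * Process a cumulative ack up to seqno which may also announce a new send
 * window. Fully acked retransmits are cancelled, LOWWINDOW entries are
 * rescheduled if the window grew, the acked data is released from the
 * data_buf and the sender is woken up.
 */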
void cor_conn_ack_rcvd(struct cor_neighbor *nb, __u32 conn_id,
		struct cor_conn *trgt_out, __u64 seqno, int setwindow,
		__u16 window, __u8 bufsize_changerate, __u64 *bytes_acked)
{
	int seqno_advanced = 0;
	int window_enlarged = 0;

	spin_lock_bh(&(trgt_out->rcv_lock));

	if (unlikely(trgt_out->isreset != 0))
		goto out;
	if (unlikely(trgt_out->targettype != TARGET_OUT))
		goto out;
	if (unlikely(trgt_out->target.out.nb != nb))
		goto out;
	if (unlikely(cor_get_connid_reverse(trgt_out->target.out.conn_id) !=
			conn_id))
		goto out;

	if (unlikely(cor_seqno_after(seqno,
			trgt_out->target.out.seqno_nextsend) ||
			cor_seqno_before(seqno,
			trgt_out->target.out.seqno_acked)))
		goto out;

	if (setwindow) {
		__u64 windowdec = cor_dec_window(window);
		if (likely(cor_seqno_after(seqno,
				trgt_out->target.out.seqno_acked)) ||
				cor_seqno_after(seqno + windowdec,
				trgt_out->target.out.seqno_windowlimit)) {
			trgt_out->target.out.seqno_windowlimit = seqno +
					windowdec;
			window_enlarged = 1;
		}
		trgt_out->target.out.remote_bufsize_changerate =
				bufsize_changerate;
	}

	if (cor_seqno_after(seqno, trgt_out->target.out.seqno_acked))
		seqno_advanced = 1;

	if (seqno_advanced == 0 && window_enlarged == 0)
		goto out;

	kref_get(&(nb->ref));
	spin_lock_bh(&(nb->retrans_conn_lock));

	if (seqno_advanced) {
		trgt_out->target.out.seqno_acked = seqno;
		cor_cancel_acked_conn_retrans(trgt_out, bytes_acked);
	}

	if (window_enlarged)
		cor_reschedule_lowwindow_retrans(trgt_out);

	spin_unlock_bh(&(nb->retrans_conn_lock));
	kref_put(&(nb->ref), cor_neighbor_free);

	if (seqno_advanced)
		cor_databuf_ack(trgt_out, trgt_out->target.out.seqno_acked);

	if (cor_seqno_eq(trgt_out->target.out.seqno_acked,
			trgt_out->target.out.seqno_nextsend))
		_cor_conn_ack_rcvd_nosendwin(trgt_out);

out:
	if (seqno_advanced || window_enlarged)
		cor_flush_buf(trgt_out);

	spin_unlock_bh(&(trgt_out->rcv_lock));

	cor_wake_sender(trgt_out);
}
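
/*
 * Merge cr with its predecessor on the conn's retrans_list if both are
 * SCHEDULED, share the same timeout and are adjacent in sequence space, so
 * that they can be retransmitted as a single packet. The absorbed entry is
 * cancelled with a dummy bytes_acked counter, since nothing was acked.
 */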
static void cor_try_combine_conn_retrans_prev(
		struct cor_neighbor *nb_retransconnlocked,
		struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr)
{
	struct cor_conn_retrans *cr_prev;
	__u64 bytes_dummyacked = 0;

	BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

	if (cr->conn_list.prev == &(trgt_out_lx->target.out.retrans_list))
		return;

	cr_prev = container_of(cr->conn_list.prev, struct cor_conn_retrans,
			conn_list);

	if (cr_prev->state != CONN_RETRANS_SCHEDULED)
		return;
	if (cr_prev->timeout != cr->timeout)
		return;
	if (!cor_seqno_eq(cr_prev->seqno + cr_prev->length, cr->seqno))
		return;

	cr->seqno -= cr_prev->length;
	cr->length += cr_prev->length;

	cor_cancel_conn_retrans(nb_retransconnlocked, trgt_out_lx, cr_prev,
			&bytes_dummyacked);
}

static void cor_try_combine_conn_retrans_next(
		struct cor_neighbor *nb_retranslocked,
		struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr)
{
	struct cor_conn_retrans *cr_next;
	__u64 bytes_dummyacked = 0;

	BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

	if (cr->conn_list.next == &(trgt_out_lx->target.out.retrans_list))
		return;

	cr_next = container_of(cr->conn_list.next, struct cor_conn_retrans,
			conn_list);

	if (cr_next->state != CONN_RETRANS_SCHEDULED)
		return;
	if (cr_next->timeout != cr->timeout)
		return;
	if (!cor_seqno_eq(cr->seqno + cr->length, cr_next->seqno))
		return;

	cr->length += cr_next->length;

	cor_cancel_conn_retrans(nb_retranslocked, trgt_out_lx, cr_next,
			&bytes_dummyacked);
}
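
/*
 * (Re)schedule a retransmit: compute its timeout from the measured
 * retransmit latency, its standard deviation and the maximum conn ack delay
 * of the remote side for this latency class, then append it to the matching
 * per-neighbor timeout list (appending keeps the lists roughly ordered by
 * timeout). If the list was empty the retransmit timer must be rearmed;
 * otherwise the new entry is opportunistically merged with its neighbors in
 * sequence space.
 */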
void cor_schedule_retransmit_conn(struct cor_conn_retrans *cr, int connlocked,
		int nbretransconn_locked)
{
	struct cor_conn *trgt_out_o = cr->trgt_out_o;
	struct cor_neighbor *nb;
	int first;

	if (connlocked == 0)
		spin_lock_bh(&(trgt_out_o->rcv_lock));

	BUG_ON(trgt_out_o->targettype != TARGET_OUT);
	nb = trgt_out_o->target.out.nb;

	if (trgt_out_o->is_highlatency)
		cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
				atomic_read(&(nb->latency_stddev_retrans_us)),
				atomic_read(&(nb->max_remote_ackconn_highlat_delay_us)));
	else
		cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
				atomic_read(&(nb->latency_stddev_retrans_us)),
				atomic_read(&(nb->max_remote_ackconn_lowlat_delay_us)));

	if (nbretransconn_locked == 0)
		spin_lock_bh(&(nb->retrans_conn_lock));

	kref_get(&(nb->ref));

	BUG_ON(cr->state == CONN_RETRANS_SCHEDULED);

	if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
		goto out;
	} else if (unlikely(cr->state == CONN_RETRANS_LOWWINDOW)) {
		BUG_ON(trgt_out_o->target.out.retrans_lowwindow == 0);
		if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
			trgt_out_o->target.out.retrans_lowwindow--;
	}

	if (trgt_out_o->is_highlatency) {
		first = unlikely(list_empty(
				&(nb->retrans_conn_highlatency_list)));
		list_add_tail(&(cr->timeout_list),
				&(nb->retrans_conn_highlatency_list));
	} else {
		first = unlikely(list_empty(
				&(nb->retrans_conn_lowlatency_list)));
		list_add_tail(&(cr->timeout_list),
				&(nb->retrans_conn_lowlatency_list));
	}
	kref_get(&(cr->ref));
	cr->state = CONN_RETRANS_SCHEDULED;

	if (unlikely(first)) {
		cor_reschedule_conn_retrans_timer(nb);
	} else {
		cor_try_combine_conn_retrans_prev(nb, trgt_out_o, cr);
		cor_try_combine_conn_retrans_next(nb, trgt_out_o, cr);
	}

out:
	if (nbretransconn_locked == 0)
		spin_unlock_bh(&(nb->retrans_conn_lock));

	kref_put(&(nb->ref), cor_neighbor_free);

	if (connlocked == 0)
		spin_unlock_bh(&(trgt_out_o->rcv_lock));
}
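
/*
 * Send len bytes of not-yet-sent data as an skb. The conn_retrans is
 * prepared before transmission so the data can be retransmitted if the
 * packet is lost; if the device queue drops the packet right away, the
 * databuf read position is rewound and the retransmit is cancelled again.
 */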
static int _cor_flush_out_skb(struct cor_conn *trgt_out_lx, __u32 len)
{
	struct cor_neighbor *nb = trgt_out_lx->target.out.nb;

	__u64 seqno;
	struct cor_conn_retrans *cr;
	struct sk_buff *skb;
	char *dst;
	__u8 flush = 0;
	int rc;

	if (trgt_out_lx->flush != 0 &&
			trgt_out_lx->data_buf.read_remaining == len)
		flush = 1;

	seqno = trgt_out_lx->target.out.seqno_nextsend;
	skb = cor_create_packet_conndata(trgt_out_lx->target.out.nb, len,
			GFP_ATOMIC, trgt_out_lx->target.out.conn_id, seqno,
			trgt_out_lx->target.out.lastsend_windowused, flush);
	if (unlikely(skb == 0))
		return RC_FLUSH_CONN_OUT_OOM;

	cr = cor_prepare_conn_retrans(trgt_out_lx, seqno, len,
			trgt_out_lx->target.out.lastsend_windowused, 0, 0);
	if (unlikely(cr == 0)) {
		kfree_skb(skb);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	dst = skb_put(skb, len);

	cor_databuf_pull(trgt_out_lx, dst, len);

	rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_NEIGHBOR);
	if (rc == NET_XMIT_DROP) {
		cor_databuf_unpull(trgt_out_lx, len);
		spin_lock_bh(&(nb->retrans_conn_lock));
		cor_cancel_conn_retrans(nb, trgt_out_lx, cr, 0);
		spin_unlock_bh(&(nb->retrans_conn_lock));
		kref_put(&(cr->ref), cor_free_connretrans); /* alloc */
		return RC_FLUSH_CONN_OUT_CONG;
	}

	trgt_out_lx->target.out.seqno_nextsend += len;
	cor_nbcongwin_data_sent(nb, len);
	cor_schedule_retransmit_conn(cr, 1, 0);
	if (trgt_out_lx->sourcetype == SOURCE_SOCK)
		cor_update_src_sock_sndspeed(trgt_out_lx, len);

	kref_put(&(cr->ref), cor_free_connretrans); /* alloc */

	return (rc == NET_XMIT_SUCCESS) ?
			RC_FLUSH_CONN_OUT_OK : RC_FLUSH_CONN_OUT_SENT_CONG;
}
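
/*
 * Like _cor_flush_out_skb, but for amounts too small for their own skb: the
 * data is copied into a kmalloc'd buffer and handed to the control message
 * machinery via cor_send_conndata.
 */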
static int _cor_flush_out_conndata(struct cor_conn *trgt_out_lx, __u32 len)
{
	__u64 seqno;
	struct cor_control_msg_out *cm;
	struct cor_conn_retrans *cr;
	char *buf;
	__u8 flush = 0;

	if (trgt_out_lx->flush != 0 &&
			trgt_out_lx->data_buf.read_remaining == len)
		flush = 1;

	buf = kmalloc(len, GFP_ATOMIC);

	if (unlikely(buf == 0))
		return RC_FLUSH_CONN_OUT_OOM;

	cm = cor_alloc_control_msg(trgt_out_lx->target.out.nb, ACM_PRIORITY_LOW);
	if (unlikely(cm == 0)) {
		kfree(buf);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	seqno = trgt_out_lx->target.out.seqno_nextsend;

	cr = cor_prepare_conn_retrans(trgt_out_lx, seqno, len,
			trgt_out_lx->target.out.lastsend_windowused, 0, 0);
	if (unlikely(cr == 0)) {
		kfree(buf);
		cor_free_control_msg(cm);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	cor_databuf_pull(trgt_out_lx, buf, len);
	trgt_out_lx->target.out.seqno_nextsend += len;
	cor_nbcongwin_data_sent(trgt_out_lx->target.out.nb, len);
	if (trgt_out_lx->sourcetype == SOURCE_SOCK)
		cor_update_src_sock_sndspeed(trgt_out_lx, len);

	cor_send_conndata(cm, trgt_out_lx->target.out.conn_id, seqno, buf, buf,
			len, trgt_out_lx->target.out.lastsend_windowused, flush,
			trgt_out_lx->is_highlatency, cr);

	kref_put(&(cr->ref), cor_free_connretrans); /* alloc */

	return RC_FLUSH_CONN_OUT_OK;
}

int cor_srcin_buflimit_reached(struct cor_conn *src_in_lx)
{
	__u64 window_left;

	if (unlikely(cor_seqno_before(src_in_lx->source.in.window_seqnolimit,
			src_in_lx->source.in.next_seqno)))
		return 1;

	window_left = cor_seqno_clean(src_in_lx->source.in.window_seqnolimit -
			src_in_lx->source.in.next_seqno);

	if (window_left < WINDOW_ENCODE_MIN)
		return 1;

	if (window_left / 2 < src_in_lx->data_buf.read_remaining)
		return 1;

	return 0;
}
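
/*
 * Round maxsend_left down to a convenient send size: values below 128 are
 * used as-is, values from 128 to 4095 are rounded down to a power of two,
 * larger values down to a multiple of 4096.
 */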
static __u32 cor_maxsend_left_to_len(__u32 maxsend_left)
{
	__u32 i;

	if (maxsend_left < 128)
		return maxsend_left;

	for (i = 128; i < 4096;) {
		if (i * 2 > maxsend_left)
			return i;
		i = i * 2;
	}

	return maxsend_left - maxsend_left % 4096;
}
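
/*
 * Track how much of the announced send window is currently in use, scaled
 * to 0..31: lastsend_windowused = ceil(31 * bytes_ackpending /
 * total_window). The value is passed along with outgoing data as the
 * windowused parameter.
 */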
static void cor_set_last_windowused(struct cor_conn *trgt_out_lx)
{
	__u64 total_window;
	__u64 bytes_ackpending;

	BUG_ON(cor_seqno_before(trgt_out_lx->target.out.seqno_windowlimit,
			trgt_out_lx->target.out.seqno_acked));
	BUG_ON(cor_seqno_before(trgt_out_lx->target.out.seqno_nextsend,
			trgt_out_lx->target.out.seqno_acked));

	total_window = cor_seqno_clean(
			trgt_out_lx->target.out.seqno_windowlimit -
			trgt_out_lx->target.out.seqno_acked);
	bytes_ackpending = cor_seqno_clean(
			trgt_out_lx->target.out.seqno_nextsend -
			trgt_out_lx->target.out.seqno_acked);

	BUG_ON(bytes_ackpending > total_window);
	BUG_ON(bytes_ackpending > (U64_MAX / 64));

	trgt_out_lx->target.out.lastsend_windowused = div64_u64(
			bytes_ackpending * 31 + total_window - 1, total_window);
}

static void _cor_flush_out_ignore_lowbuf(struct cor_conn *trgt_out_lx)
{
	trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
			trgt_out_lx->bufsize.ignore_rcv_lowbuf,
			trgt_out_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
}

static __u64 cor_get_windowlimit(struct cor_conn *trgt_out_lx)
{
	if (unlikely(cor_seqno_before(trgt_out_lx->target.out.seqno_windowlimit,
			trgt_out_lx->target.out.seqno_nextsend)))
		return 0;

	return cor_seqno_clean(trgt_out_lx->target.out.seqno_windowlimit -
			trgt_out_lx->target.out.seqno_nextsend);
}
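
/*
 * Push buffered data of one conn to its neighbor, at most maxsend bytes.
 * Full MSS-sized chunks are sent as long as the buffer, the send window,
 * maxsend and the congestion window allow. A smaller final chunk is only
 * sent if flushing was requested, the source cannot deliver more data
 * anyway, or enough window is left, so that small packets are avoided where
 * possible.
 */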
int _cor_flush_out(struct cor_conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
		int from_qos, int maxsend_forcedelay)
{
	struct cor_neighbor *nb = trgt_out_lx->target.out.nb;

	__u32 targetmss;

	int nbstate;

	__u32 maxsend_left = maxsend;

	BUG_ON(trgt_out_lx->targettype != TARGET_OUT);

	if (unlikely(trgt_out_lx->target.out.established == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_lx->isreset != 0))
		return RC_FLUSH_CONN_OUT_OK;

	BUG_ON(trgt_out_lx->target.out.conn_id == 0);

	if (unlikely(trgt_out_lx->data_buf.read_remaining == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (from_qos == 0 && cor_qos_fastsend_allowed_conn(trgt_out_lx) == 0)
		return RC_FLUSH_CONN_OUT_CONG;

	cor_get_conn_idletime(trgt_out_lx);

	nbstate = cor_get_neigh_state(nb);

	if (unlikely(nbstate != NEIGHBOR_STATE_ACTIVE))
		return RC_FLUSH_CONN_OUT_NBNOTACTIVE;

	/* printk(KERN_ERR "flush %p %llu %u", trgt_out_l,
			cor_get_windowlimit(trgt_out_l),
			trgt_out_l->data_buf.read_remaining); */

	targetmss = cor_mss_conndata(nb, trgt_out_lx->is_highlatency != 0);

	while (trgt_out_lx->data_buf.read_remaining >= targetmss) {
		__u64 windowlimit = cor_get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < targetmss)
			break;

		if (windowlimit < targetmss) {
			trgt_out_lx->target.out.lastsend_windowused = 31;
			_cor_flush_out_ignore_lowbuf(trgt_out_lx);
			break;
		}

		if (cor_nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (likely(cor_send_conndata_as_skb(nb, targetmss)))
			rc = _cor_flush_out_skb(trgt_out_lx, targetmss);
		else
			rc = _cor_flush_out_conndata(trgt_out_lx, targetmss);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= targetmss;
			*sent += targetmss;
			cor_set_last_windowused(trgt_out_lx);
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

	if (trgt_out_lx->data_buf.read_remaining > 0) {
		__u32 len = trgt_out_lx->data_buf.read_remaining;
		__u64 windowlimit = cor_get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < len) {
			if (maxsend_left >= 65536 || (
					maxsend_left == maxsend &&
					maxsend_left >= 128 &&
					trgt_out_lx->is_highlatency == 0 &&
					!maxsend_forcedelay)) {
				len = cor_maxsend_left_to_len(maxsend_left);
			} else {
				return RC_FLUSH_CONN_OUT_MAXSENT;
			}
		}

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_SOCK &&
				cor_sock_sndbufavailable(trgt_out_lx, 1) != 0)
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_IN &&
				cor_srcin_buflimit_reached(trgt_out_lx)
				== 0 && (
				cor_seqno_eq(
				trgt_out_lx->target.out.seqno_nextsend,
				trgt_out_lx->target.out.seqno_acked) == 0 ||
				trgt_out_lx->is_highlatency != 0 ||
				LOWLATENCY_SEND_UNFLUSHED_DATA != 0))
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_UNCONNECTED &&
				cor_conn_src_unconn_write_allowed(
				trgt_out_lx) != 0)
			goto out;

		if (windowlimit == 0 || (windowlimit < len &&
				cor_seqno_eq(
				trgt_out_lx->target.out.seqno_nextsend,
				trgt_out_lx->target.out.seqno_acked) == 0)) {
			trgt_out_lx->target.out.lastsend_windowused = 31;
			_cor_flush_out_ignore_lowbuf(trgt_out_lx);
			goto out;
		}

		if (cor_nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (len > windowlimit) {
			len = windowlimit;
			_cor_flush_out_ignore_lowbuf(trgt_out_lx);
		}

		if (cor_send_conndata_as_skb(nb, len))
			rc = _cor_flush_out_skb(trgt_out_lx, len);
		else
			rc = _cor_flush_out_conndata(trgt_out_lx, len);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= len;
			*sent += len;
			cor_set_last_windowused(trgt_out_lx);
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

out:
	return RC_FLUSH_CONN_OUT_OK;
}

int __init cor_snd_init(void)
{
	cor_connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct cor_conn_retrans), 8, 0, 0);
	if (unlikely(cor_connretrans_slab == 0))
		return -ENOMEM;

	return 0;
}

void __exit cor_snd_exit2(void)
{
	kmem_cache_destroy(cor_connretrans_slab);
	cor_connretrans_slab = 0;
}

MODULE_LICENSE("GPL");