send windowused instead of senddelayed flag
[cor.git] net/cor/conn_trgt_out.c
blob 631a65003b53fe0de92afc1e7fbdf90b9deff002
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2021 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/gfp.h>
22 #include <linux/jiffies.h>
23 #include <linux/slab.h>
25 #include "cor.h"
27 static struct kmem_cache *cor_connretrans_slab;
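/*
 * kref release callback for struct cor_conn_retrans: only entries that have
 * reached CONN_RETRANS_ACKED may be freed; the slab object is returned and
 * the "conn_retrans" reference on the owning conn is dropped.
 */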
30 void cor_free_connretrans(struct kref *ref)
32 struct cor_conn_retrans *cr = container_of(ref, struct cor_conn_retrans,
33 ref);
34 struct cor_conn *cn = cr->trgt_out_o;
36 BUG_ON(cr->state != CONN_RETRANS_ACKED);
38 kmem_cache_free(cor_connretrans_slab, cr);
39 cor_conn_kref_put(cn, "conn_retrans");
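/*
 * Pick the conn_retrans to handle next from the per-neighbor low- and
 * high-latency timeout lists: a low-latency entry that is already due wins,
 * otherwise whichever head entry times out first. Returns 0 if both lists
 * are empty; nb->retrans_conn_lock must be held.
 */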
42 static struct cor_conn_retrans *cor_peek_next_conn_retrans(
43 struct cor_neighbor *nb_retransconnlocked)
45 struct cor_conn_retrans *cr1 = 0;
46 struct cor_conn_retrans *cr2 = 0;
48 if (list_empty(&(nb_retransconnlocked->retrans_conn_lowlatency_list)) == 0)
49 cr1 = container_of(
50 nb_retransconnlocked->retrans_conn_lowlatency_list.next,
51 struct cor_conn_retrans, timeout_list);
53 if (list_empty(&(nb_retransconnlocked->retrans_conn_highlatency_list)) == 0)
54 cr2 = container_of(
55 nb_retransconnlocked->retrans_conn_highlatency_list.next,
56 struct cor_conn_retrans, timeout_list);
58 if (cr1 == 0)
59 return cr2;
60 if (cr2 == 0)
61 return cr1;
63 if (time_before_eq(cr1->timeout, jiffies))
64 return cr1;
66 if (time_before_eq(cr1->timeout, cr2->timeout))
67 return cr1;
69 return cr2;
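/*
 * Re-arm retransmission for the next pending conn_retrans of this neighbor:
 * if it is already due, enqueue the neighbor on the qos queue right away,
 * otherwise (re)arm retrans_conn_timer. mod_timer() returning 0 means the
 * timer was not pending, so a neighbor reference is taken for it.
 */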
72 void cor_reschedule_conn_retrans_timer(
73 struct cor_neighbor *nb_retransconnlocked)
75 struct cor_conn_retrans *cr =
76 cor_peek_next_conn_retrans(nb_retransconnlocked);
78 if (cr == 0)
79 return;
81 if (time_before_eq(cr->timeout, jiffies)) {
82 cor_qos_enqueue(nb_retransconnlocked->queue,
83 &(nb_retransconnlocked->rb_cr), 0,
84 ns_to_ktime(0), QOS_CALLER_CONN_RETRANS);
85 } else {
86 if (mod_timer(&(nb_retransconnlocked->retrans_conn_timer),
87 cr->timeout) == 0) {
88 kref_get(&(nb_retransconnlocked->ref));
93 /**
94 * warning:
95 * caller must also call kref_get/put, see cor_reschedule_conn_retrans_timer
97 static void cor_cancel_conn_retrans(struct cor_neighbor *nb_retransconnlocked,
98 struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr,
99 __u64 *bytes_acked)
101 if (unlikely(cr->state == CONN_RETRANS_ACKED))
102 return;
104 if (cr->state == CONN_RETRANS_SCHEDULED) {
105 list_del(&(cr->timeout_list));
106 kref_put(&(cr->ref), cor_kreffree_bug);
107 } else if (cr->state == CONN_RETRANS_LOWWINDOW) {
108 BUG_ON(trgt_out_lx->target.out.retrans_lowwindow == 0);
109 if (likely(trgt_out_lx->target.out.retrans_lowwindow != 65535))
110 trgt_out_lx->target.out.retrans_lowwindow--;
113 if (cr->state != CONN_RETRANS_INITIAL)
114 *bytes_acked += cr->length;
116 list_del(&(cr->conn_list));
117 cr->state = CONN_RETRANS_ACKED;
119 kref_put(&(cr->ref), cor_free_connretrans); /* conn_list */
123 * nb->retrans_conn_lock must be held when calling this
124 * (see cor_schedule_retransmit_conn())
126 static void cor_cancel_acked_conn_retrans(struct cor_conn *trgt_out_l,
127 __u64 *bytes_acked)
129 __u64 seqno_acked = trgt_out_l->target.out.seqno_acked;
131 while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) {
132 struct cor_conn_retrans *cr = container_of(
133 trgt_out_l->target.out.retrans_list.next,
134 struct cor_conn_retrans, conn_list);
136 if (cor_seqno_after(cr->seqno + cr->length, seqno_acked)) {
137 if (cor_seqno_before(cr->seqno, seqno_acked)) {
138 *bytes_acked += cor_seqno_clean(seqno_acked -
139 cr->seqno);
140 cr->length -= cor_seqno_clean(seqno_acked -
141 cr->seqno);
142 cr->seqno = seqno_acked;
144 break;
147 cor_cancel_conn_retrans(trgt_out_l->target.out.nb, trgt_out_l,
148 cr, bytes_acked);
151 cor_reschedule_conn_retrans_timer(trgt_out_l->target.out.nb);
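/*
 * Cancel every pending retransmit of one connection and credit the
 * cancelled bytes to the neighbor congestion window via
 * cor_nbcongwin_data_acked().
 */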
154 void cor_cancel_all_conn_retrans(struct cor_conn *trgt_out_lx)
156 struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
157 __u64 bytes_acked = 0;
159 spin_lock_bh(&(nb->retrans_conn_lock));
161 while (list_empty(&(trgt_out_lx->target.out.retrans_list)) == 0) {
162 struct cor_conn_retrans *cr = container_of(
163 trgt_out_lx->target.out.retrans_list.next,
164 struct cor_conn_retrans, conn_list);
165 BUG_ON(cr->trgt_out_o != trgt_out_lx);
167 cor_cancel_conn_retrans(nb, trgt_out_lx, cr, &bytes_acked);
170 cor_reschedule_conn_retrans_timer(nb);
172 spin_unlock_bh(&(nb->retrans_conn_lock));
174 if (bytes_acked > 0)
175 cor_nbcongwin_data_acked(nb, bytes_acked);
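/*
 * Empty one per-neighbor retransmit timeout list. Each head entry is pinned
 * with a kref so retrans_conn_lock can be dropped, the conn's rcv_lock
 * taken first and the entry cancelled only if it is still at the head
 * afterwards (rcv_lock must be taken before retrans_conn_lock).
 */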
178 static void _cor_cancel_all_conn_retrans_nb(struct cor_neighbor *nb,
179 struct list_head *retrans_list, __u64 *bytes_acked)
181 while (1) {
182 struct cor_conn_retrans *cr;
184 spin_lock_bh(&(nb->retrans_conn_lock));
186 if (list_empty(retrans_list)) {
187 spin_unlock_bh(&(nb->retrans_conn_lock));
188 break;
191 cr = container_of(retrans_list->next, struct cor_conn_retrans,
192 timeout_list);
194 kref_get(&(cr->ref));
196 spin_unlock_bh(&(nb->retrans_conn_lock));
199 spin_lock_bh(&(cr->trgt_out_o->rcv_lock));
200 spin_lock_bh(&(nb->retrans_conn_lock));
202 if (likely(cr == container_of(retrans_list->next,
203 struct cor_conn_retrans, timeout_list)))
204 cor_cancel_conn_retrans(nb, cr->trgt_out_o, cr,
205 bytes_acked);
207 spin_unlock_bh(&(nb->retrans_conn_lock));
208 spin_unlock_bh(&(cr->trgt_out_o->rcv_lock));
210 kref_put(&(cr->ref), cor_free_connretrans);
214 static void cor_cancel_all_conn_retrans_nb(struct cor_neighbor *nb)
216 __u64 bytes_acked = 0;
218 _cor_cancel_all_conn_retrans_nb(nb,
219 &(nb->retrans_conn_lowlatency_list), &bytes_acked);
220 _cor_cancel_all_conn_retrans_nb(nb,
221 &(nb->retrans_conn_highlatency_list), &bytes_acked);
223 if (bytes_acked > 0)
224 cor_nbcongwin_data_acked(nb, bytes_acked);
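/*
 * Allocate a conn_retrans covering len bytes starting at seqno and link it
 * into the connection's retrans_list, directly behind cr_splitted when the
 * entry results from a split. Returns 0 on allocation failure. The caller
 * owns the reference from kref_init(); an additional reference is taken
 * for the conn_list linkage.
 */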
227 static struct cor_conn_retrans *cor_prepare_conn_retrans(
228 struct cor_conn *trgt_out_l, __u64 seqno, __u32 len,
229 __u8 windowused, struct cor_conn_retrans *cr_splitted,
230 int retransconnlocked)
232 struct cor_neighbor *nb = trgt_out_l->target.out.nb;
234 struct cor_conn_retrans *cr = kmem_cache_alloc(cor_connretrans_slab,
235 GFP_ATOMIC);
237 if (unlikely(cr == 0))
238 return 0;
240 BUG_ON(trgt_out_l->isreset != 0);
242 memset(cr, 0, sizeof (struct cor_conn_retrans));
243 cr->trgt_out_o = trgt_out_l;
244 cor_conn_kref_get(trgt_out_l, "conn_retrans");
245 cr->seqno = seqno;
246 cr->length = len;
247 cr->windowused = windowused;
248 kref_init(&(cr->ref));
250 if (retransconnlocked == 0)
251 spin_lock_bh(&(nb->retrans_conn_lock));
253 if (cr_splitted != 0)
254 list_add(&(cr->conn_list), &(cr_splitted->conn_list));
255 else
256 list_add_tail(&(cr->conn_list),
257 &(cr->trgt_out_o->target.out.retrans_list));
258 kref_get(&(cr->ref)); /* conn_list */
260 if (retransconnlocked == 0)
261 spin_unlock_bh(&(nb->retrans_conn_lock));
263 return cr;
266 #define RC_SENDRETRANS_OK 0
267 #define RC_SENDRETRANS_OOM 1
268 #define RC_SENDRETRANS_QUEUEFULL 2
269 #define RC_SENDRETRANS_QUEUEFULLDROPPED 3
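/*
 * Put the data described by cr back on the wire, either as a separate skb
 * or as conn data carried by a low-priority control message, depending on
 * cor_send_conndata_as_skb(). The flush flag is forwarded when the
 * connection has flush set, this chunk ends at seqno_nextsend and the data
 * buffer is empty.
 */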
271 static int __cor_send_retrans(struct cor_neighbor *nb,
272 struct cor_conn *trgt_out_l, struct cor_conn_retrans *cr,
273 __u64 *bytes_sent)
275 __u8 flush = 0;
277 BUG_ON(cr->length == 0);
279 if (trgt_out_l->flush != 0 && cor_seqno_eq(cr->seqno + cr->length,
280 trgt_out_l->target.out.seqno_nextsend) &&
281 trgt_out_l->data_buf.read_remaining == 0)
282 flush = 1;
284 if (cor_send_conndata_as_skb(nb, cr->length)) {
285 struct sk_buff *skb;
286 char *dst;
287 int rc;
289 skb = cor_create_packet_conndata(nb, cr->length, GFP_ATOMIC,
290 trgt_out_l->target.out.conn_id, cr->seqno,
291 cr->windowused, flush);
292 if (unlikely(skb == 0))
293 return RC_SENDRETRANS_OOM;
295 dst = skb_put(skb, cr->length);
297 cor_databuf_pullold(trgt_out_l, cr->seqno, dst, cr->length);
299 rc = cor_dev_queue_xmit(skb, nb->queue,
300 QOS_CALLER_CONN_RETRANS);
301 if (rc == NET_XMIT_DROP)
302 return RC_SENDRETRANS_QUEUEFULLDROPPED;
303 cor_schedule_retransmit_conn(cr, 1, 0);
304 if (rc != NET_XMIT_SUCCESS)
305 return RC_SENDRETRANS_QUEUEFULL;
307 } else {
308 struct cor_control_msg_out *cm;
309 char *buf;
311 buf = kmalloc(cr->length, GFP_ATOMIC);
312 if (unlikely(buf == 0))
313 return RC_SENDRETRANS_OOM;
315 cm = cor_alloc_control_msg(nb, ACM_PRIORITY_LOW);
316 if (unlikely(cm == 0)) {
317 kfree(buf);
318 return RC_SENDRETRANS_OOM;
321 cor_databuf_pullold(trgt_out_l, cr->seqno, buf, cr->length);
323 cor_send_conndata(cm, trgt_out_l->target.out.conn_id,
324 cr->seqno, buf, buf, cr->length, cr->windowused,
325 flush, trgt_out_l->is_highlatency, cr);
328 *bytes_sent += cr->length;
330 return RC_SENDRETRANS_OK;
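/*
 * Limit a retransmit to the connection MSS and the remaining send window:
 * if cr is larger, the tail is split off into a second conn_retrans that is
 * queued on the matching timeout list with the same timeout, and cr itself
 * is shortened to maxsize.
 */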
333 static int _cor_send_retrans_splitcr_ifneeded(
334 struct cor_neighbor *nb_retransconnlocked,
335 struct cor_conn *trgt_out_l, struct cor_conn_retrans *cr)
337 __u32 targetmss = cor_mss_conndata(nb_retransconnlocked,
338 trgt_out_l->is_highlatency != 0);
339 __u64 windowlimit = cor_seqno_clean(
340 trgt_out_l->target.out.seqno_windowlimit -
341 cr->seqno);
342 __u32 maxsize = targetmss;
343 if (windowlimit < maxsize)
344 maxsize = windowlimit;
346 if (unlikely(cr->length > maxsize)) {
347 struct cor_conn_retrans *cr2 = cor_prepare_conn_retrans(
348 trgt_out_l, cr->seqno + maxsize,
349 cr->length - maxsize, cr->windowused, cr, 1);
350 if (unlikely(cr2 == 0))
351 return RC_SENDRETRANS_OOM;
353 cr2->timeout = cr->timeout;
355 if (trgt_out_l->is_highlatency)
356 list_add(&(cr2->timeout_list),
357 &(nb_retransconnlocked->retrans_conn_highlatency_list));
358 else
359 list_add(&(cr2->timeout_list),
360 &(nb_retransconnlocked->retrans_conn_lowlatency_list));
361 /* cr2: kref from alloc goes to kref for timeout_list */
362 cr2->state = CONN_RETRANS_SCHEDULED;
364 cr->length = maxsize;
367 return RC_SENDRETRANS_OK;
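/*
 * Retransmit a single conn_retrans. Entries acked in the meantime are
 * skipped; entries beyond the send window are parked as
 * CONN_RETRANS_LOWWINDOW instead of being sent. On OOM or a dropped
 * transmit the entry is put back on its timeout list to be retried.
 */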
370 static int _cor_send_retrans(struct cor_neighbor *nb,
371 struct cor_conn_retrans *cr, __u64 *bytes_sent)
374 struct cor_conn *trgt_out_o = cr->trgt_out_o;
375 int rc;
377 spin_lock_bh(&(trgt_out_o->rcv_lock));
379 BUG_ON(trgt_out_o->targettype != TARGET_OUT);
380 BUG_ON(trgt_out_o->target.out.nb != nb);
382 spin_lock_bh(&(nb->retrans_conn_lock));
383 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
384 spin_unlock_bh(&(nb->retrans_conn_lock));
385 spin_unlock_bh(&(trgt_out_o->rcv_lock));
386 return 0;
389 BUG_ON(trgt_out_o->isreset != 0);
391 BUG_ON(cor_seqno_before(cr->seqno, trgt_out_o->target.out.seqno_acked));
393 if (cor_seqno_after_eq(cr->seqno,
394 trgt_out_o->target.out.seqno_windowlimit)) {
395 BUG_ON(cr->state != CONN_RETRANS_SENDING);
396 cr->state = CONN_RETRANS_LOWWINDOW;
397 if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
398 trgt_out_o->target.out.retrans_lowwindow++;
400 spin_unlock_bh(&(nb->retrans_conn_lock));
401 spin_unlock_bh(&(trgt_out_o->rcv_lock));
402 return 0;
405 rc = _cor_send_retrans_splitcr_ifneeded(nb, trgt_out_o, cr);
407 spin_unlock_bh(&(nb->retrans_conn_lock));
409 cor_conn_kref_get(trgt_out_o, "stack");
411 if (rc == RC_SENDRETRANS_OK)
412 rc = __cor_send_retrans(nb, trgt_out_o, cr, bytes_sent);
414 if (rc == RC_SENDRETRANS_OOM || rc == RC_SENDRETRANS_QUEUEFULLDROPPED) {
415 spin_lock_bh(&(nb->retrans_conn_lock));
416 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
417 } else if (likely(cr->state == CONN_RETRANS_SENDING)) {
418 if (rc == RC_SENDRETRANS_OOM)
419 cr->timeout = jiffies + 1;
420 if (trgt_out_o->is_highlatency)
421 list_add(&(cr->timeout_list),
422 &(nb->retrans_conn_highlatency_list));
423 else
424 list_add(&(cr->timeout_list),
425 &(nb->retrans_conn_lowlatency_list));
426 kref_get(&(cr->ref));
427 cr->state = CONN_RETRANS_SCHEDULED;
428 } else {
429 BUG();
431 spin_unlock_bh(&(nb->retrans_conn_lock));
434 spin_unlock_bh(&(trgt_out_o->rcv_lock));
436 cor_conn_kref_put(trgt_out_o, "stack");
438 return (rc == RC_SENDRETRANS_OOM ||
439 rc == RC_SENDRETRANS_QUEUEFULL ||
440 rc == RC_SENDRETRANS_QUEUEFULLDROPPED);
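/*
 * qos-queue worker for conn retransmits of one neighbor: send every entry
 * whose timeout has expired, stop as soon as the device queue fills up and
 * return QOS_RESUME_CONG in that case so the qos layer will retry later.
 */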
443 int cor_send_retrans(struct cor_neighbor *nb, int *sent)
445 int queuefull = 0;
446 int nbstate = cor_get_neigh_state(nb);
447 __u64 bytes_sent = 0;
449 if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
450 return QOS_RESUME_DONE;
451 } else if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
453 * cor_cancel_all_conn_retrans_nb should not be needed, because
454 * cor_reset_all_conns calls cor_cancel_all_conn_retrans
456 cor_cancel_all_conn_retrans_nb(nb);
457 return QOS_RESUME_DONE;
460 while (1) {
461 struct cor_conn_retrans *cr = 0;
463 spin_lock_bh(&(nb->retrans_conn_lock));
465 cr = cor_peek_next_conn_retrans(nb);
466 if (cr == 0) {
467 spin_unlock_bh(&(nb->retrans_conn_lock));
468 break;
471 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
473 if (time_after(cr->timeout, jiffies)) {
474 cor_reschedule_conn_retrans_timer(nb);
475 spin_unlock_bh(&(nb->retrans_conn_lock));
476 break;
479 list_del(&(cr->timeout_list));
480 cr->state = CONN_RETRANS_SENDING;
482 spin_unlock_bh(&(nb->retrans_conn_lock));
484 queuefull = _cor_send_retrans(nb, cr, &bytes_sent);
485 kref_put(&(cr->ref), cor_free_connretrans); /* timeout_list */
486 if (queuefull) {
487 break;
488 } else {
489 *sent = 1;
493 if (bytes_sent > 0)
494 cor_nbcongwin_data_retransmitted(nb, bytes_sent);
496 return queuefull ? QOS_RESUME_CONG : QOS_RESUME_DONE;
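/*
 * retrans_conn_timer callback: hand the actual retransmission over to the
 * qos queue and drop the neighbor reference taken when the timer was armed.
 */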
499 void cor_retransmit_conn_timerfunc(struct timer_list *retrans_conn_timer)
501 struct cor_neighbor *nb = container_of(retrans_conn_timer,
502 struct cor_neighbor, retrans_conn_timer);
503 cor_qos_enqueue(nb->queue, &(nb->rb_cr), 0, ns_to_ktime(0),
504 QOS_CALLER_CONN_RETRANS);
505 kref_put(&(nb->ref), cor_neighbor_free);
508 static void cor_conn_ack_ooo_rcvd_splitcr(struct cor_conn *trgt_out_l,
509 struct cor_conn_retrans *cr, __u64 seqno_ooo, __u32 length,
510 __u64 *bytes_acked)
512 struct cor_conn_retrans *cr2;
513 __u64 seqno_cr2start;
514 __u32 oldcrlength = cr->length;
516 if (cr->state != CONN_RETRANS_SCHEDULED &&
517 cr->state != CONN_RETRANS_LOWWINDOW)
518 return;
520 seqno_cr2start = seqno_ooo+length;
521 cr2 = cor_prepare_conn_retrans(trgt_out_l, seqno_cr2start,
522 cor_seqno_clean(cr->seqno + cr->length -
523 seqno_cr2start),
524 cr->windowused, cr, 1);
526 if (unlikely(cr2 == 0))
527 return;
529 BUG_ON(cr2->length > cr->length);
531 cr2->timeout = cr->timeout;
532 cr2->state = cr->state;
534 if (cr->state != CONN_RETRANS_SCHEDULED) {
535 list_add(&(cr2->timeout_list), &(cr->timeout_list));
536 kref_get(&(cr2->ref));
539 BUG_ON(cor_seqno_clean(seqno_ooo - cr->seqno) > cr->length);
541 cr->length -= cor_seqno_clean(seqno_ooo - cr->seqno);
542 BUG_ON(cr->length + length + cr2->length != oldcrlength);
543 kref_put(&(cr2->ref), cor_kreffree_bug); /* alloc */
545 *bytes_acked += length;
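/*
 * Process an out-of-order ack for [seqno_ooo, seqno_ooo + length): retrans
 * entries fully covered by the range are cancelled, partially covered ones
 * are trimmed, and an entry spanning the whole range is split by
 * cor_conn_ack_ooo_rcvd_splitcr(). Finally seqno_acked is advanced to the
 * start of the oldest remaining retransmit (or to seqno_nextsend if none is
 * left).
 */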
548 void cor_conn_ack_ooo_rcvd(struct cor_neighbor *nb, __u32 conn_id,
549 struct cor_conn *trgt_out, __u64 seqno_ooo, __u32 length,
550 __u64 *bytes_acked)
552 struct list_head *curr;
554 if (unlikely(length == 0))
555 return;
557 spin_lock_bh(&(trgt_out->rcv_lock));
559 if (unlikely(trgt_out->targettype != TARGET_OUT))
560 goto out;
561 if (unlikely(trgt_out->target.out.nb != nb))
562 goto out;
563 if (unlikely(trgt_out->target.out.conn_id != conn_id))
564 goto out;
566 kref_get(&(nb->ref));
567 spin_lock_bh(&(nb->retrans_conn_lock));
569 curr = trgt_out->target.out.retrans_list.next;
570 while (curr != &(trgt_out->target.out.retrans_list)) {
571 struct cor_conn_retrans *cr = container_of(curr,
572 struct cor_conn_retrans, conn_list);
574 int ack_covers_start = cor_seqno_after_eq(cr->seqno, seqno_ooo);
575 int ack_covers_end = cor_seqno_before_eq(cr->seqno + cr->length,
576 seqno_ooo + length);
578 curr = curr->next;
580 if (cor_seqno_before(cr->seqno + cr->length, seqno_ooo))
581 continue;
583 if (cor_seqno_after(cr->seqno, seqno_ooo + length))
584 break;
586 if (likely(ack_covers_start && ack_covers_end)) {
587 cor_cancel_conn_retrans(nb, trgt_out, cr, bytes_acked);
588 cor_reschedule_conn_retrans_timer(nb);
589 } else if (ack_covers_start) {
590 __u32 diff = seqno_ooo + length - cr->seqno;
592 BUG_ON(diff >= cr->length);
593 cr->seqno += diff;
594 cr->length -= diff;
595 *bytes_acked += diff;
596 } else if (ack_covers_end) {
597 __u32 diff = seqno_ooo + length - cr->seqno;
598 BUG_ON(diff >= length);
599 cr->length -= diff;
600 *bytes_acked += diff;
601 } else {
602 cor_conn_ack_ooo_rcvd_splitcr(trgt_out, cr, seqno_ooo,
603 length, bytes_acked);
604 break;
608 if (unlikely(list_empty(&(trgt_out->target.out.retrans_list)))) {
609 trgt_out->target.out.seqno_acked =
610 trgt_out->target.out.seqno_nextsend;
611 } else {
612 struct cor_conn_retrans *cr = container_of(
613 trgt_out->target.out.retrans_list.next,
614 struct cor_conn_retrans, conn_list);
615 if (cor_seqno_after(cr->seqno,
616 trgt_out->target.out.seqno_acked))
617 trgt_out->target.out.seqno_acked = cr->seqno;
620 spin_unlock_bh(&(nb->retrans_conn_lock));
621 kref_put(&(nb->ref), cor_neighbor_free);
623 out:
624 spin_unlock_bh(&(trgt_out->rcv_lock));
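/*
 * Called when everything sent so far has been acked: leave the buffer-size
 * increase states, keep BUFSIZE_NOACTION in place for at least
 * BUF_OUT_WIN_NOK_NOINCR bytes and raise ignore_rcv_lowbuf to the same
 * amount.
 */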
627 static void _cor_conn_ack_rcvd_nosendwin(struct cor_conn *trgt_out_l)
629 if (trgt_out_l->bufsize.state == BUFSIZE_INCR ||
630 trgt_out_l->bufsize.state == BUFSIZE_INCR_FAST)
631 trgt_out_l->bufsize.state = BUFSIZE_NOACTION;
633 if (trgt_out_l->bufsize.state == BUFSIZE_NOACTION)
634 trgt_out_l->bufsize.act.noact.bytesleft = max(
635 trgt_out_l->bufsize.act.noact.bytesleft,
636 (__u32) BUF_OUT_WIN_NOK_NOINCR);
638 trgt_out_l->bufsize.ignore_rcv_lowbuf = max(
639 trgt_out_l->bufsize.ignore_rcv_lowbuf,
640 (__u32) BUF_OUT_WIN_NOK_NOINCR);
644 * nb->retrans_conn_lock must be held when calling this
645 * (see cor_schedule_retransmit_conn())
647 static void cor_reschedule_lowwindow_retrans(struct cor_conn *trgt_out_l)
649 struct list_head *lh = trgt_out_l->target.out.retrans_list.next;
650 int cnt = 0;
652 while (trgt_out_l->target.out.retrans_lowwindow > 0 && cnt < 100) {
653 struct cor_conn_retrans *cr;
655 if (unlikely(lh == &(trgt_out_l->target.out.retrans_list))) {
656 BUG_ON(trgt_out_l->target.out.retrans_lowwindow !=
657 65535);
658 trgt_out_l->target.out.retrans_lowwindow = 0;
659 break;
662 cr = container_of(lh, struct cor_conn_retrans, conn_list);
664 if (cor_seqno_after_eq(cr->seqno,
665 trgt_out_l->target.out.seqno_windowlimit)) {
666 break;
669 if (cr->state == CONN_RETRANS_LOWWINDOW)
670 cor_schedule_retransmit_conn(cr, 1, 1);
672 lh = lh->next;
673 cnt++;
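/*
 * Process a cumulative ack up to seqno. The window field is decoded via
 * cor_dec_log_64_7() and may move seqno_windowlimit forward. An advanced
 * seqno cancels acked retransmits and acks the data buffer, an enlarged
 * window reschedules retransmits parked in CONN_RETRANS_LOWWINDOW; if
 * either happened the connection is flushed again, and the sender is woken
 * in any case.
 */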
677 void cor_conn_ack_rcvd(struct cor_neighbor *nb, __u32 conn_id,
678 struct cor_conn *trgt_out, __u64 seqno, int setwindow,
679 __u8 window, __u8 bufsize_changerate, __u64 *bytes_acked)
681 int seqno_advanced = 0;
682 int window_enlarged = 0;
684 spin_lock_bh(&(trgt_out->rcv_lock));
686 if (unlikely(trgt_out->isreset != 0))
687 goto out;
688 if (unlikely(trgt_out->targettype != TARGET_OUT))
689 goto out;
690 if (unlikely(trgt_out->target.out.nb != nb))
691 goto out;
692 if (unlikely(cor_get_connid_reverse(trgt_out->target.out.conn_id) !=
693 conn_id))
694 goto out;
696 if (unlikely(cor_seqno_after(seqno,
697 trgt_out->target.out.seqno_nextsend) ||
698 cor_seqno_before(seqno,
699 trgt_out->target.out.seqno_acked)))
700 goto out;
702 if (setwindow) {
703 __u64 windowdec = cor_dec_log_64_7(window);
704 if (likely(cor_seqno_after(seqno,
705 trgt_out->target.out.seqno_acked)) ||
706 cor_seqno_after(seqno + windowdec,
707 trgt_out->target.out.seqno_windowlimit)) {
708 trgt_out->target.out.seqno_windowlimit = seqno +
709 windowdec;
710 window_enlarged = 1;
712 trgt_out->target.out.remote_bufsize_changerate =
713 bufsize_changerate;
716 if (cor_seqno_after(seqno, trgt_out->target.out.seqno_acked))
717 seqno_advanced = 1;
719 if (seqno_advanced == 0 && window_enlarged == 0)
720 goto out;
722 kref_get(&(nb->ref));
723 spin_lock_bh(&(nb->retrans_conn_lock));
725 if (seqno_advanced) {
726 trgt_out->target.out.seqno_acked = seqno;
727 cor_cancel_acked_conn_retrans(trgt_out, bytes_acked);
730 if (window_enlarged)
731 cor_reschedule_lowwindow_retrans(trgt_out);
733 spin_unlock_bh(&(nb->retrans_conn_lock));
734 kref_put(&(nb->ref), cor_neighbor_free);
736 if (seqno_advanced)
737 cor_databuf_ack(trgt_out, trgt_out->target.out.seqno_acked);
739 if (cor_seqno_eq(trgt_out->target.out.seqno_acked,
740 trgt_out->target.out.seqno_nextsend))
741 _cor_conn_ack_rcvd_nosendwin(trgt_out);
743 out:
744 if (seqno_advanced || window_enlarged)
745 cor_flush_buf(trgt_out);
747 spin_unlock_bh(&(trgt_out->rcv_lock));
749 cor_wake_sender(trgt_out);
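/*
 * cor_try_combine_conn_retrans_prev/_next merge a freshly scheduled
 * conn_retrans with its neighbor in retrans_list when that neighbor is also
 * CONN_RETRANS_SCHEDULED, has the same timeout and covers the adjacent
 * sequence range; the absorbed entry is cancelled.
 */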
752 static void cor_try_combine_conn_retrans_prev(
753 struct cor_neighbor *nb_retransconnlocked,
754 struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr)
756 struct cor_conn_retrans *cr_prev;
757 __u64 bytes_dummyacked = 0;
759 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
761 if (cr->conn_list.prev == &(trgt_out_lx->target.out.retrans_list))
762 return;
764 cr_prev = container_of(cr->conn_list.prev, struct cor_conn_retrans,
765 conn_list);
767 if (cr_prev->state != CONN_RETRANS_SCHEDULED)
768 return;
769 if (cr_prev->timeout != cr->timeout)
770 return;
771 if (!cor_seqno_eq(cr_prev->seqno + cr_prev->length, cr->seqno))
772 return;
774 cr->seqno -= cr_prev->length;
775 cr->length += cr_prev->length;
777 cor_cancel_conn_retrans(nb_retransconnlocked, trgt_out_lx, cr_prev,
778 &bytes_dummyacked);
781 static void cor_try_combine_conn_retrans_next(
782 struct cor_neighbor *nb_retranslocked,
783 struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr)
785 struct cor_conn_retrans *cr_next;
786 __u64 bytes_dummyacked = 0;
788 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
790 if (cr->conn_list.next == &(trgt_out_lx->target.out.retrans_list))
791 return;
793 cr_next = container_of(cr->conn_list.next, struct cor_conn_retrans,
794 conn_list);
796 if (cr_next->state != CONN_RETRANS_SCHEDULED)
797 return;
798 if (cr_next->timeout != cr->timeout)
799 return;
800 if (!cor_seqno_eq(cr->seqno + cr->length, cr_next->seqno))
801 return;
803 cr->length += cr_next->length;
805 cor_cancel_conn_retrans(nb_retranslocked, trgt_out_lx, cr_next,
806 &bytes_dummyacked);
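/*
 * Queue cr for (re)transmission: compute its timeout from the neighbor's
 * measured retransmission latency and the remote ack delay, append it to
 * the low- or high-latency timeout list and either re-arm the timer (if the
 * list was empty) or try to merge it with adjacent entries.
 */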
809 void cor_schedule_retransmit_conn(struct cor_conn_retrans *cr, int connlocked,
810 int nbretransconn_locked)
812 struct cor_conn *trgt_out_o = cr->trgt_out_o;
813 struct cor_neighbor *nb;
814 int first;
816 if (connlocked == 0)
817 spin_lock_bh(&(trgt_out_o->rcv_lock));
819 BUG_ON(trgt_out_o->targettype != TARGET_OUT);
820 nb = trgt_out_o->target.out.nb;
822 if (trgt_out_o->is_highlatency)
823 cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
824 atomic_read(&(nb->latency_stddev_retrans_us)),
825 atomic_read(&(nb->max_remote_ackconn_highlat_delay_us)));
826 else
827 cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
828 atomic_read(&(nb->latency_stddev_retrans_us)),
829 atomic_read(&(nb->max_remote_ackconn_lowlat_delay_us)));
831 if (nbretransconn_locked == 0)
832 spin_lock_bh(&(nb->retrans_conn_lock));
834 kref_get(&(nb->ref));
836 BUG_ON(cr->state == CONN_RETRANS_SCHEDULED);
838 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
839 goto out;
840 } else if (unlikely(cr->state == CONN_RETRANS_LOWWINDOW)) {
841 BUG_ON(trgt_out_o->target.out.retrans_lowwindow == 0);
842 if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
843 trgt_out_o->target.out.retrans_lowwindow--;
846 if (trgt_out_o->is_highlatency) {
847 first = unlikely(list_empty(
848 &(nb->retrans_conn_highlatency_list)));
849 list_add_tail(&(cr->timeout_list),
850 &(nb->retrans_conn_highlatency_list));
851 } else {
852 first = unlikely(list_empty(
853 &(nb->retrans_conn_lowlatency_list)));
854 list_add_tail(&(cr->timeout_list),
855 &(nb->retrans_conn_lowlatency_list));
857 kref_get(&(cr->ref));
858 cr->state = CONN_RETRANS_SCHEDULED;
860 if (unlikely(first)) {
861 cor_reschedule_conn_retrans_timer(nb);
862 } else {
863 cor_try_combine_conn_retrans_prev(nb, trgt_out_o, cr);
864 cor_try_combine_conn_retrans_next(nb, trgt_out_o, cr);
867 out:
868 if (nbretransconn_locked == 0)
869 spin_unlock_bh(&(nb->retrans_conn_lock));
871 kref_put(&(nb->ref), cor_neighbor_free);
873 if (connlocked == 0)
874 spin_unlock_bh(&(trgt_out_o->rcv_lock));
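/*
 * Send len bytes of fresh data in a dedicated skb. A conn_retrans is
 * prepared first so the data can be retransmitted; if the device queue
 * drops the packet, the data is pushed back into the buffer and the
 * retransmit is cancelled again.
 */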
877 static int _cor_flush_out_skb(struct cor_conn *trgt_out_lx, __u32 len)
879 struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
881 __u64 seqno;
882 struct cor_conn_retrans *cr;
883 struct sk_buff *skb;
884 char *dst;
885 __u8 flush = 0;
886 int rc;
888 if (trgt_out_lx->flush != 0 &&
889 trgt_out_lx->data_buf.read_remaining == len)
890 flush = 1;
892 seqno = trgt_out_lx->target.out.seqno_nextsend;
893 skb = cor_create_packet_conndata(trgt_out_lx->target.out.nb, len,
894 GFP_ATOMIC, trgt_out_lx->target.out.conn_id, seqno,
895 trgt_out_lx->target.out.lastsend_windowused, flush);
896 if (unlikely(skb == 0))
897 return RC_FLUSH_CONN_OUT_OOM;
899 cr = cor_prepare_conn_retrans(trgt_out_lx, seqno, len,
900 trgt_out_lx->target.out.lastsend_windowused, 0, 0);
901 if (unlikely(cr == 0)) {
902 kfree_skb(skb);
903 return RC_FLUSH_CONN_OUT_OOM;
906 dst = skb_put(skb, len);
908 cor_databuf_pull(trgt_out_lx, dst, len);
910 rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_NEIGHBOR);
911 if (rc == NET_XMIT_DROP) {
912 cor_databuf_unpull(trgt_out_lx, len);
913 spin_lock_bh(&(nb->retrans_conn_lock));
914 cor_cancel_conn_retrans(nb, trgt_out_lx, cr, 0);
915 spin_unlock_bh(&(nb->retrans_conn_lock));
916 kref_put(&(cr->ref), cor_free_connretrans); /* alloc */
917 return RC_FLUSH_CONN_OUT_CONG;
920 trgt_out_lx->target.out.seqno_nextsend += len;
921 cor_nbcongwin_data_sent(nb, len);
922 cor_schedule_retransmit_conn(cr, 1, 0);
923 if (trgt_out_lx->sourcetype == SOURCE_SOCK)
924 cor_update_src_sock_sndspeed(trgt_out_lx, len);
926 kref_put(&(cr->ref), cor_free_connretrans); /* alloc */
928 return (rc == NET_XMIT_SUCCESS) ?
929 RC_FLUSH_CONN_OUT_OK : RC_FLUSH_CONN_OUT_SENT_CONG;
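/*
 * Send len bytes of fresh data as conn data attached to a low-priority
 * control message, used when cor_send_conndata_as_skb() decides against a
 * separate skb.
 */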
932 static int _cor_flush_out_conndata(struct cor_conn *trgt_out_lx, __u32 len)
934 __u64 seqno;
935 struct cor_control_msg_out *cm;
936 struct cor_conn_retrans *cr;
937 char *buf;
938 __u8 flush = 0;
940 if (trgt_out_lx->flush != 0 &&
941 trgt_out_lx->data_buf.read_remaining == len)
942 flush = 1;
944 buf = kmalloc(len, GFP_ATOMIC);
946 if (unlikely(buf == 0))
947 return RC_FLUSH_CONN_OUT_OOM;
949 cm = cor_alloc_control_msg(trgt_out_lx->target.out.nb, ACM_PRIORITY_LOW);
950 if (unlikely(cm == 0)) {
951 kfree(buf);
952 return RC_FLUSH_CONN_OUT_OOM;
955 seqno = trgt_out_lx->target.out.seqno_nextsend;
957 cr = cor_prepare_conn_retrans(trgt_out_lx, seqno, len,
958 trgt_out_lx->target.out.lastsend_windowused, 0, 0);
959 if (unlikely(cr == 0)) {
960 kfree(buf);
961 cor_free_control_msg(cm);
962 return RC_FLUSH_CONN_OUT_OOM;
965 cor_databuf_pull(trgt_out_lx, buf, len);
966 trgt_out_lx->target.out.seqno_nextsend += len;
967 cor_nbcongwin_data_sent(trgt_out_lx->target.out.nb, len);
968 if (trgt_out_lx->sourcetype == SOURCE_SOCK)
969 cor_update_src_sock_sndspeed(trgt_out_lx, len);
971 cor_send_conndata(cm, trgt_out_lx->target.out.conn_id, seqno, buf, buf,
972 len, trgt_out_lx->target.out.lastsend_windowused, flush,
973 trgt_out_lx->is_highlatency, cr);
975 kref_put(&(cr->ref), cor_free_connretrans); /* alloc */
977 return RC_FLUSH_CONN_OUT_OK;
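/*
 * Return 1 when the incoming side's announced window is nearly used up:
 * it is already exhausted, smaller than WINDOW_ENCODE_MIN, or less than
 * twice data_buf.read_remaining.
 */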
980 int cor_srcin_buflimit_reached(struct cor_conn *src_in_lx)
982 __u64 window_left;
984 if (unlikely(cor_seqno_before(src_in_lx->source.in.window_seqnolimit,
985 src_in_lx->source.in.next_seqno)))
986 return 1;
988 window_left = cor_seqno_clean(src_in_lx->source.in.window_seqnolimit -
989 src_in_lx->source.in.next_seqno);
991 if (window_left < WINDOW_ENCODE_MIN)
992 return 1;
994 if (window_left/2 < src_in_lx->data_buf.read_remaining)
995 return 1;
997 return 0;
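/*
 * Round the remaining send budget down to a convenient chunk size: values
 * below 128 are used unchanged, values below 8192 become the largest power
 * of two that fits (at most 4096), larger values a multiple of 4096.
 */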
1000 static __u32 cor_maxsend_left_to_len(__u32 maxsend_left)
1002 __u32 i;
1003 if (maxsend_left < 128)
1004 return maxsend_left;
1006 for (i = 128; i < 4096; ) {
1007 if (i * 2 > maxsend_left)
1008 return i;
1009 i = i * 2;
1012 return maxsend_left - maxsend_left%4096;
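/*
 * Record how much of the send window was in use when data was last sent,
 * scaled to 0..31: lastsend_windowused = bytes_ackpending * 31 /
 * total_window. This value is sent along with outgoing conn data.
 */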
1015 static void cor_set_last_windowused(struct cor_conn *trgt_out_lx)
1017 __u64 total_window;
1018 __u64 bytes_ackpending;
1020 BUG_ON(cor_seqno_before(trgt_out_lx->target.out.seqno_windowlimit,
1021 trgt_out_lx->target.out.seqno_acked));
1022 BUG_ON(cor_seqno_before(trgt_out_lx->target.out.seqno_nextsend,
1023 trgt_out_lx->target.out.seqno_acked));
1025 total_window = cor_seqno_clean(
1026 trgt_out_lx->target.out.seqno_windowlimit -
1027 trgt_out_lx->target.out.seqno_acked);
1028 bytes_ackpending = cor_seqno_clean(
1029 trgt_out_lx->target.out.seqno_nextsend -
1030 trgt_out_lx->target.out.seqno_acked);
1032 BUG_ON(bytes_ackpending > total_window);
1033 BUG_ON(bytes_ackpending > (U64_MAX / 31));
1035 trgt_out_lx->target.out.lastsend_windowused = div64_u64(
1036 bytes_ackpending * 31, total_window);
1039 static void _cor_flush_out_ignore_lowbuf(struct cor_conn *trgt_out_lx)
1041 trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
1042 trgt_out_lx->bufsize.ignore_rcv_lowbuf,
1043 trgt_out_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
1046 static __u64 cor_get_windowlimit(struct cor_conn *trgt_out_lx)
1048 if (unlikely(cor_seqno_before(trgt_out_lx->target.out.seqno_windowlimit,
1049 trgt_out_lx->target.out.seqno_nextsend)))
1050 return 0;
1052 return cor_seqno_clean(trgt_out_lx->target.out.seqno_windowlimit -
1053 trgt_out_lx->target.out.seqno_nextsend);
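/*
 * Push up to maxsend bytes of buffered data of one connection towards the
 * neighbor: full-MSS chunks first, then a possible smaller remainder,
 * subject to the send window, the neighbor congestion window and the
 * flush/source-specific hold-back checks. On a stalled neighbor the
 * connection is parked on nb->stalledconn_list; returns one of the
 * RC_FLUSH_CONN_OUT_* codes.
 */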
1056 int _cor_flush_out(struct cor_conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
1057 int from_qos, int maxsend_forcedelay)
1059 struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
1061 __u32 targetmss;
1063 int nbstate;
1065 __u32 maxsend_left = maxsend;
1067 BUG_ON(trgt_out_lx->targettype != TARGET_OUT);
1069 if (unlikely(trgt_out_lx->target.out.established == 0))
1070 return RC_FLUSH_CONN_OUT_OK;
1072 if (unlikely(trgt_out_lx->isreset != 0))
1073 return RC_FLUSH_CONN_OUT_OK;
1075 BUG_ON(trgt_out_lx->target.out.conn_id == 0);
1077 if (unlikely(trgt_out_lx->data_buf.read_remaining == 0))
1078 return RC_FLUSH_CONN_OUT_OK;
1080 if (from_qos == 0 && cor_qos_fastsend_allowed_conn(trgt_out_lx) == 0)
1081 return RC_FLUSH_CONN_OUT_CONG;
1083 cor_get_conn_idletime(trgt_out_lx);
1085 spin_lock_bh(&(nb->stalledconn_lock));
1086 nbstate = cor_get_neigh_state(nb);
1087 if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
1088 BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev == 0 &&
1089 trgt_out_lx->target.out.nbstalled_lh.next != 0);
1090 BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev != 0 &&
1091 trgt_out_lx->target.out.nbstalled_lh.next == 0);
1093 if (trgt_out_lx->target.out.nbstalled_lh.prev == 0) {
1094 cor_conn_kref_get(trgt_out_lx, "stalledconn_list");
1095 list_add_tail(&(trgt_out_lx->target.out.nbstalled_lh),
1096 &(nb->stalledconn_list));
1099 spin_unlock_bh(&(nb->stalledconn_lock));
1101 if (unlikely(nbstate != NEIGHBOR_STATE_ACTIVE))
1102 return RC_FLUSH_CONN_OUT_NBNOTACTIVE;
1104 /* printk(KERN_ERR "flush %p %llu %u", trgt_out_l,
1105 cor_get_windowlimit(trgt_out_l),
1106 trgt_out_l->data_buf.read_remaining); */
1108 targetmss = cor_mss_conndata(nb, trgt_out_lx->is_highlatency != 0);
1110 while (trgt_out_lx->data_buf.read_remaining >= targetmss) {
1111 __u64 windowlimit = cor_get_windowlimit(trgt_out_lx);
1112 int rc;
1114 if (maxsend_left < targetmss)
1115 break;
1117 if (windowlimit < targetmss) {
1118 trgt_out_lx->target.out.lastsend_windowused = 31;
1119 _cor_flush_out_ignore_lowbuf(trgt_out_lx);
1120 break;
1123 if (cor_nbcongwin_send_allowed(nb) == 0)
1124 return RC_FLUSH_CONN_OUT_CONG;
1126 if (likely(cor_send_conndata_as_skb(nb, targetmss)))
1127 rc = _cor_flush_out_skb(trgt_out_lx, targetmss);
1128 else
1129 rc = _cor_flush_out_conndata(trgt_out_lx, targetmss);
1131 if (rc == RC_FLUSH_CONN_OUT_OK ||
1132 rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
1133 maxsend_left -= targetmss;
1134 *sent += targetmss;
1135 cor_set_last_windowused(trgt_out_lx);
1138 if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
1139 return RC_FLUSH_CONN_OUT_CONG;
1140 if (rc != RC_FLUSH_CONN_OUT_OK)
1141 return rc;
1144 if (trgt_out_lx->data_buf.read_remaining > 0) {
1145 __u32 len = trgt_out_lx->data_buf.read_remaining;
1146 __u64 windowlimit = cor_get_windowlimit(trgt_out_lx);
1147 int rc;
1149 if (maxsend_left < len) {
1150 if (maxsend_left >= 65536 || (
1151 maxsend_left == maxsend &&
1152 maxsend_left >= 128 &&
1153 trgt_out_lx->is_highlatency == 0 &&
1154 !maxsend_forcedelay)) {
1155 len = cor_maxsend_left_to_len(maxsend_left);
1156 } else {
1157 return RC_FLUSH_CONN_OUT_MAXSENT;
1161 if (trgt_out_lx->flush == 0 &&
1162 trgt_out_lx->sourcetype == SOURCE_SOCK &&
1163 cor_sock_sndbufavailable(trgt_out_lx, 1) != 0)
1164 goto out;
1166 if (trgt_out_lx->flush == 0 &&
1167 trgt_out_lx->sourcetype == SOURCE_IN &&
1168 cor_srcin_buflimit_reached(trgt_out_lx)
1169 == 0 && (
1170 cor_seqno_eq(
1171 trgt_out_lx->target.out.seqno_nextsend,
1172 trgt_out_lx->target.out.seqno_acked) == 0 ||
1173 trgt_out_lx->is_highlatency != 0 ||
1174 LOWLATENCY_SEND_UNFLUSHED_DATA != 0))
1175 goto out;
1177 if (trgt_out_lx->flush == 0 &&
1178 trgt_out_lx->sourcetype == SOURCE_UNCONNECTED &&
1179 cor_conn_src_unconn_write_allowed(
1180 trgt_out_lx) != 0)
1181 goto out;
1183 if (windowlimit == 0 || (windowlimit < len &&
1184 cor_seqno_eq(
1185 trgt_out_lx->target.out.seqno_nextsend,
1186 trgt_out_lx->target.out.seqno_acked) == 0)) {
1187 trgt_out_lx->target.out.lastsend_windowused = 31;
1188 _cor_flush_out_ignore_lowbuf(trgt_out_lx);
1189 goto out;
1192 if (cor_nbcongwin_send_allowed(nb) == 0)
1193 return RC_FLUSH_CONN_OUT_CONG;
1195 if (len > windowlimit) {
1196 len = windowlimit;
1197 _cor_flush_out_ignore_lowbuf(trgt_out_lx);
1200 if (cor_send_conndata_as_skb(nb, len))
1201 rc = _cor_flush_out_skb(trgt_out_lx, len);
1202 else
1203 rc = _cor_flush_out_conndata(trgt_out_lx, len);
1206 if (rc == RC_FLUSH_CONN_OUT_OK ||
1207 rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
1208 maxsend_left -= len;
1209 *sent += len;
1210 cor_set_last_windowused(trgt_out_lx);
1213 if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
1214 return RC_FLUSH_CONN_OUT_CONG;
1215 if (rc != RC_FLUSH_CONN_OUT_OK)
1216 return rc;
1219 out:
1220 return RC_FLUSH_CONN_OUT_OK;
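/*
 * Work item that retries connections parked on nb->stalledconn_list while
 * the neighbor was stalled: each one is flushed again and its sender woken,
 * until the list is empty or the neighbor turns out not to be active.
 */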
1223 void cor_resume_nbstalled_conns(struct work_struct *work)
1225 struct cor_neighbor *nb = container_of(work, struct cor_neighbor,
1226 stalledconn_work);
1227 int rc = RC_FLUSH_CONN_OUT_OK;
1229 spin_lock_bh(&(nb->stalledconn_lock));
1230 nb->stalledconn_work_scheduled = 0;
1231 while (rc != RC_FLUSH_CONN_OUT_NBNOTACTIVE &&
1232 list_empty(&(nb->stalledconn_list)) == 0) {
1233 struct list_head *lh = nb->stalledconn_list.next;
1234 struct cor_conn *trgt_out = container_of(lh, struct cor_conn,
1235 target.out.nbstalled_lh);
1236 __u32 sent = 0;
1237 BUG_ON(trgt_out->targettype != TARGET_OUT);
1238 list_del(lh);
1239 lh->prev = 0;
1240 lh->next = 0;
1242 spin_unlock_bh(&(nb->stalledconn_lock));
1244 spin_lock_bh(&(trgt_out->rcv_lock));
1245 if (likely(trgt_out->targettype == TARGET_OUT))
1246 rc = cor_flush_out(trgt_out, &sent);
1247 spin_unlock_bh(&(trgt_out->rcv_lock));
1249 if (sent != 0)
1250 cor_wake_sender(trgt_out);
1252 cor_conn_kref_put(trgt_out, "stalledconn_list");
1254 spin_lock_bh(&(nb->stalledconn_lock));
1256 spin_unlock_bh(&(nb->stalledconn_lock));
1258 kref_put(&(nb->ref), cor_neighbor_free);
1261 int __init cor_snd_init(void)
1263 cor_connretrans_slab = kmem_cache_create("cor_connretrans",
1264 sizeof(struct cor_conn_retrans), 8, 0, 0);
1265 if (unlikely(cor_connretrans_slab == 0))
1266 return -ENOMEM;
1268 return 0;
1271 void __exit cor_snd_exit2(void)
1273 kmem_cache_destroy(cor_connretrans_slab);
1274 cor_connretrans_slab = 0;
1277 MODULE_LICENSE("GPL");